Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public static void createTopic() throws Exception {
}

public static void publishMessages() throws Exception {
// [START publish]
// [START pubsub_publish]
TopicName topicName = TopicName.create("my-project-id", "my-topic-id");
Publisher publisher = null;
List<ApiFuture<String>> messageIdFutures = new ArrayList<>();
Expand All @@ -55,7 +55,6 @@ public static void publishMessages() throws Exception {
// schedule publishing one message at a time : messages get automatically batched
for (String message : messages) {
ByteString data = ByteString.copyFromUtf8(message);
// message data is converted to base64-encoding
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

// Once published, returns a server-assigned message id (unique within the topic)
Expand All @@ -75,7 +74,7 @@ public static void publishMessages() throws Exception {
publisher.shutdown();
}
}
// [END publish]
// [END pubsub_publish]
}

public static void main(String... args) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ public MessageReceiverSnippets(BlockingQueue<PubsubMessage> blockingQueue) {
*/
// [TARGET receiveMessage(PubsubMessage, AckReplyConsumer)]
public MessageReceiver messageReceiver() {
// [START receiveMessage]
MessageReceiver receiver = new MessageReceiver() {
public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
if (blockingQueue.offer(message)) {
Expand All @@ -56,7 +55,6 @@ public void receiveMessage(final PubsubMessage message, final AckReplyConsumer c
}
}
};
// [END receiveMessage]
return receiver;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public static void newBuilder(String projectId, String topicId) throws Exception
}

public Publisher getPublisherWithCustomBatchSettings(TopicName topicName) throws Exception {
// [START publisherBatchSettings]
// [START pubsub_publisher_batch_settings]
// Batch settings control how the publisher batches messages
long requestBytesThreshold = 5000L; // default : 1kb
long messageCountBatchSize = 10L; // default : 100
Expand All @@ -102,12 +102,12 @@ public Publisher getPublisherWithCustomBatchSettings(TopicName topicName) throws

Publisher publisher = Publisher.defaultBuilder(topicName)
.setBatchingSettings(batchingSettings).build();
// [END publisherBatchSettings]
// [END pubsub_publisher_batch_settings]
return publisher;
}

public Publisher getPublisherWithCustomRetrySettings(TopicName topicName) throws Exception {
// [START publisherRetrySettings]
// [START pubsub_publisher_retry_settings]
// Retry settings control how the publisher handles retryable failures
Duration retryDelay = Duration.ofMillis(100); // default : 1 ms
double retryDelayMultiplier = 2.0; // back off for repeated failures
Expand All @@ -121,12 +121,12 @@ public Publisher getPublisherWithCustomRetrySettings(TopicName topicName) throws

Publisher publisher = Publisher.defaultBuilder(topicName)
.setRetrySettings(retrySettings).build();
// [END publisherRetrySettings]
// [END pubsub_publisher_retry_settings]
return publisher;
}

public Publisher getPublisherWithCustomFlowControlSettings(TopicName topicName) throws Exception {
// [START publisherFlowControlSettings]
// [START pubsub_publisher_flow_settings]

// Flow control settings restrict the number of outstanding publish requests
int maxOutstandingBatches = 20;
Expand All @@ -143,23 +143,23 @@ public Publisher getPublisherWithCustomFlowControlSettings(TopicName topicName)

Publisher publisher = Publisher.defaultBuilder(topicName)
.setFlowControlSettings(flowControlSettings).build();
// [END publisherFlowControlSettings]
// [END pubsub_publisher_flow_settings]
return publisher;
}

public Publisher getSingleThreadedPublisher(TopicName topicName) throws Exception {
// [START singleThreadedPublisher]
// [START pubsub_publisher_single_threaded]
// create a publisher with a single threaded executor
ExecutorProvider executorProvider = InstantiatingExecutorProvider.newBuilder()
.setExecutorThreadCount(1).build();
Publisher publisher = Publisher.defaultBuilder(topicName)
.setExecutorProvider(executorProvider).build();
// [END singleThreadedPublisher]
// [END pubsub_publisher_single_threaded]
return publisher;
}

private Publisher createPublisherWithCustomCredentials(TopicName topicName) throws Exception {
// [START publisherWithCustomCredentials]
// [START pubsub_publisher_custom_credentials]
// read service account credentials from file
CredentialsProvider credentialsProvider =
FixedCredentialsProvider.create(
Expand All @@ -171,7 +171,7 @@ private Publisher createPublisherWithCustomCredentials(TopicName topicName) thro
Publisher publisher = Publisher.defaultBuilder(topicName)
.setChannelProvider(channelProvider)
.build();
// [END publisherWithCustomCredentials]
// [END pubsub_publisher_custom_credentials]
return publisher;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ public SubscriberSnippets(

// [TARGET startAsync()]
public void startAndWait() throws Exception {
// [START startAsync]
Subscriber subscriber = Subscriber.defaultBuilder(subscriptionName, receiver).build();
subscriber.addListener(new Subscriber.Listener() {
public void failed(Subscriber.State from, Throwable failure) {
Expand All @@ -73,11 +72,10 @@ public void failed(Subscriber.State from, Throwable failure) {
// Wait for a stop signal.
done.get();
subscriber.stopAsync().awaitTerminated();
// [END startAsync]
}

private void createSubscriber() throws Exception {
// [START pullSubscriber]
// [START pubsub_pull]
String projectId = "my-project-id";
String subscriptionId = "my-subscription-id";

Expand Down Expand Up @@ -105,35 +103,35 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
subscriber.stopAsync();
}
}
// [END pullSubscriber]
// [END pubsub_pull]
}

private Subscriber createSubscriberWithErrorListener(Subscriber subscriber) throws Exception {
// [START subscriberWithErrorListener]
// [START pubsub_subscriber_error_listener]
subscriber.addListener(new Subscriber.Listener() {
public void failed(Subscriber.State from, Throwable failure) {
// Handle error.
}
}, MoreExecutors.directExecutor());
// [END subscriberWithErrorListener]
// [END pubsub_subscriber_error_listener]
return subscriber;
}

private Subscriber createSingleThreadedSubscriber() throws Exception {
// [START singleThreadedSubscriber]
// [START pubsub_subscriber_single_threaded]
// provide a separate executor service for polling
ExecutorProvider executorProvider = InstantiatingExecutorProvider.newBuilder()
.setExecutorThreadCount(1).build();

Subscriber subscriber = Subscriber.defaultBuilder(subscriptionName, receiver)
.setExecutorProvider(executorProvider)
.build();
// [END singleThreadedSubscriber]
// [END pubsub_subscriber_single_threaded]
return subscriber;
}

private Subscriber createSubscriberWithCustomFlowSettings() throws Exception {
// [START subscriberWithCustomFlow]
// [START pubsub_subscriber_flow_settings]
int maxMessageCount = 10;
// Configure max number of messages to be pulled
FlowControlSettings flowControlSettings = FlowControlSettings.newBuilder()
Expand All @@ -142,12 +140,12 @@ private Subscriber createSubscriberWithCustomFlowSettings() throws Exception {
Subscriber subscriber = Subscriber.defaultBuilder(subscriptionName, receiver)
.setFlowControlSettings(flowControlSettings)
.build();
// [END subscriberWithCustomFlow]
// [END pubsub_subscriber_flow_settings]
return subscriber;
}

private Subscriber createSubscriberWithCustomCredentials() throws Exception {
// [START subscriberWithCustomCredentials]
// [START pubsub_subscriber_custom_credentials]
CredentialsProvider credentialsProvider =
FixedCredentialsProvider
.create(ServiceAccountCredentials.fromStream(
Expand All @@ -159,7 +157,7 @@ private Subscriber createSubscriberWithCustomCredentials() throws Exception {
Subscriber subscriber = Subscriber.defaultBuilder(subscriptionName, receiver)
.setChannelProvider(channelProvider)
.build();
// [START subscriberWithCustomCredentials]
// [START pubsub_subscriber_custom_credentials]
return subscriber;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public String getProjectId() {

/** Example of creating a pull subscription for a topic. */
public Subscription createSubscription(String topicId, String subscriptionId) throws Exception {
// [START createPullSubscription]
// [START pubsub_create_pull_subscription]
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
// eg. projectId = "my-test-project", topicId = "my-test-topic"
TopicName topicName = TopicName.create(projectId, topicId);
Expand All @@ -67,14 +67,14 @@ public Subscription createSubscription(String topicId, String subscriptionId) th
subscriptionName, topicName, PushConfig.getDefaultInstance(), 0);
return subscription;
}
// [END createPullSubscription]
// [END pubsub_create_pull_subscription]
}

/** Example of creating a subscription with a push endpoint. */
public Subscription createSubscriptionWithPushEndpoint(String topicId, String subscriptionId,
String endpoint)
throws Exception {
// [START createPushSubscription]
// [START pubsub_create_push_subscription]
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
TopicName topicName = TopicName.create(projectId, topicId);
SubscriptionName subscriptionName =
Expand All @@ -91,23 +91,23 @@ public Subscription createSubscriptionWithPushEndpoint(String topicId, String su
subscriptionName, topicName, pushConfig, ackDeadlineInSeconds);
return subscription;
}
// [END createPushSubscription]
// [END pubsub_create_push_subscription]
}

/** Example of replacing the push configuration of a subscription, setting the push endpoint. */
public void replacePushConfig(String subscriptionId, String endpoint) throws Exception {
// [START replacePushConfig]
// [START pubsub_update_push_configuration]
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
SubscriptionName subscriptionName = SubscriptionName.create(projectId, subscriptionId);
PushConfig pushConfig = PushConfig.newBuilder().setPushEndpoint(endpoint).build();
subscriptionAdminClient.modifyPushConfig(subscriptionName, pushConfig);
}
// [END replacePushConfig]
// [END pubsub_update_push_configuration]
}

/** Example of listing subscriptions. */
public ListSubscriptionsPagedResponse listSubscriptions() throws Exception {
// [START listSubscriptions]
// [START pubsub_list_subscriptions]
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
ListSubscriptionsRequest listSubscriptionsRequest =
ListSubscriptionsRequest.newBuilder()
Expand All @@ -121,23 +121,23 @@ public ListSubscriptionsPagedResponse listSubscriptions() throws Exception {
}
return response;
}
// [END listSubscriptions]
// [END pubsub_list_subscriptions]
}

/** Example of deleting a subscription. */
public SubscriptionName deleteSubscription(String subscriptionId) throws Exception {
// [START deleteSubscription]
// [START pubsub_delete_subscription]
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
SubscriptionName subscriptionName = SubscriptionName.create(projectId, subscriptionId);
subscriptionAdminClient.deleteSubscription(subscriptionName);
return subscriptionName;
}
// [END deleteSubscription]
// [END pubsub_delete_subscription]
}

/** Example of getting a subscription policy. */
public Policy getSubscriptionPolicy(String subscriptionId) throws Exception {
// [START getSubscriptionPolicy]
// [START pubsub_get_subscription_policy]
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
SubscriptionName subscriptionName = SubscriptionName.create(projectId, subscriptionId);
Policy policy = subscriptionAdminClient.getIamPolicy(subscriptionName.toString());
Expand All @@ -146,12 +146,12 @@ public Policy getSubscriptionPolicy(String subscriptionId) throws Exception {
}
return policy;
}
// [END getSubscriptionPolicy]
// [END pubsub_get_subscription_policy]
}

/** Example of replacing a subscription policy. */
public Policy replaceSubscriptionPolicy(String subscriptionId) throws Exception {
// [START replaceSubscriptionPolicy]
// [START pubsub_set_subscription_policy]
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
SubscriptionName subscriptionName = SubscriptionName.create(projectId, subscriptionId);
Policy policy = subscriptionAdminClient.getIamPolicy(subscriptionName.toString());
Expand All @@ -167,13 +167,13 @@ public Policy replaceSubscriptionPolicy(String subscriptionId) throws Exception
updatedPolicy = subscriptionAdminClient.setIamPolicy(subscriptionName.toString(), updatedPolicy);
return updatedPolicy;
}
// [END replaceSubscriptionPolicy]
// [END pubsub_set_subscription_policy]
}

/** Example of testing whether the caller has the provided permissions on a subscription. */
public TestIamPermissionsResponse testSubscriptionPermissions(String subscriptionId)
throws Exception {
// [START testSubscriptionPermissions]
// [START pubsub_test_subscription_permissions]
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
List<String> permissions = new LinkedList<>();
permissions.add("pubsub.subscriptions.get");
Expand All @@ -182,17 +182,17 @@ public TestIamPermissionsResponse testSubscriptionPermissions(String subscriptio
topicAdminClient.testIamPermissions(subscriptionName.toString(), permissions);
return testedPermissions;
}
// [END testSubscriptionPermissions]
// [END pubsub_test_subscription_permissions]
}

/** Example of getting a subscription. */
public Subscription getSubscription(String subscriptionId) throws Exception {
// [START getSubscription]
// [START pubsub_get_subscription]
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
SubscriptionName subscriptionName = SubscriptionName.create(projectId, subscriptionId);
Subscription subscription = subscriptionAdminClient.getSubscription(subscriptionName);
return subscription;
}
// [END getSubscription]
// [END pubsub_get_subscription]
}
}
Loading