diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java index 7c013dd5a08f..ef55f6a7ffee 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java @@ -56,6 +56,7 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { new Subscriber.SubscriberListener() { @Override public void failed(Subscriber.State from, Throwable failure) { + // Handle failure. This is called when the Subscriber encountered a fatal error and is shutting down. System.err.println(failure); } }, diff --git a/google-cloud-pubsub/README.md b/google-cloud-pubsub/README.md index 2b116eb323d0..000c429ddf7d 100644 --- a/google-cloud-pubsub/README.md +++ b/google-cloud-pubsub/README.md @@ -85,17 +85,7 @@ To make authenticated requests to Google Cloud Pub/Sub, you must create a servic credentials. You can then make API calls by calling methods on the Pub/Sub service object. The simplest way to authenticate is to use [Application Default Credentials](https://developers.google.com/identity/protocols/application-default-credentials). -These credentials are automatically inferred from your environment, so you only need the following -code to create your service object: - -```java -import com.google.cloud.pubsub.PubSub; -import com.google.cloud.pubsub.PubSubOptions; - -try(PubSub pubsub = PubSubOptions.getDefaultInstance().getService()) { - // use pubsub here -} -``` +These credentials are automatically inferred from your environment. For other authentication options, see the [Authentication](https://github.com/GoogleCloudPlatform/google-cloud-java#authentication) page. @@ -105,27 +95,41 @@ With Pub/Sub you can create topics. A topic is a named resource to which message publishers. Add the following imports at the top of your file: ```java -import com.google.cloud.pubsub.Topic; -import com.google.cloud.pubsub.TopicInfo; +import com.google.cloud.pubsub.spi.v1.PublisherClient; +import com.google.pubsub.v1.TopicName; ``` Then, to create the topic, use the following code: ```java -Topic topic = pubsub.create(TopicInfo.of("test-topic")); +TopicName topic = TopicName.create("test-project", "test-topic"); +try (PublisherClient publisherClient = PublisherClient.create()) { + publisherClient.createTopic(topic); +} ``` #### Publishing messages With Pub/Sub you can publish messages to a topic. Add the following import at the top of your file: ```java -import com.google.cloud.pubsub.Message; +import com.google.api.gax.core.RpcFuture; +import com.google.cloud.pubsub.spi.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; ``` Then, to publish messages asynchronously, use the following code: ```java -Message message1 = Message.of("First message"); -Message message2 = Message.of("Second message"); -topic.publishAsync(message1, message2); +Publisher publisher = null; +try { + publisher = Publisher.newBuilder(topic).build(); + ByteString data = ByteString.copyFromUtf8("my-message"); + PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); + RpcFuture messageIdFuture = publisher.publish(pubsubMessage); +} finally { + if (publisher != null) { + publisher.shutdown(); + } +} ``` #### Creating a subscription @@ -133,14 +137,20 @@ With Pub/Sub you can create subscriptions. A subscription represents the stream single, specific topic. Add the following imports at the top of your file: ```java -import com.google.cloud.pubsub.Subscription; -import com.google.cloud.pubsub.SubscriptionInfo; +import com.google.cloud.pubsub.spi.v1.SubscriberClient; +import com.google.pubsub.v1.PushConfig; +import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.TopicName; ``` Then, to create the subscription, use the following code: ```java -Subscription subscription = - pubsub.create(SubscriptionInfo.of("test-topic", "test-subscription")); +TopicName topic = TopicName.create("test-project", "test-topic"); +SubscriptionName subscription = SubscriptionName.create("test-project", "test-subscription"); + +try (SubscriberClient subscriberClient = SubscriberClient.create()) { + subscriberClient.createSubscription(subscription, topic, PushConfig.getDefaultInstance(), 0); +} ``` #### Pulling messages @@ -148,22 +158,45 @@ With Pub/Sub you can pull messages from a subscription. Add the following import file: ```java -import com.google.cloud.pubsub.Message; -import com.google.cloud.pubsub.PubSub.MessageConsumer; -import com.google.cloud.pubsub.PubSub.MessageProcessor; +import com.google.cloud.pubsub.spi.v1.AckReply; +import com.google.cloud.pubsub.spi.v1.AckReplyConsumer; +import com.google.cloud.pubsub.spi.v1.MessageReceiver; +import com.google.cloud.pubsub.spi.v1.Subscriber; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.TopicName; ``` Then, to pull messages asynchronously, use the following code: ```java -MessageProcessor callback = new MessageProcessor() { - @Override - public void process(Message message) throws Exception { - System.out.printf("Received message \"%s\"%n", message.getPayloadAsString()); +MessageReceiver receiver = + new MessageReceiver() { + @Override + public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { + System.out.println("got message: " + message.getData().toStringUtf8()); + consumer.accept(AckReply.ACK, null); + } + }; +Subscriber subscriber = null; +try { + subscriber = Subscriber.newBuilder(subscription, receiver).build(); + subscriber.addListener( + new Subscriber.SubscriberListener() { + @Override + public void failed(Subscriber.State from, Throwable failure) { + // Handle failure. This is called when the Subscriber encountered a fatal error and is shutting down. + System.err.println(failure); + } + }, + MoreExecutors.directExecutor()); + subscriber.startAsync().awaitRunning(); + // Pull messages for 60 seconds. + Thread.sleep(60000); +} finally { + if (subscriber != null) { + subscriber.stopAsync(); } -}; -// Create a message consumer and pull messages (for 60 seconds) -try (MessageConsumer consumer = subscription.pullAsync(callback)) { - Thread.sleep(60_000); } ``` #### Complete source code