Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
new Subscriber.SubscriberListener() {
@Override
public void failed(Subscriber.State from, Throwable failure) {
// Handle failure. This is called when the Subscriber encountered a fatal error and is shutting down.
System.err.println(failure);
}
},
Expand Down
99 changes: 66 additions & 33 deletions google-cloud-pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,7 @@ To make authenticated requests to Google Cloud Pub/Sub, you must create a servic
credentials. You can then make API calls by calling methods on the Pub/Sub service object. The
simplest way to authenticate is to use
[Application Default Credentials](https://developers.google.com/identity/protocols/application-default-credentials).
These credentials are automatically inferred from your environment, so you only need the following
code to create your service object:

```java
import com.google.cloud.pubsub.PubSub;
import com.google.cloud.pubsub.PubSubOptions;

try(PubSub pubsub = PubSubOptions.getDefaultInstance().getService()) {
// use pubsub here
}
```
These credentials are automatically inferred from your environment.

For other authentication options, see the
[Authentication](https://github.com/GoogleCloudPlatform/google-cloud-java#authentication) page.
Expand All @@ -105,65 +95,108 @@ With Pub/Sub you can create topics. A topic is a named resource to which message
publishers. Add the following imports at the top of your file:

```java
import com.google.cloud.pubsub.Topic;
import com.google.cloud.pubsub.TopicInfo;
import com.google.cloud.pubsub.spi.v1.PublisherClient;
import com.google.pubsub.v1.TopicName;
```
Then, to create the topic, use the following code:

```java
Topic topic = pubsub.create(TopicInfo.of("test-topic"));
TopicName topic = TopicName.create("test-project", "test-topic");
try (PublisherClient publisherClient = PublisherClient.create()) {
publisherClient.createTopic(topic);
}
```

#### Publishing messages
With Pub/Sub you can publish messages to a topic. Add the following import at the top of your file:

```java
import com.google.cloud.pubsub.Message;
import com.google.api.gax.core.RpcFuture;
import com.google.cloud.pubsub.spi.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
```
Then, to publish messages asynchronously, use the following code:

```java
Message message1 = Message.of("First message");
Message message2 = Message.of("Second message");
topic.publishAsync(message1, message2);
Publisher publisher = null;
try {
publisher = Publisher.newBuilder(topic).build();
ByteString data = ByteString.copyFromUtf8("my-message");
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
RpcFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
} finally {
if (publisher != null) {
publisher.shutdown();
}
}
```

#### Creating a subscription
With Pub/Sub you can create subscriptions. A subscription represents the stream of messages from a
single, specific topic. Add the following imports at the top of your file:

```java
import com.google.cloud.pubsub.Subscription;
import com.google.cloud.pubsub.SubscriptionInfo;
import com.google.cloud.pubsub.spi.v1.SubscriberClient;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
```
Then, to create the subscription, use the following code:

```java
Subscription subscription =
pubsub.create(SubscriptionInfo.of("test-topic", "test-subscription"));
TopicName topic = TopicName.create("test-project", "test-topic");
SubscriptionName subscription = SubscriptionName.create("test-project", "test-subscription");

try (SubscriberClient subscriberClient = SubscriberClient.create()) {
subscriberClient.createSubscription(subscription, topic, PushConfig.getDefaultInstance(), 0);
}
```

#### Pulling messages
With Pub/Sub you can pull messages from a subscription. Add the following imports at the top of your
file:

```java
import com.google.cloud.pubsub.Message;
import com.google.cloud.pubsub.PubSub.MessageConsumer;
import com.google.cloud.pubsub.PubSub.MessageProcessor;
import com.google.cloud.pubsub.spi.v1.AckReply;
import com.google.cloud.pubsub.spi.v1.AckReplyConsumer;
import com.google.cloud.pubsub.spi.v1.MessageReceiver;
import com.google.cloud.pubsub.spi.v1.Subscriber;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
```
Then, to pull messages asynchronously, use the following code:

```java
MessageProcessor callback = new MessageProcessor() {
@Override
public void process(Message message) throws Exception {
System.out.printf("Received message \"%s\"%n", message.getPayloadAsString());
MessageReceiver receiver =
new MessageReceiver() {
@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
System.out.println("got message: " + message.getData().toStringUtf8());
consumer.accept(AckReply.ACK, null);
}
};
Subscriber subscriber = null;
try {
subscriber = Subscriber.newBuilder(subscription, receiver).build();
subscriber.addListener(
new Subscriber.SubscriberListener() {
@Override
public void failed(Subscriber.State from, Throwable failure) {
// Handle failure. This is called when the Subscriber encountered a fatal error and is shutting down.
System.err.println(failure);
}
},
MoreExecutors.directExecutor());
subscriber.startAsync().awaitRunning();
// Pull messages for 60 seconds.
Thread.sleep(60000);
} finally {
if (subscriber != null) {
subscriber.stopAsync();
}
};
// Create a message consumer and pull messages (for 60 seconds)
try (MessageConsumer consumer = subscription.pullAsync(callback)) {
Thread.sleep(60_000);
}
```
#### Complete source code
Expand Down