diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 8d149d64ac49..e9ab3ed26751 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -85,6 +85,9 @@ public class Publisher { private static final Logger logger = Logger.getLogger(Publisher.class.getName()); + @InternalApi static final Duration MIN_TOTAL_TIMEOUT = Duration.ofSeconds(10); + @InternalApi static final Duration MIN_RPC_TIMEOUT = Duration.ofMillis(10); + private final TopicName topicName; private final String cachedTopicNameString; @@ -123,9 +126,14 @@ private Publisher(Builder builder) throws IOException { cachedTopicNameString = topicName.toString(); this.batchingSettings = builder.batchingSettings; - this.retrySettings = builder.retrySettings; + this.retrySettings = builder.retrySettings.build(); this.longRandom = builder.longRandom; + Preconditions.checkArgument( + this.retrySettings.getTotalTimeout().compareTo(MIN_TOTAL_TIMEOUT) >= 0); + Preconditions.checkArgument( + this.retrySettings.getInitialRpcTimeout().compareTo(MIN_RPC_TIMEOUT) >= 0); + messagesBatch = new LinkedList<>(); messagesBatchLock = new ReentrantLock(); activeAlarm = new AtomicBoolean(false); @@ -539,9 +547,6 @@ public static Builder newBuilder(TopicName topicName) { /** A builder of {@link Publisher}s. */ public static final class Builder { - static final Duration MIN_TOTAL_TIMEOUT = Duration.ofSeconds(10); - static final Duration MIN_RPC_TIMEOUT = Duration.ofMillis(10); - // Meaningful defaults. static final long DEFAULT_ELEMENT_COUNT_THRESHOLD = 100L; static final long DEFAULT_REQUEST_BYTES_THRESHOLD = 1000L; // 1 kB @@ -583,7 +588,7 @@ public long nextLong(long least, long bound) { // Batching options BatchingSettings batchingSettings = DEFAULT_BATCHING_SETTINGS; - RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS; + RetrySettings.Builder retrySettings = DEFAULT_RETRY_SETTINGS.toBuilder(); LongRandom longRandom = DEFAULT_LONG_RANDOM; TransportChannelProvider channelProvider = @@ -635,16 +640,23 @@ public Builder setBatchingSettings(BatchingSettings batchingSettings) { return this; } - /** Configures the Publisher's retry parameters. */ + /** + * Configures the Publisher's retry parameters. + * + * @deprecated {@link #retrySettingsBuilder()} is more reliable as it would be impossible to + * miss fields. + */ + @Deprecated public Builder setRetrySettings(RetrySettings retrySettings) { - Preconditions.checkArgument( - retrySettings.getTotalTimeout().compareTo(MIN_TOTAL_TIMEOUT) >= 0); - Preconditions.checkArgument( - retrySettings.getInitialRpcTimeout().compareTo(MIN_RPC_TIMEOUT) >= 0); - this.retrySettings = retrySettings; + this.retrySettings = retrySettings.toBuilder(); return this; } + /** Configures the Publisher's retry parameters. */ + public RetrySettings.Builder retrySettingsBuilder() { + return this.retrySettings; + } + @InternalApi Builder setLongRandom(LongRandom longRandom) { this.longRandom = Preconditions.checkNotNull(longRandom); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 7b92f7af63f6..801538132fb3 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -16,6 +16,12 @@ package com.google.cloud.pubsub.v1; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import com.google.api.core.ApiFuture; import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.core.ExecutorProvider; @@ -35,6 +41,10 @@ import io.grpc.StatusException; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -42,17 +52,6 @@ import org.junit.runners.JUnit4; import org.threeten.bp.Duration; -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledExecutorService; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - @RunWith(JUnit4.class) public class PublisherImplTest { @@ -482,12 +481,12 @@ public void testBuilderParametersAndDefaults() { assertEquals( Publisher.Builder.DEFAULT_ELEMENT_COUNT_THRESHOLD, builder.batchingSettings.getElementCountThreshold().longValue()); - assertEquals(Publisher.Builder.DEFAULT_RETRY_SETTINGS, builder.retrySettings); + assertEquals(Publisher.Builder.DEFAULT_RETRY_SETTINGS, builder.retrySettings.build()); } @Test - public void testBuilderInvalidArguments() { - Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC); + public void testBuilderInvalidArguments() throws Exception { + Publisher.Builder builder = getTestPublisherBuilder(); try { builder.setChannelProvider(null); @@ -592,32 +591,62 @@ public void testBuilderInvalidArguments() { // Expected } - builder.setRetrySettings( - Publisher.Builder.DEFAULT_RETRY_SETTINGS - .toBuilder() - .setInitialRpcTimeout(Publisher.Builder.MIN_RPC_TIMEOUT) - .build()); + // Remember to shutdown so we don't leak resources. + builder + .setRetrySettings( + Publisher.Builder.DEFAULT_RETRY_SETTINGS + .toBuilder() + .setInitialRpcTimeout(Publisher.MIN_RPC_TIMEOUT) + .build()) + .build() + .shutdown(); try { - builder.setRetrySettings( - Publisher.Builder.DEFAULT_RETRY_SETTINGS - .toBuilder() - .setInitialRpcTimeout(Publisher.Builder.MIN_RPC_TIMEOUT.minusMillis(1)) - .build()); + builder + .setRetrySettings( + Publisher.Builder.DEFAULT_RETRY_SETTINGS + .toBuilder() + .setInitialRpcTimeout(Publisher.MIN_RPC_TIMEOUT.minusMillis(1)) + .build()) + .build() + .shutdown(); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException expected) { // Expected } - builder.setRetrySettings( - Publisher.Builder.DEFAULT_RETRY_SETTINGS - .toBuilder() - .setTotalTimeout(Publisher.Builder.MIN_TOTAL_TIMEOUT) - .build()); try { - builder.setRetrySettings( - Publisher.Builder.DEFAULT_RETRY_SETTINGS - .toBuilder() - .setTotalTimeout(Publisher.Builder.MIN_TOTAL_TIMEOUT.minusMillis(1)) - .build()); + Publisher.Builder builder2 = getTestPublisherBuilder(); + builder2.retrySettingsBuilder().setInitialRpcTimeout(Publisher.MIN_RPC_TIMEOUT.minusMillis(1)); + builder2.build().shutdown(); + fail("Should have thrown an IllegalArgumentException"); + } catch (IllegalArgumentException expected) { + // Expected + } + + builder + .setRetrySettings( + Publisher.Builder.DEFAULT_RETRY_SETTINGS + .toBuilder() + .setTotalTimeout(Publisher.MIN_TOTAL_TIMEOUT) + .build()) + .build() + .shutdown(); + try { + builder + .setRetrySettings( + Publisher.Builder.DEFAULT_RETRY_SETTINGS + .toBuilder() + .setTotalTimeout(Publisher.MIN_TOTAL_TIMEOUT.minusMillis(1)) + .build()) + .build() + .shutdown(); + fail("Should have thrown an IllegalArgumentException"); + } catch (IllegalArgumentException expected) { + // Expected + } + try { + Publisher.Builder builder2 = getTestPublisherBuilder(); + builder2.retrySettingsBuilder().setTotalTimeout(Publisher.MIN_TOTAL_TIMEOUT.minusMillis(1)); + builder2.build().shutdown(); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException expected) { // Expected