diff --git a/sdk/src/main/java/com/microsoft/durabletask/DataConverter.java b/sdk/src/main/java/com/microsoft/durabletask/DataConverter.java index 7abfa66e..0989ad49 100644 --- a/sdk/src/main/java/com/microsoft/durabletask/DataConverter.java +++ b/sdk/src/main/java/com/microsoft/durabletask/DataConverter.java @@ -4,12 +4,16 @@ import com.google.protobuf.Timestamp; +import javax.annotation.Nullable; import java.time.Instant; import java.time.temporal.ChronoUnit; public interface DataConverter { - String serialize(Object value); - T deserialize(String data, Class target) throws DataConverterException; + @Nullable + String serialize(@Nullable Object value); + + @Nullable + T deserialize(@Nullable String data, Class target) throws DataConverterException; default DataConverterException wrapConverterException(String message, Throwable cause) { return new DataConverterException(message, cause); diff --git a/sdk/src/main/java/com/microsoft/durabletask/FailureDetails.java b/sdk/src/main/java/com/microsoft/durabletask/FailureDetails.java index af67bad8..20bc6e46 100644 --- a/sdk/src/main/java/com/microsoft/durabletask/FailureDetails.java +++ b/sdk/src/main/java/com/microsoft/durabletask/FailureDetails.java @@ -5,6 +5,10 @@ import com.google.protobuf.StringValue; import com.microsoft.durabletask.protobuf.OrchestratorService.TaskFailureDetails; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.Objects; + class FailureDetails { private final String errorType; private final String errorMessage; @@ -12,11 +16,13 @@ class FailureDetails { public FailureDetails( String errorType, - String errorMessage, - String errorDetails) { + @Nullable String errorMessage, + @Nullable String errorDetails) { this.errorType = errorType; - this.errorMessage = errorMessage; this.stackTrace = errorDetails; + + // Error message can be null for things like NullPointerException but the gRPC contract doesn't allow null + this.errorMessage = Objects.requireNonNullElse(errorMessage, ""); } public FailureDetails(Exception exception) { @@ -29,14 +35,17 @@ public FailureDetails(TaskFailureDetails proto) { this.stackTrace = proto.getStackTrace().getValue(); } + @Nonnull public String getErrorType() { return this.errorType; } + @Nonnull public String getErrorMessage() { return this.errorMessage; } + @Nullable public String getStackTrace() { return this.stackTrace; } @@ -56,7 +65,7 @@ TaskFailureDetails toProto() { return TaskFailureDetails.newBuilder() .setErrorType(this.getErrorType()) .setErrorMessage(this.getErrorMessage()) - .setStackTrace(StringValue.of(this.getStackTrace())) + .setStackTrace(StringValue.of(Objects.requireNonNullElse(this.getStackTrace(), ""))) .build(); } } \ No newline at end of file diff --git a/sdk/src/main/java/com/microsoft/durabletask/RetryPolicy.java b/sdk/src/main/java/com/microsoft/durabletask/RetryPolicy.java new file mode 100644 index 00000000..55999fa2 --- /dev/null +++ b/sdk/src/main/java/com/microsoft/durabletask/RetryPolicy.java @@ -0,0 +1,108 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.microsoft.durabletask; + +import javax.annotation.Nullable; +import java.time.Duration; +import java.util.Objects; + +public class RetryPolicy { + + private final int maxNumberOfAttempts; + private final Duration firstRetryInterval; + private final double backoffCoefficient; + private final Duration maxRetryInterval; + private final Duration retryTimeout; + + private RetryPolicy(Builder builder) { + this.maxNumberOfAttempts = builder.maxNumberOfAttempts; + this.firstRetryInterval = builder.firstRetryInterval; + this.backoffCoefficient = builder.backoffCoefficient; + this.maxRetryInterval = Objects.requireNonNullElse(builder.maxRetryInterval, Duration.ZERO); + this.retryTimeout = Objects.requireNonNullElse(builder.retryTimeout, Duration.ZERO); + } + + public static Builder newBuilder(int maxNumberOfAttempts, Duration firstRetryInterval) { + return new Builder(maxNumberOfAttempts, firstRetryInterval); + } + + public int getMaxNumberOfAttempts() { + return this.maxNumberOfAttempts; + } + + public Duration getFirstRetryInterval() { + return this.firstRetryInterval; + } + + public double getBackoffCoefficient() { + return this.backoffCoefficient; + } + + public Duration getMaxRetryInterval() { + return this.maxRetryInterval; + } + + public Duration getRetryTimeout() { + return this.retryTimeout; + } + + public static class Builder { + private int maxNumberOfAttempts; + private Duration firstRetryInterval; + private double backoffCoefficient; + private Duration maxRetryInterval; + private Duration retryTimeout; + + private Builder(int maxNumberOfAttempts, Duration firstRetryInterval) { + this.setMaxNumberOfAttempts(maxNumberOfAttempts); + this.setFirstRetryInterval(firstRetryInterval); + } + + public RetryPolicy build() { + return new RetryPolicy(this); + } + + public Builder setMaxNumberOfAttempts(int maxNumberOfAttempts) { + if (maxNumberOfAttempts <= 0) { + throw new IllegalArgumentException("The value for maxNumberOfAttempts must be greater than zero."); + } + this.maxNumberOfAttempts = maxNumberOfAttempts; + return this; + } + + public Builder setFirstRetryInterval(Duration firstRetryInterval) { + if (firstRetryInterval == null) { + throw new IllegalArgumentException("firstRetryInterval cannot be null."); + } + if (firstRetryInterval.isZero() || firstRetryInterval.isNegative()) { + throw new IllegalArgumentException("The value for firstRetryInterval must be greater than zero."); + } + this.firstRetryInterval = firstRetryInterval; + return this; + } + + public Builder setBackoffCoefficient(double backoffCoefficient) { + if (backoffCoefficient < 1.0) { + throw new IllegalArgumentException("The value for backoffCoefficient must be greater or equal to 1.0."); + } + this.backoffCoefficient = backoffCoefficient; + return this; + } + + public Builder setMaxRetryInterval(@Nullable Duration maxRetryInterval) { + if (maxRetryInterval != null && maxRetryInterval.compareTo(this.firstRetryInterval) < 0) { + throw new IllegalArgumentException("The value for maxRetryInterval must be greater than or equal to the value for firstRetryInterval."); + } + this.maxRetryInterval = maxRetryInterval; + return this; + } + + public Builder setRetryTimeout(Duration retryTimeout) { + if (retryTimeout != null && retryTimeout.compareTo(this.firstRetryInterval) < 0) { + throw new IllegalArgumentException("The value for retryTimeout must be greater than or equal to the value for firstRetryInterval."); + } + this.retryTimeout = retryTimeout; + return this; + } + } +} diff --git a/sdk/src/main/java/com/microsoft/durabletask/TaskOptions.java b/sdk/src/main/java/com/microsoft/durabletask/TaskOptions.java new file mode 100644 index 00000000..d01905d5 --- /dev/null +++ b/sdk/src/main/java/com/microsoft/durabletask/TaskOptions.java @@ -0,0 +1,43 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.microsoft.durabletask; + +public class TaskOptions { + private final RetryPolicy retryPolicy; + + TaskOptions(Builder builder) { + this.retryPolicy = builder.retryPolicy; + } + + public static TaskOptions fromRetryPolicy(RetryPolicy policy) { + return newBuilder().setRetryPolicy(policy).build(); + } + + boolean hasRetryPolicy() { + return this.retryPolicy != null; + } + + public RetryPolicy getRetryPolicy() { + return this.retryPolicy; + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + private RetryPolicy retryPolicy; + + private Builder() { + } + + public TaskOptions build() { + return new TaskOptions(this); + } + + public Builder setRetryPolicy(RetryPolicy retryPolicy) { + this.retryPolicy = retryPolicy; + return this; + } + } +} diff --git a/sdk/src/main/java/com/microsoft/durabletask/TaskOrchestrationContext.java b/sdk/src/main/java/com/microsoft/durabletask/TaskOrchestrationContext.java index e1f35b70..d43db1e2 100644 --- a/sdk/src/main/java/com/microsoft/durabletask/TaskOrchestrationContext.java +++ b/sdk/src/main/java/com/microsoft/durabletask/TaskOrchestrationContext.java @@ -2,6 +2,8 @@ // Licensed under the MIT License. package com.microsoft.durabletask; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.time.Duration; import java.time.Instant; import java.util.Arrays; @@ -25,30 +27,51 @@ default Task> anyOf(Task... tasks) { void complete(Object output); void fail(FailureDetails failureDetails); - Task callActivity(String name, Object input, Class returnType); + Task callActivity(String name, Object input, TaskOptions options, Class returnType); default Task callActivity(String name) { return this.callActivity(name, null); } default Task callActivity(String name, Object input) { - return this.callActivity(name, input, Void.class); + return this.callActivity(name, input, null, Void.class); } - Task callSubOrchestrator(String name, Object input, String instanceId, Class returnType); + default Task callActivity(String name, Object input, Class returnType) { + return this.callActivity(name, input, null, returnType); + } + + default Task callActivity(String name, Object input, TaskOptions options) { + return this.callActivity(name, input, options, Void.class); + } default Task callSubOrchestrator(String name){ return this.callSubOrchestrator(name, null); } - default Task callSubOrchestrator(String name, Object input){ + default Task callSubOrchestrator(String name, Object input) { return this.callSubOrchestrator(name, input, null); } - default Task callSubOrchestrator(String name, Object input, Class returnType){ + default Task callSubOrchestrator(String name, Object input, Class returnType) { return this.callSubOrchestrator(name, input, null, returnType); } + default Task callSubOrchestrator(String name, Object input, String instanceId, Class returnType) { + return this.callSubOrchestrator(name, input, instanceId, null, returnType); + } + + default Task callSubOrchestrator(String name, Object input, String instanceId, TaskOptions options) { + return this.callSubOrchestrator(name, input, instanceId, options, Void.class); + } + + Task callSubOrchestrator( + String name, + @Nullable Object input, + @Nullable String instanceId, + @Nullable TaskOptions options, + Class returnType); + Task waitForExternalEvent(String name, Duration timeout, Class dataType) throws TaskCanceledException; default Task waitForExternalEvent(String name, Duration timeout) throws TaskCanceledException { diff --git a/sdk/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java b/sdk/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java index 629d2400..2999c9e9 100644 --- a/sdk/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java +++ b/sdk/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java @@ -7,6 +7,7 @@ import com.microsoft.durabletask.protobuf.OrchestratorService.*; import com.microsoft.durabletask.protobuf.OrchestratorService.ScheduleTaskAction.Builder; +import javax.annotation.Nullable; import java.time.Duration; import java.time.Instant; import java.util.*; @@ -194,11 +195,17 @@ public Task> anyOf(List> tasks) { } @Override - public Task callActivity(String name, Object input, Class returnType) { + public Task callActivity( + String name, + @Nullable Object input, + @Nullable TaskOptions options, + Class returnType) { Helpers.throwIfArgumentNull(name, "name"); Helpers.throwIfArgumentNull(returnType, "returnType"); - int id = this.sequenceNumber++; + if (input instanceof TaskOptions) { + throw new IllegalArgumentException("TaskOptions cannot be used as an input. Did you call the wrong method overload?"); + } String serializedInput = this.dataConverter.serialize(input); Builder scheduleTaskBuilder = ScheduleTaskAction.newBuilder().setName(name); @@ -206,31 +213,53 @@ public Task callActivity(String name, Object input, Class returnType) scheduleTaskBuilder.setInput(StringValue.of(serializedInput)); } - this.pendingActions.put(id, OrchestratorAction.newBuilder() - .setId(id) - .setScheduleTask(scheduleTaskBuilder) - .build()); + TaskFactory taskFactory = () -> { + int id = this.sequenceNumber++; + this.pendingActions.put(id, OrchestratorAction.newBuilder() + .setId(id) + .setScheduleTask(scheduleTaskBuilder) + .build()); + + if (!this.isReplaying) { + this.logger.fine(() -> String.format( + "%s: calling activity '%s' (#%d) with serialized input: %s", + this.instanceId, + name, + id, + serializedInput != null ? serializedInput : "(null)")); + } - if (!this.isReplaying) { - this.logger.fine(() -> String.format( - "%s: calling activity '%s' (#%d) with serialized input: %s", - this.instanceId, - name, - id, - serializedInput != null ? serializedInput : "(null)")); + CompletableTask task = new CompletableTask<>(); + TaskRecord record = new TaskRecord<>(task, name, returnType); + this.openTasks.put(id, record); + return task; + }; + + Task task; + if (options != null && options.hasRetryPolicy()) { + // Return a retry task that generates the same task for each retry + task = new RetriableTask(taskFactory, options.getRetryPolicy(), this); + } else { + // Return a single vanilla task + task = taskFactory.create(); } - CompletableTask task = new CompletableTask<>(); - TaskRecord record = new TaskRecord<>(task, name, returnType); - this.openTasks.put(id, record); return task; } @Override - public Task callSubOrchestrator(String name, Object input, String instanceId, Class returnType){ + public Task callSubOrchestrator( + String name, + @Nullable Object input, + @Nullable String instanceId, + @Nullable TaskOptions options, + Class returnType){ Helpers.throwIfArgumentNull(name, "name"); Helpers.throwIfArgumentNull(returnType, "returnType"); - int id = this.sequenceNumber++; + + if (input instanceof TaskOptions) { + throw new IllegalArgumentException("TaskOptions cannot be used as an input. Did you call the wrong method overload?"); + } String serializedInput = this.dataConverter.serialize(input); CreateSubOrchestrationAction.Builder createSubOrchestrationActionBuilder = CreateSubOrchestrationAction.newBuilder().setName(name); @@ -238,31 +267,42 @@ public Task callSubOrchestrator(String name, Object input, String instanc createSubOrchestrationActionBuilder.setInput(StringValue.of(serializedInput)); } - //TODO:replace this with a deterministic GUID generation so that it's safe for replay, + // TODO:replace this with a deterministic GUID generation so that it's safe for replay, // please find potentail bug here https://github.com/microsoft/durabletask-dotnet/issues/9 if (instanceId == null) { instanceId = UUID.randomUUID().toString(); } createSubOrchestrationActionBuilder.setInstanceId(instanceId); - this.pendingActions.put(id, OrchestratorAction.newBuilder() - .setId(id) - .setCreateSubOrchestration(createSubOrchestrationActionBuilder) - .build()); + TaskFactory taskFactory = () -> { + int id = this.sequenceNumber++; + this.pendingActions.put(id, OrchestratorAction.newBuilder() + .setId(id) + .setCreateSubOrchestration(createSubOrchestrationActionBuilder) + .build()); + + if (!this.isReplaying) { + this.logger.fine(() -> String.format( + "%s: calling sub-orchestration '%s' (#%d) with serialized input: %s", + this.instanceId, + name, + id, + serializedInput != null ? serializedInput : "(null)")); + } - if (!this.isReplaying) { - this.logger.fine(() -> String.format( - "%s: calling sub-orchestration '%s' (#%d) with serialized input: %s", - this.instanceId, - name, - id, - serializedInput != null ? serializedInput : "(null)")); - } + CompletableTask task = new CompletableTask<>(); + TaskRecord record = new TaskRecord<>(task, name, returnType); + this.openTasks.put(id, record); + return task; + }; - CompletableTask task = new CompletableTask<>(); - TaskRecord record = new TaskRecord<>(task, name, returnType); - this.openTasks.put(id, record); - return task; + if (options != null && options.hasRetryPolicy()) { + // Return a retry task that generates the same task for each retry + return new RetriableTask<>(taskFactory, options.getRetryPolicy(), this); + } else { + // Return a single vanilla task + return taskFactory.create(); + } } public Task waitForExternalEvent(String name, Duration timeout, Class dataType) { @@ -744,6 +784,73 @@ protected void handleException(Throwable e) throws TaskFailedException { } } + // Task implementation that implements a retry policy + private class RetriableTask extends CompletableTask { + private final RetryPolicy policy; + private final TaskOrchestrationContext context; + private final Instant firstAttempt; + private final TaskFactory taskFactory; + + private int attemptNumber; + + public RetriableTask(TaskFactory taskFactory, RetryPolicy policy, TaskOrchestrationContext context) { + super(new CompletableFuture<>()); + this.taskFactory = taskFactory; + this.policy = policy; + this.context = context; + this.firstAttempt = context.getCurrentInstant(); + } + + @Override + public V get() throws TaskFailedException, OrchestratorBlockedEvent { + while (true) { + Task currentTask = this.taskFactory.create(); + this.attemptNumber++; + try { + return currentTask.get(); + } catch (TaskFailedException ex) { + if (!this.shouldRetry()) { + throw ex; + } + + // Use a durable timer to create the delay between retries + this.context.createTimer(this.getNextDelay()).get(); + } + } + } + + private boolean shouldRetry() { + if (this.attemptNumber >= this.policy.getMaxNumberOfAttempts()) { + // Max number of attempts exceeded + return false; + } + + // Duration.ZERO is interpreted as no maximum timeout + Duration retryTimeout = this.policy.getRetryTimeout(); + if (retryTimeout.compareTo(Duration.ZERO) > 0) { + Instant retryExpiration = this.firstAttempt.plus(retryTimeout); + if (this.context.getCurrentInstant().compareTo(retryExpiration) >= 0) { + // Max retry timeout exceeded + return false; + } + } + + // Keep retrying + return true; + } + + private Duration getNextDelay() { + // REVIEW: Do we need to worry about overflow here? + long nextDelayInMillis = this.policy.getFirstRetryInterval().toMillis() * + (long)Math.pow(this.policy.getBackoffCoefficient(), this.attemptNumber); + if (nextDelayInMillis > this.policy.getMaxRetryInterval().toMillis()) { + return this.policy.getMaxRetryInterval(); + } else { + return Duration.ofMillis(nextDelayInMillis); + } + } + } + private class CompletableTask extends Task { public CompletableTask() { @@ -755,7 +862,7 @@ public CompletableTask() { } @Override - public final V get() throws TaskFailedException, OrchestratorBlockedEvent { + public V get() throws TaskFailedException, OrchestratorBlockedEvent { do { // If the future is done, return its value right away if (this.future.isDone()) { @@ -828,4 +935,9 @@ public Task thenCompose(Function> fn) { } } } + + @FunctionalInterface + private interface TaskFactory { + Task create(); + } } diff --git a/sdk/src/test/java/com/microsoft/durabletask/ErrorHandlingIntegrationTests.java b/sdk/src/test/java/com/microsoft/durabletask/ErrorHandlingIntegrationTests.java index 993a5081..56398ea3 100644 --- a/sdk/src/test/java/com/microsoft/durabletask/ErrorHandlingIntegrationTests.java +++ b/sdk/src/test/java/com/microsoft/durabletask/ErrorHandlingIntegrationTests.java @@ -8,6 +8,9 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; + import static org.junit.jupiter.api.Assertions.*; /** @@ -97,6 +100,59 @@ void activityException(boolean handleException) { } } + + @ParameterizedTest + @ValueSource(ints = {1, 2, 10}) + public void retryActivityFailures(int maxNumberOfAttempts) { + final String orchestratorName = "OrchestratorWithActivityException"; + final String activityName = "Throw"; + + TaskOptions options = TaskOptions.fromRetryPolicy(RetryPolicy.newBuilder( + maxNumberOfAttempts, + Duration.ofMillis(1)).build()); + + AtomicInteger actualAttemptCount = new AtomicInteger(); + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorName, ctx -> { + ctx.callActivity(activityName,null, options).get(); + }) + .addActivity(activityName, ctx -> { + actualAttemptCount.getAndIncrement(); + throw new RuntimeException("Error #" + actualAttemptCount.get()); + }) + .buildAndStart(); + + DurableTaskClient client = DurableTaskGrpcClient.newBuilder().build(); + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, ""); + OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true); + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.FAILED, instance.getRuntimeStatus()); + + // Make sure the exception details are still what we expect + FailureDetails details = instance.getFailureDetails(); + assertNotNull(details); + + // Make sure the surfaced exception is the last one. This is reflected in both the task ID and the + // error message. In the case of the task ID, it's going to be (N-1)*2 because there is a timer task + // injected before each retry. This is useful to validate because changing this could break replays for + // existing orchestrations that adopt an updated retry policy implementation (this has happened before). + String expectedExceptionMessage = "Error #" + maxNumberOfAttempts; + int expectedTaskId = (maxNumberOfAttempts - 1) * 2; + String expectedMessage = String.format( + "Task '%s' (#%d) failed with an unhandled exception: %s", + activityName, + expectedTaskId, + expectedExceptionMessage); + assertEquals(expectedMessage, details.getErrorMessage()); + assertEquals("com.microsoft.durabletask.TaskFailedException", details.getErrorType()); + assertNotNull(details.getStackTrace()); + + // Confirm the number of attempts + assertEquals(maxNumberOfAttempts, actualAttemptCount.get()); + } + } + @ParameterizedTest @ValueSource(booleans = {true, false}) void subOrchestrationException(boolean handleException){ @@ -122,16 +178,16 @@ void subOrchestrationException(boolean handleException){ }) .buildAndStart(); DurableTaskClient client = DurableTaskGrpcClient.newBuilder().build(); - try(worker; client){ + try (worker; client) { String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, 1); OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true); assertNotNull(instance); - if (handleException){ + if (handleException) { assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); String result = instance.readOutputAs(String.class); assertNotNull(result); assertEquals("handled", result); - }else{ + } else { assertEquals(OrchestrationRuntimeStatus.FAILED, instance.getRuntimeStatus()); FailureDetails details = instance.getFailureDetails(); assertNotNull(details); @@ -146,4 +202,56 @@ void subOrchestrationException(boolean handleException){ } } } + + @ParameterizedTest + @ValueSource(ints = {1, 2, 10}) + public void retrySubOrchestratorFailures(int maxNumberOfAttempts) { + final String orchestratorName = "OrchestratorWithBustedSubOrchestrator"; + final String subOrchestratorName = "BustedSubOrchestrator"; + + TaskOptions options = TaskOptions.fromRetryPolicy(RetryPolicy.newBuilder( + maxNumberOfAttempts, + Duration.ofMillis(1)).build()); + + AtomicInteger actualAttemptCount = new AtomicInteger(); + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorName, ctx -> { + ctx.callSubOrchestrator(subOrchestratorName, null, null, options).get(); + }) + .addOrchestrator(subOrchestratorName, ctx -> { + actualAttemptCount.getAndIncrement(); + throw new RuntimeException("Error #" + actualAttemptCount.get()); + }) + .buildAndStart(); + + DurableTaskClient client = DurableTaskGrpcClient.newBuilder().build(); + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, ""); + OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true); + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.FAILED, instance.getRuntimeStatus()); + + // Make sure the exception details are still what we expect + FailureDetails details = instance.getFailureDetails(); + assertNotNull(details); + + // Make sure the surfaced exception is the last one. This is reflected in both the task ID and the + // error message. In the case of the task ID, it's going to be (N-1)*2 because there is a timer task + // injected before each retry. This is useful to validate because changing this could break replays for + // existing orchestrations that adopt an updated retry policy implementation (this has happened before). + String expectedExceptionMessage = "Error #" + maxNumberOfAttempts; + int expectedTaskId = (maxNumberOfAttempts - 1) * 2; + String expectedMessage = String.format( + "Task '%s' (#%d) failed with an unhandled exception: %s", + subOrchestratorName, + expectedTaskId, + expectedExceptionMessage); + assertEquals(expectedMessage, details.getErrorMessage()); + assertEquals("com.microsoft.durabletask.TaskFailedException", details.getErrorType()); + assertNotNull(details.getStackTrace()); + + // Confirm the number of attempts + assertEquals(maxNumberOfAttempts, actualAttemptCount.get()); + } + } } diff --git a/sdk/src/test/java/com/microsoft/durabletask/IntegrationTestBase.java b/sdk/src/test/java/com/microsoft/durabletask/IntegrationTestBase.java index 74a4c371..b83dc8ea 100644 --- a/sdk/src/test/java/com/microsoft/durabletask/IntegrationTestBase.java +++ b/sdk/src/test/java/com/microsoft/durabletask/IntegrationTestBase.java @@ -1,3 +1,6 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + package com.microsoft.durabletask; import org.junit.jupiter.api.AfterEach; @@ -11,7 +14,7 @@ public class IntegrationTestBase { private DurableTaskGrpcWorker server; @AfterEach - private void shutdown() throws InterruptedException { + private void shutdown() { if (this.server != null) { this.server.stop(); }