diff --git a/temporal-sdk/src/test/java/io/temporal/internal/testing/ActivityTestingTest.java b/temporal-sdk/src/test/java/io/temporal/internal/testing/ActivityTestingTest.java index 0cadad8da9..a0783a7d26 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/testing/ActivityTestingTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/testing/ActivityTestingTest.java @@ -31,6 +31,7 @@ import io.temporal.client.ActivityCanceledException; import io.temporal.failure.ActivityFailure; import io.temporal.failure.ApplicationFailure; +import io.temporal.testing.ActivityRequestedAsyncCompletion; import io.temporal.testing.TestActivityEnvironment; import java.io.IOException; import java.util.ArrayList; @@ -39,10 +40,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; +import org.junit.*; import org.junit.rules.Timeout; public class ActivityTestingTest { @@ -111,6 +109,21 @@ public void testFailure() { } } + private static class AsyncActivityImpl implements TestActivity { + @Override + public String activity1(String input) { + Activity.getExecutionContext().doNotCompleteOnReturn(); + return ""; + } + } + + @Test + public void testAsyncActivity() { + testEnvironment.registerActivitiesImplementations(new AsyncActivityImpl()); + TestActivity activity = testEnvironment.newActivityStub(TestActivity.class); + Assert.assertThrows(ActivityRequestedAsyncCompletion.class, () -> activity.activity1("input1")); + } + private static class HeartbeatActivityImpl implements TestActivity { @Override diff --git a/temporal-testing/src/main/java/io/temporal/testing/ActivityRequestedAsyncCompletion.java b/temporal-testing/src/main/java/io/temporal/testing/ActivityRequestedAsyncCompletion.java new file mode 100644 index 0000000000..46d5b134c8 --- /dev/null +++ b/temporal-testing/src/main/java/io/temporal/testing/ActivityRequestedAsyncCompletion.java @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.testing; + +/** + * Exception thrown when an activity request to complete asynchronously in the {@link + * TestActivityEnvironment}. Intended to be used in unit tests to assert an activity requested async + * completion. + */ +public final class ActivityRequestedAsyncCompletion extends RuntimeException { + private final String activityId; + private final boolean manualCompletion; + + public ActivityRequestedAsyncCompletion(String activityId, boolean manualCompletion) { + super("activity requested async completion"); + this.activityId = activityId; + this.manualCompletion = manualCompletion; + } + + public String getActivityId() { + return activityId; + } + + public boolean isManualCompletion() { + return manualCompletion; + } +} diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironment.java b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironment.java index 96065966e2..c58bc9c8c8 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironment.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironment.java @@ -82,6 +82,9 @@ static TestActivityEnvironment newInstance(TestEnvironmentOptions options) { * Creates a stub that can be used to invoke activities registered through {@link * #registerActivitiesImplementations(Object...)}. * + *

Activity methods may throw {@link ActivityRequestedAsyncCompletion} if the activity + * requested async completion. + * * @param activityInterface activity interface class that the object under test implements. * @param Type of the activity interface. * @return The stub that implements the activity interface. @@ -92,6 +95,9 @@ static TestActivityEnvironment newInstance(TestEnvironmentOptions options) { * Creates a stub that can be used to invoke activities registered through {@link * #registerActivitiesImplementations(Object...)}. * + *

Activity methods may throw {@link ActivityRequestedAsyncCompletion} if the activity + * requested async completion. + * * @param Type of the activity interface. * @param activityInterface activity interface class that the object under test implements * @param options options that specify the activity invocation parameters diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java index 2f9c2087f6..a45f5be340 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java @@ -20,7 +20,6 @@ package io.temporal.testing; -import com.google.common.base.Defaults; import com.google.protobuf.ByteString; import com.uber.m3.tally.NoopScope; import com.uber.m3.tally.Scope; @@ -511,40 +510,38 @@ private T getReply( Type resultType) { DataConverter dataConverter = testEnvironmentOptions.getWorkflowClientOptions().getDataConverter(); - RespondActivityTaskCompletedRequest taskCompleted = response.getTaskCompleted(); - if (taskCompleted != null) { + if (response.getTaskCompleted() != null) { + RespondActivityTaskCompletedRequest taskCompleted = response.getTaskCompleted(); Optional result = taskCompleted.hasResult() ? Optional.of(taskCompleted.getResult()) : Optional.empty(); return dataConverter.fromPayloads(0, result, resultClass, resultType); - } else { + } else if (response.getTaskFailed() != null) { RespondActivityTaskFailedRequest taskFailed = response.getTaskFailed().getTaskFailedRequest(); - if (taskFailed != null) { - Exception cause = dataConverter.failureToException(taskFailed.getFailure()); - throw new ActivityFailure( - taskFailed.getFailure().getMessage(), - 0, - 0, - task.getActivityType().getName(), - task.getActivityId(), - RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE, - "TestActivityEnvironment", - cause); - } else { - RespondActivityTaskCanceledRequest taskCanceled = response.getTaskCanceled(); - if (taskCanceled != null) { - throw new CanceledFailure( - "canceled", - new EncodedValues( - taskCanceled.hasDetails() - ? Optional.of(taskCanceled.getDetails()) - : Optional.empty(), - dataConverter), - null); - } - } + Exception cause = dataConverter.failureToException(taskFailed.getFailure()); + throw new ActivityFailure( + taskFailed.getFailure().getMessage(), + 0, + 0, + task.getActivityType().getName(), + task.getActivityId(), + RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE, + "TestActivityEnvironment", + cause); + } else if (response.getTaskCanceled() != null) { + RespondActivityTaskCanceledRequest taskCanceled = response.getTaskCanceled(); + throw new CanceledFailure( + "canceled", + new EncodedValues( + taskCanceled.hasDetails() + ? Optional.of(taskCanceled.getDetails()) + : Optional.empty(), + dataConverter), + null); + } else { + throw new ActivityRequestedAsyncCompletion( + task.getActivityId(), response.isManualCompletion()); } - return Defaults.defaultValue(resultClass); } @Override