Conversation
Remove isEmpty() check from isWorkflowActivity.
This reverts commit 381e66c.
|
|
||
| @Override | ||
| public <R> CompletableFuture<R> getResultAsync(Class<R> resultClass, @Nullable Type resultType) { | ||
| return CompletableFuture.supplyAsync(() -> getResult(resultClass, resultType)); |
There was a problem hiding this comment.
Do we want to set up a separate Executor for this?
There was a problem hiding this comment.
We probably do. I think you might need to do something similar to what happens in the workflow client for getting results (and we probably also want overloads that you can give timeouts to, like it has).
@Quinn-With-Two-Ns I'm sure would be able to say something much more intelligent than me about this. I'd ask him.
There was a problem hiding this comment.
We also probably want to cache results here rather than re-polling. Looks like the other SDKs are doing that.
There was a problem hiding this comment.
Yeah we shouldn't just be calling getResult in supplyAsync , look at how getResultAsync on WorkflowStubImpl works you need to wire the async down to the GRPC layer
There was a problem hiding this comment.
and yeah if we cahced results in other SDK's we should here as well
Sushisource
left a comment
There was a problem hiding this comment.
Overall looking pretty good to me! I didn't review the tests (it's a long one!). Just a few things to address (and we'll want more than my stamp).
| * {@code captured[0]} when any method is called on it. | ||
| */ | ||
| @SuppressWarnings("unchecked") | ||
| private static <I> I createTypeProbe(Class<I> activityInterface, Method[] captured) { |
There was a problem hiding this comment.
Why is captured an array here? Seems like only one value can ever be written? I guess this is just a workaround for needing some wrapper type, but I wonder if there's something more explicit for that purpose.
There was a problem hiding this comment.
Seems like AtomicReference is the standard approach, but I think that has as semantic connotation I don't want to give, so I'm just adding a Box.
There was a problem hiding this comment.
Got rid of Box and moved all the logic to MethodExtractor, to reduce boilerplate here.
| } catch (Throwable ignored) { | ||
| } | ||
| UntypedActivityHandle untyped = | ||
| start(extractActivityType(activityInterface, captured[0]), options, new Object[0]); |
There was a problem hiding this comment.
It seems like, in general with these createTypeProbe call sites, we could be doing error handling a bit more robustly. If they pass a method reference that doesn't exist on the class, for example, what kind of error are we gonna throw here?
Will we just end up with an NPE from dereffing captured[0]? We want to provide at least some kind of reasonable user-facing error in that case.
There was a problem hiding this comment.
Moved the logic into MethodExtractor, which throws an information RTE if the method doesn't match the activity.
| * ActivityHandle#describe()}. | ||
| */ | ||
| @Experimental | ||
| public final class ActivityExecutionDescription extends ActivityExecutionMetadata { |
There was a problem hiding this comment.
This guy is missing lastFailure() which exists in the other SDKs. Probably also good to expose the raw info too, which I think we typically do?
|
|
||
| @Override | ||
| public <R> CompletableFuture<R> getResultAsync(Class<R> resultClass, @Nullable Type resultType) { | ||
| return CompletableFuture.supplyAsync(() -> getResult(resultClass, resultType)); |
There was a problem hiding this comment.
We probably do. I think you might need to do something similar to what happens in the workflow client for getting results (and we probably also want overloads that you can give timeouts to, like it has).
@Quinn-With-Two-Ns I'm sure would be able to say something much more intelligent than me about this. I'd ask him.
|
|
||
| @Override | ||
| public <R> CompletableFuture<R> getResultAsync(Class<R> resultClass, @Nullable Type resultType) { | ||
| return CompletableFuture.supplyAsync(() -> getResult(resultClass, resultType)); |
There was a problem hiding this comment.
We also probably want to cache results here rather than re-polling. Looks like the other SDKs are doing that.
| } | ||
|
|
||
| @Override | ||
| public DescribeActivityOutput describeActivity(DescribeActivityInput input) { |
There was a problem hiding this comment.
I think this will need to support the long poll token, which probably also means the describe() on the handle needs an overload to take one too
There was a problem hiding this comment.
I thought the SDK wasn't/didn't need to expose the long poll token
There was a problem hiding this comment.
Bleh, I forgot that. I was just going on what existed in Go/Python. We can probably leave it out then.
| /** | ||
| * Starts a standalone activity using a typed interface and an unbound method reference, and | ||
| * returns a typed handle. The activity type name is inferred from the method reference at runtime | ||
| * via a reflection proxy; the result type is captured from the generic type parameter. |
There was a problem hiding this comment.
| * via a reflection proxy; the result type is captured from the generic type parameter. |
IMO this is just an implementation detail, I don't think the average user would benefit from that info and might even confuse them since a "reflection proxy" isn't a standard term. The only argument I could see is for GRAAL users, since they do need to know that.
| * | ||
| * <p>Example: | ||
| * | ||
| * <pre>{@code |
There was a problem hiding this comment.
nit: I like the examples a lot, but I would make 2 small changes. I would add one for each overload, or ask AI to write them, and have just one example per method.
| * @param options pagination options such as page size | ||
| * @return a page of results and a token for the next page | ||
| */ | ||
| ActivityListPage listExecutionsPaginated( |
There was a problem hiding this comment.
I don't think we need to expose this? We didn't for other list methods.
| A2 arg2, | ||
| A3 arg3) | ||
| throws ActivityAlreadyStartedException { | ||
| Method method = MethodExtractor.extract(activityInterface, activity); |
There was a problem hiding this comment.
@Evanthx FYI your SANO code could copy this pattern.
| */ | ||
| <I> ActivityHandle<Void> start( | ||
| Class<I> activityInterface, Functions.Proc1<I> activity, StartActivityOptions options) | ||
| throws ActivityAlreadyStartedException; |
There was a problem hiding this comment.
| throws ActivityAlreadyStartedException; |
Generally the Java SDK does NOT used checked exceptions. I think we should be consistent with other client calls.
| * StartActivityOptions#getIdConflictPolicy()}). | ||
| */ | ||
| @Experimental | ||
| public final class ActivityAlreadyStartedException extends TemporalException { |
There was a problem hiding this comment.
For WorkflowExecutionAlreadyStarted we inherit from WorkflowExecution I don't see anything equivalent in this PR.
| * @param resultType the generic type to use for deserialization; may be {@code null} | ||
| * @throws ActivityFailedException if the activity failed, timed out, or was cancelled | ||
| */ | ||
| <R> R getResult(Class<R> resultClass, @Nullable Type resultType) throws ActivityFailedException; |
There was a problem hiding this comment.
| <R> R getResult(Class<R> resultClass, @Nullable Type resultType) throws ActivityFailedException; | |
| <R> R getResult(Class<R> resultClass, @Nullable Type resultType); |
Same comment as above about checked exception
| * cancelled. The original cause can be retrieved via {@link #getCause()}. | ||
| */ | ||
| @Experimental | ||
| public final class ActivityFailedException extends TemporalException { |
There was a problem hiding this comment.
I think we are missing a common ActivityException base class here that ActivityFailedException and ActivityAlreadyStartedException
| @Experimental | ||
| public interface ActivityClientCallsInterceptor { | ||
|
|
||
| StartActivityOutput startActivity(StartActivityInput input) |
There was a problem hiding this comment.
nit: we should have Java docs for these. I know some interceptors don't but that is more tech debt then intentional IMO.
| * ActivityHandle#fromUntyped(UntypedActivityHandle, Class)} or {@link | ||
| * ActivityHandle#fromUntyped(UntypedActivityHandle, Class, Type)}. | ||
| */ | ||
| final class ActivityHandleWrapper<R> implements ActivityHandle<R> { |
There was a problem hiding this comment.
| final class ActivityHandleWrapper<R> implements ActivityHandle<R> { | |
| final class ActivityHandleImpl<R> implements ActivityHandle<R> { |
Was there a reason we used a fifferent naming convention then other classes?
| * @param resultClass the class to deserialize the result into | ||
| * @param resultType the generic type to use for deserialization; may be {@code null} | ||
| */ | ||
| <R> CompletableFuture<R> getResultAsync(Class<R> resultClass, @Nullable Type resultType); |
There was a problem hiding this comment.
We are missing the timeour overloads for getResultAsync on WorkflowStub
Quinn-With-Two-Ns
left a comment
There was a problem hiding this comment.
I didn't review the test or check more subtle behaviour, overall I think it looks really good! The MethodExtractor was a good idea. A few changes that if I owned the Java SDK I would want changed is:
- Mirror the exception hierarchy with
WorkflowException - Mirror the
getResultAsyncoverloads inWorkflowStub - Remove the use of checked exceptions, they are not used in the Java SDK like this
- Mirror the implementation of
getResultAsyncfromWorkflowStub.getResultAsyncshould not but be callinggetResultin an executor it should flow down to an async gRPC call.
Add standalone activity API (ActivityClient)
Introduces support for standalone activities — activities that execute independently of any workflow.
New public API surface:
ActivityClient— top-level client for starting, describing,listing, counting, cancelling, and terminating standalone activities
ActivityHandle<R>/UntypedActivityHandle— typed and untypedhandles returned by
ActivityClient.start(); providegetResult(),getResultAsync(),describe(),cancel(), andterminate()StartActivityOptions— builder-based options for starting anactivity (id, task queue, timeouts, retry, priority, etc.)
ActivityClientOptions— namespace / data converter / interceptorconfiguration for
ActivityClientActivityExecutionDescription— rich descriptor returned bydescribe()andlist*(); extendsActivityExecutionMetadataActivityExecutionMetadata— lightweight metadata used in listresults
ActivityExecutionCount— result ofcountActivities()ActivityListOptions/ActivityListPaginatedOptions— filteringand pagination options for list operations
ActivityListPage— page of results with continuation tokenActivityAlreadyStartedException— thrown when a duplicate activityid is rejected by the server
ActivityFailedException— thrown fromgetResult()when theactivity fails
New interceptor API:
ActivityClientCallsInterceptor— per-call interceptor for allActivityClientoperationsActivityClientCallsInterceptorBase— pass-through baseimplementation (delegates every method to the next interceptor)
ActivityClientInterceptor— factory interceptor that wraps theclient-level invoker
ActivityClientInterceptorBase— no-op base implementationActivityInfoadditions:getActivityRunId()— run-scoped id assigned by the server to eachactivity execution
isWorkflowActivity()— distinguishes workflow-dispatched activitiesfrom standalone ones
ActivityCompletionClientadditions:(String activityId, Optional<String> runId, …)for completing, failing, sending heartbeats, and cancelling standalone
activities without a workflow id
ActivitySerializationContextfix:workflowIdandworkflowTypeare now@Nullable; removedrequireNonNullguards that caused NPEs for standalone activitiesFunctions.java:FuncandVFunc(zero-arg typed/void functional interfaces)needed by
ActivityClient.start()method-reference overloadsCI:
dev server used by unit tests:
frontend.activityAPIsEnabled,activity.enableStandalone,history.enableChasm,history.enableTransitionHistoryTests:
StandaloneActivityTest— integration tests against a real servercovering the full activity lifecycle (start, poll, complete, cancel,
terminate, describe, list, heartbeat, async completion)
ActivityClientCallsInterceptorBaseTest— delegation tests for thebase interceptor
ActivityClientCallsInterceptorChainTest— chain ordering testsActivityHandleImplTest— unit tests for handle dispatchActivityCompletionClientImplTest— unit tests for completion clientActivityInfoStandaloneTest— unit tests forActivityInfoin thestandalone context
ActivitySerializationContextTest— confirms nullable workflow fieldsStartActivityOptionsTest,ActivityClientOptionsTest,ActivityExecutionDescriptionTest,ActivityExecutionMetadataTest,ActivityAlreadyStartedExceptionTest— options and value-type tests