diff --git a/api/src/main/resources/schema/workflow.yaml b/api/src/main/resources/schema/workflow.yaml index 1cceff60..b59e2f3a 100644 --- a/api/src/main/resources/schema/workflow.yaml +++ b/api/src/main/resources/schema/workflow.yaml @@ -499,10 +499,6 @@ $defs: description: Defines the properties of event to emit. required: [ source, type ] additionalProperties: true - cc: - $ref: '#/$defs/endpoint' - title: EmitCarbonCopyDefinition - description: Defines an additional endpoint, if any, to publish an event's carbon copy to. required: [ event ] forTask: type: object @@ -1343,7 +1339,7 @@ $defs: - properties: until: false title: AnyEventUntilConsumed - required: [ any ] + required: [ any ] - title: OneEventConsumptionStrategy properties: one: @@ -1717,20 +1713,20 @@ $defs: - properties: amount: type: integer - title: AsyncApiMessageConsumptionPolicyAmount description: The amount of (filtered) messages to consume before disposing of the subscription. + title: AsyncApiMessageConsumptionPolicyAmount required: [ amount ] - properties: while: $ref: '#/$defs/runtimeExpression' - title: AsyncApiMessageConsumptionPolicyWhile description: A runtime expression evaluated after each consumed (filtered) message to decide if message consumption should continue. + title: AsyncApiMessageConsumptionPolicyWhile required: [ while ] - properties: until: $ref: '#/$defs/runtimeExpression' - title: AsyncApiMessageConsumptionPolicyUntil description: A runtime expression evaluated before each consumed (filtered) message to decide if message consumption should continue. + title: AsyncApiMessageConsumptionPolicyUntil required: [ until ] subscriptionIterator: type: object diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java index 1a58d656..e051dabf 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java @@ -47,7 +47,6 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -55,12 +54,9 @@ public abstract class ListenExecutor extends RegularTaskExecutor { protected final EventRegistrationBuilderCollection regBuilders; - protected final EventRegistrationBuilderCollection untilRegBuilders; - protected final Optional until; protected final Optional> loop; protected final Function converter; protected final EventConsumer eventConsumer; - protected final AtomicBoolean untilEvent = new AtomicBoolean(true); private static record EventRegistrationBuilderCollection( Collection registrations, boolean isAnd) {} @@ -177,22 +173,37 @@ protected void internalProcessCe( arrayNode.add(node); future.complete(node); } - - @Override - protected CompletableFuture combine(CompletableFuture[] completables) { - return CompletableFuture.allOf(completables); - } } public static class OrListenExecutor extends ListenExecutor { + private final Optional until; + private final EventRegistrationBuilderCollection untilRegBuilders; + public OrListenExecutor(ListenExecutorBuilder builder) { super(builder); + this.until = Optional.ofNullable(builder.until); + this.untilRegBuilders = builder.untilRegistrations; } @Override - protected CompletableFuture combine(CompletableFuture[] completables) { - return CompletableFuture.anyOf(completables); + protected CompletableFuture buildFuture( + EventRegistrationBuilderCollection regCollection, + Collection registrations, + BiConsumer> consumer) { + CompletableFuture combinedFuture = + super.buildFuture(regCollection, registrations, consumer); + if (untilRegBuilders != null) { + Collection untilRegistrations = new ArrayList<>(); + CompletableFuture untilFuture = + combine(untilRegBuilders, untilRegistrations, (ce, f) -> f.complete(null)); + untilFuture.thenAccept( + v -> { + combinedFuture.complete(null); + untilRegistrations.forEach(reg -> eventConsumer.unregister(reg)); + }); + } + return combinedFuture; } protected void internalProcessCe( @@ -206,14 +217,12 @@ protected void internalProcessCe( || until .filter(u -> u.apply(workflow, taskContext, arrayNode).asBoolean()) .isPresent()) - && untilEvent.get()) { + && untilRegBuilders == null) { future.complete(arrayNode); } } } - protected abstract CompletableFuture combine(CompletableFuture[] completables); - protected abstract void internalProcessCe( JsonNode node, ArrayNode arrayNode, @@ -226,48 +235,37 @@ protected CompletableFuture internalExecute( WorkflowContext workflow, TaskContext taskContext) { ArrayNode output = JsonUtils.mapper().createArrayNode(); Collection registrations = new ArrayList<>(); - if (untilRegBuilders != null) { - untilEvent.set(false); - } - CompletableFuture combinedFuture = - combine( - toCompletables( - regBuilders, - registrations, - (ce, future) -> - processCe(converter.apply(ce), output, workflow, taskContext, future))); - CompletableFuture resultFuture = - combinedFuture.thenApply( + return buildFuture( + regBuilders, + registrations, + (BiConsumer>) + ((ce, future) -> + processCe(converter.apply(ce), output, workflow, taskContext, future))) + .thenApply( v -> { registrations.forEach(reg -> eventConsumer.unregister(reg)); return output; }); - if (untilRegBuilders != null) { - Collection untilRegistrations = new ArrayList<>(); - CompletableFuture[] futures = - toCompletables( - untilRegBuilders, untilRegistrations, (ce, future) -> future.complete(null)); - CompletableFuture untilFuture = - untilRegBuilders.isAnd() - ? CompletableFuture.allOf(futures) - : CompletableFuture.anyOf(futures); - untilFuture.thenAccept( - v -> { - untilEvent.set(true); - combinedFuture.complete(null); - untilRegistrations.forEach(reg -> eventConsumer.unregister(reg)); - }); - } - return resultFuture; } - private CompletableFuture[] toCompletables( + protected CompletableFuture buildFuture( + EventRegistrationBuilderCollection regCollection, + Collection registrations, + BiConsumer> consumer) { + return combine(regCollection, registrations, consumer); + } + + protected final CompletableFuture combine( EventRegistrationBuilderCollection regCollection, Collection registrations, BiConsumer> consumer) { - return regCollection.registrations().stream() - .map(reg -> toCompletable(reg, registrations, consumer)) - .toArray(size -> new CompletableFuture[size]); + CompletableFuture[] futures = + regCollection.registrations().stream() + .map(reg -> toCompletable(reg, registrations, consumer)) + .toArray(size -> new CompletableFuture[size]); + return regCollection.isAnd() + ? CompletableFuture.allOf(futures) + : CompletableFuture.anyOf(futures); } private CompletableFuture toCompletable( @@ -307,9 +305,7 @@ protected ListenExecutor(ListenExecutorBuilder builder) { super(builder); this.eventConsumer = builder.application.eventConsumer(); this.regBuilders = builder.registrations; - this.until = Optional.ofNullable(builder.until); this.loop = Optional.ofNullable(builder.loop); this.converter = builder.converter; - this.untilRegBuilders = builder.untilRegistrations; } } diff --git a/impl/core/src/test/java/io/serverlessworkflow/impl/EventDefinitionTest.java b/impl/core/src/test/java/io/serverlessworkflow/impl/EventDefinitionTest.java index 8dcba7ab..495632d0 100644 --- a/impl/core/src/test/java/io/serverlessworkflow/impl/EventDefinitionTest.java +++ b/impl/core/src/test/java/io/serverlessworkflow/impl/EventDefinitionTest.java @@ -77,6 +77,7 @@ void testUntilConsumed() throws IOException { emitOutDefinition.instance(Map.of()).start().join(); assertThat(future).isCompleted(); assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.COMPLETED); + assertThat(waitingInstance.outputAsJsonNode()).isEqualTo(temperature()); } private static Stream eventListenerParameters() { @@ -106,4 +107,11 @@ private static JsonNode doctor() { node.put("isSick", true); return mapper.createArrayNode().add(node); } + + private static JsonNode temperature() { + ObjectMapper mapper = JsonUtils.mapper(); + ObjectNode node = mapper.createObjectNode(); + node.put("temperature", 39); + return mapper.createArrayNode().add(node); + } }