From d6c44910054f75e50e4b3538dc44fe208d83f765 Mon Sep 17 00:00:00 2001 From: fjtirado Date: Wed, 20 Aug 2025 15:30:24 +0200 Subject: [PATCH] [Fix #722] Close executor service if created Signed-off-by: fjtirado --- .../langchain4j/ParallelAgentServiceImpl.java | 3 +- .../impl/AbstractExecutorServiceHolder.java | 30 +++++++++++++++++++ .../impl/DefaultExecutorServiceFactory.java | 26 ++++++++-------- .../impl/ExecutorServiceFactory.java | 5 ++-- .../impl/ExecutorServiceHolder.java | 30 +++++++++++++++++++ .../impl/WorkflowApplication.java | 20 +++++++++++-- .../impl/events/EventConsumer.java | 5 +++- .../impl/events/EventPublisher.java | 4 ++- .../impl/events/InMemoryEvents.java | 10 ++++++- 9 files changed, 111 insertions(+), 22 deletions(-) create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/AbstractExecutorServiceHolder.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/ExecutorServiceHolder.java diff --git a/fluent/agentic-langchain4j/src/main/java/io/serverlessworkflow/fluent/agentic/langchain4j/ParallelAgentServiceImpl.java b/fluent/agentic-langchain4j/src/main/java/io/serverlessworkflow/fluent/agentic/langchain4j/ParallelAgentServiceImpl.java index 54c1247d..eaaaf67a 100644 --- a/fluent/agentic-langchain4j/src/main/java/io/serverlessworkflow/fluent/agentic/langchain4j/ParallelAgentServiceImpl.java +++ b/fluent/agentic-langchain4j/src/main/java/io/serverlessworkflow/fluent/agentic/langchain4j/ParallelAgentServiceImpl.java @@ -17,6 +17,7 @@ import dev.langchain4j.agentic.internal.AgentExecutor; import dev.langchain4j.agentic.workflow.ParallelAgentService; +import io.serverlessworkflow.impl.ExecutorServiceHolder; import java.util.List; import java.util.concurrent.ExecutorService; @@ -33,7 +34,7 @@ public static ParallelAgentService builder(Class agentServiceClass) { @Override public ParallelAgentService executorService(ExecutorService executorService) { - this.workflowExecBuilder.withExecutorFactory(() -> executorService); + this.workflowExecBuilder.withExecutorFactory(new ExecutorServiceHolder(executorService)); return this; } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/AbstractExecutorServiceHolder.java b/impl/core/src/main/java/io/serverlessworkflow/impl/AbstractExecutorServiceHolder.java new file mode 100644 index 00000000..f32d8e94 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/AbstractExecutorServiceHolder.java @@ -0,0 +1,30 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +import java.util.concurrent.ExecutorService; + +public abstract class AbstractExecutorServiceHolder implements ExecutorServiceFactory { + + protected ExecutorService service; + + @Override + public void close() { + if (service != null && !service.isShutdown()) { + service.shutdown(); + } + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/DefaultExecutorServiceFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/DefaultExecutorServiceFactory.java index 1ac1f759..7710c43f 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/DefaultExecutorServiceFactory.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/DefaultExecutorServiceFactory.java @@ -17,23 +17,23 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; -public class DefaultExecutorServiceFactory implements ExecutorServiceFactory { +public class DefaultExecutorServiceFactory extends AbstractExecutorServiceHolder { - private static final ExecutorServiceFactory instance = new DefaultExecutorServiceFactory(); - - public static ExecutorServiceFactory instance() { - return instance; - } - - private static class ExecutorServiceHolder { - private static ExecutorService instance = Executors.newCachedThreadPool(); - } + private Lock serviceLock = new ReentrantLock(); @Override public ExecutorService get() { - return ExecutorServiceHolder.instance; + try { + serviceLock.lock(); + if (service == null) { + service = Executors.newCachedThreadPool(); + } + } finally { + serviceLock.unlock(); + } + return service; } - - private DefaultExecutorServiceFactory() {} } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/ExecutorServiceFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/ExecutorServiceFactory.java index 7c211149..2b831037 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/ExecutorServiceFactory.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/ExecutorServiceFactory.java @@ -18,5 +18,6 @@ import java.util.concurrent.ExecutorService; import java.util.function.Supplier; -@FunctionalInterface -public interface ExecutorServiceFactory extends Supplier {} +public interface ExecutorServiceFactory extends Supplier, AutoCloseable { + void close(); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/ExecutorServiceHolder.java b/impl/core/src/main/java/io/serverlessworkflow/impl/ExecutorServiceHolder.java new file mode 100644 index 00000000..23eb9c69 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/ExecutorServiceHolder.java @@ -0,0 +1,30 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +import java.util.concurrent.ExecutorService; + +public class ExecutorServiceHolder extends AbstractExecutorServiceHolder { + + public ExecutorServiceHolder(ExecutorService service) { + this.service = service; + } + + @Override + public ExecutorService get() { + return service; + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java index 96a2d47e..a89c798e 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java @@ -39,11 +39,14 @@ import java.util.ServiceLoader.Provider; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class WorkflowApplication implements AutoCloseable { + private static final Logger logger = LoggerFactory.getLogger(WorkflowApplication.class); + private final TaskExecutorFactory taskFactory; private final ExpressionFactory exprFactory; private final ResourceLoaderFactory resourceLoaderFactory; @@ -137,7 +140,7 @@ public SchemaValidator getValidator(SchemaInline inline) { private SchemaValidatorFactory schemaValidatorFactory; private WorkflowPositionFactory positionFactory = () -> new QueueWorkflowPosition(); private WorkflowIdFactory idFactory = () -> UlidCreator.getMonotonicUlid().toString(); - private ExecutorServiceFactory executorFactory = () -> Executors.newCachedThreadPool(); + private ExecutorServiceFactory executorFactory = new DefaultExecutorServiceFactory(); private EventConsumer eventConsumer = InMemoryEvents.get(); private EventPublisher eventPublisher = InMemoryEvents.get(); private RuntimeDescriptorFactory descriptorFactory = @@ -236,12 +239,23 @@ public WorkflowDefinition workflowDefinition(Workflow workflow) { @Override public void close() { + safeClose(executorFactory); + safeClose(eventPublisher); + safeClose(eventConsumer); for (WorkflowDefinition definition : definitions.values()) { - definition.close(); + safeClose(definition); } definitions.clear(); } + private void safeClose(AutoCloseable closeable) { + try { + closeable.close(); + } catch (Exception ex) { + logger.warn("Error closing resource {}", closeable.getClass().getName(), ex); + } + } + public WorkflowPositionFactory positionFactory() { return positionFactory; } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventConsumer.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventConsumer.java index 00c1619e..c08b3d2d 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventConsumer.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventConsumer.java @@ -21,7 +21,8 @@ import java.util.Collection; import java.util.function.Consumer; -public interface EventConsumer { +public interface EventConsumer + extends AutoCloseable { V listen(EventFilter filter, WorkflowApplication workflowApplication); @@ -30,4 +31,6 @@ public interface EventConsumer consumer); void unregister(T register); + + void close(); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventPublisher.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventPublisher.java index 08cc121d..5ac724ce 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventPublisher.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventPublisher.java @@ -18,6 +18,8 @@ import io.cloudevents.CloudEvent; import java.util.concurrent.CompletableFuture; -public interface EventPublisher { +public interface EventPublisher extends AutoCloseable { CompletableFuture publish(CloudEvent event); + + void close(); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java index edd2dad7..02f1b9cf 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java @@ -17,6 +17,7 @@ import io.cloudevents.CloudEvent; import io.serverlessworkflow.impl.DefaultExecutorServiceFactory; +import io.serverlessworkflow.impl.ExecutorServiceFactory; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -37,6 +38,8 @@ public static InMemoryEvents get() { return instance; } + private ExecutorServiceFactory serviceFactory = new DefaultExecutorServiceFactory(); + private Map> topicMap = new ConcurrentHashMap<>(); private AtomicReference> allConsumerRef = new AtomicReference<>(); @@ -64,7 +67,7 @@ public CompletableFuture publish(CloudEvent ce) { consumer.accept(ce); } }, - DefaultExecutorServiceFactory.instance().get()); + serviceFactory.get()); } @Override @@ -76,4 +79,9 @@ protected void registerToAll(Consumer consumer) { protected void unregisterFromAll() { allConsumerRef.set(null); } + + @Override + public void close() { + serviceFactory.close(); + } }