Skip to content

[Fix #722] Close executor service if created #723

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import dev.langchain4j.agentic.internal.AgentExecutor;
import dev.langchain4j.agentic.workflow.ParallelAgentService;
import io.serverlessworkflow.impl.ExecutorServiceHolder;
import java.util.List;
import java.util.concurrent.ExecutorService;

Expand All @@ -33,7 +34,7 @@ public static <T> ParallelAgentService<T> builder(Class<T> agentServiceClass) {

@Override
public ParallelAgentService<T> executorService(ExecutorService executorService) {
this.workflowExecBuilder.withExecutorFactory(() -> executorService);
this.workflowExecBuilder.withExecutorFactory(new ExecutorServiceHolder(executorService));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl;

import java.util.concurrent.ExecutorService;

public abstract class AbstractExecutorServiceHolder implements ExecutorServiceFactory {

protected ExecutorService service;

@Override
public void close() {
if (service != null && !service.isShutdown()) {
service.shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,23 @@

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class DefaultExecutorServiceFactory implements ExecutorServiceFactory {
public class DefaultExecutorServiceFactory extends AbstractExecutorServiceHolder {

private static final ExecutorServiceFactory instance = new DefaultExecutorServiceFactory();

public static ExecutorServiceFactory instance() {
return instance;
}

private static class ExecutorServiceHolder {
private static ExecutorService instance = Executors.newCachedThreadPool();
}
private Lock serviceLock = new ReentrantLock();

@Override
public ExecutorService get() {
return ExecutorServiceHolder.instance;
try {
serviceLock.lock();
if (service == null) {
service = Executors.newCachedThreadPool();
}
} finally {
serviceLock.unlock();
}
return service;
}

private DefaultExecutorServiceFactory() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

@FunctionalInterface
public interface ExecutorServiceFactory extends Supplier<ExecutorService> {}
public interface ExecutorServiceFactory extends Supplier<ExecutorService>, AutoCloseable {
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl;

import java.util.concurrent.ExecutorService;

public class ExecutorServiceHolder extends AbstractExecutorServiceHolder {

public ExecutorServiceHolder(ExecutorService service) {
this.service = service;
}

@Override
public ExecutorService get() {
return service;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,14 @@
import java.util.ServiceLoader.Provider;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkflowApplication implements AutoCloseable {

private static final Logger logger = LoggerFactory.getLogger(WorkflowApplication.class);

private final TaskExecutorFactory taskFactory;
private final ExpressionFactory exprFactory;
private final ResourceLoaderFactory resourceLoaderFactory;
Expand Down Expand Up @@ -137,7 +140,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
private SchemaValidatorFactory schemaValidatorFactory;
private WorkflowPositionFactory positionFactory = () -> new QueueWorkflowPosition();
private WorkflowIdFactory idFactory = () -> UlidCreator.getMonotonicUlid().toString();
private ExecutorServiceFactory executorFactory = () -> Executors.newCachedThreadPool();
private ExecutorServiceFactory executorFactory = new DefaultExecutorServiceFactory();
private EventConsumer<?, ?> eventConsumer = InMemoryEvents.get();
private EventPublisher eventPublisher = InMemoryEvents.get();
private RuntimeDescriptorFactory descriptorFactory =
Expand Down Expand Up @@ -236,12 +239,23 @@ public WorkflowDefinition workflowDefinition(Workflow workflow) {

@Override
public void close() {
safeClose(executorFactory);
safeClose(eventPublisher);
safeClose(eventConsumer);
for (WorkflowDefinition definition : definitions.values()) {
definition.close();
safeClose(definition);
}
definitions.clear();
}

private void safeClose(AutoCloseable closeable) {
try {
closeable.close();
} catch (Exception ex) {
logger.warn("Error closing resource {}", closeable.getClass().getName(), ex);
}
}

public WorkflowPositionFactory positionFactory() {
return positionFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
import java.util.Collection;
import java.util.function.Consumer;

public interface EventConsumer<T extends EventRegistration, V extends EventRegistrationBuilder> {
public interface EventConsumer<T extends EventRegistration, V extends EventRegistrationBuilder>
extends AutoCloseable {

V listen(EventFilter filter, WorkflowApplication workflowApplication);

Expand All @@ -30,4 +31,6 @@ public interface EventConsumer<T extends EventRegistration, V extends EventRegis
T register(V builder, Consumer<CloudEvent> consumer);

void unregister(T register);

void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import io.cloudevents.CloudEvent;
import java.util.concurrent.CompletableFuture;

public interface EventPublisher {
public interface EventPublisher extends AutoCloseable {
CompletableFuture<Void> publish(CloudEvent event);

void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.cloudevents.CloudEvent;
import io.serverlessworkflow.impl.DefaultExecutorServiceFactory;
import io.serverlessworkflow.impl.ExecutorServiceFactory;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -37,6 +38,8 @@ public static InMemoryEvents get() {
return instance;
}

private ExecutorServiceFactory serviceFactory = new DefaultExecutorServiceFactory();

private Map<String, Consumer<CloudEvent>> topicMap = new ConcurrentHashMap<>();

private AtomicReference<Consumer<CloudEvent>> allConsumerRef = new AtomicReference<>();
Expand Down Expand Up @@ -64,7 +67,7 @@ public CompletableFuture<Void> publish(CloudEvent ce) {
consumer.accept(ce);
}
},
DefaultExecutorServiceFactory.instance().get());
serviceFactory.get());
}

@Override
Expand All @@ -76,4 +79,9 @@ protected void registerToAll(Consumer<CloudEvent> consumer) {
protected void unregisterFromAll() {
allConsumerRef.set(null);
}

@Override
public void close() {
serviceFactory.close();
}
}