Skip to content

Commit d9618fa

Browse files
committed
[Fix #722] Close executor service if created
Signed-off-by: fjtirado <[email protected]>
1 parent 9f7b967 commit d9618fa

File tree

6 files changed

+57
-21
lines changed

6 files changed

+57
-21
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/ExecutorServiceFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,6 @@
1818
import java.util.concurrent.ExecutorService;
1919
import java.util.function.Supplier;
2020

21-
@FunctionalInterface
22-
public interface ExecutorServiceFactory extends Supplier<ExecutorService> {}
21+
public interface ExecutorServiceFactory extends Supplier<ExecutorService>, AutoCloseable {
22+
void close();
23+
}

impl/core/src/main/java/io/serverlessworkflow/impl/DefaultExecutorServiceFactory.java renamed to impl/core/src/main/java/io/serverlessworkflow/impl/ExecutorServiceHolder.java

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,31 @@
1717

1818
import java.util.concurrent.ExecutorService;
1919
import java.util.concurrent.Executors;
20+
import java.util.concurrent.locks.Lock;
21+
import java.util.concurrent.locks.ReentrantLock;
2022

21-
public class DefaultExecutorServiceFactory implements ExecutorServiceFactory {
23+
public class ExecutorServiceHolder implements ExecutorServiceFactory {
2224

23-
private static final ExecutorServiceFactory instance = new DefaultExecutorServiceFactory();
24-
25-
public static ExecutorServiceFactory instance() {
26-
return instance;
27-
}
28-
29-
private static class ExecutorServiceHolder {
30-
private static ExecutorService instance = Executors.newCachedThreadPool();
31-
}
25+
private ExecutorService service;
26+
private Lock serviceLock = new ReentrantLock();
3227

3328
@Override
3429
public ExecutorService get() {
35-
return ExecutorServiceHolder.instance;
30+
try {
31+
serviceLock.lock();
32+
if (service == null) {
33+
service = Executors.newCachedThreadPool();
34+
}
35+
} finally {
36+
serviceLock.unlock();
37+
}
38+
return service;
3639
}
3740

38-
private DefaultExecutorServiceFactory() {}
41+
@Override
42+
public void close() {
43+
if (service != null) {
44+
service.shutdown();
45+
}
46+
}
3947
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,14 @@
3939
import java.util.ServiceLoader.Provider;
4040
import java.util.concurrent.ConcurrentHashMap;
4141
import java.util.concurrent.ExecutorService;
42-
import java.util.concurrent.Executors;
4342
import java.util.stream.Collectors;
43+
import org.slf4j.Logger;
44+
import org.slf4j.LoggerFactory;
4445

4546
public class WorkflowApplication implements AutoCloseable {
4647

48+
private static final Logger logger = LoggerFactory.getLogger(WorkflowApplication.class);
49+
4750
private final TaskExecutorFactory taskFactory;
4851
private final ExpressionFactory exprFactory;
4952
private final ResourceLoaderFactory resourceLoaderFactory;
@@ -137,7 +140,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
137140
private SchemaValidatorFactory schemaValidatorFactory;
138141
private WorkflowPositionFactory positionFactory = () -> new QueueWorkflowPosition();
139142
private WorkflowIdFactory idFactory = () -> UlidCreator.getMonotonicUlid().toString();
140-
private ExecutorServiceFactory executorFactory = () -> Executors.newCachedThreadPool();
143+
private ExecutorServiceFactory executorFactory = new ExecutorServiceHolder();
141144
private EventConsumer<?, ?> eventConsumer = InMemoryEvents.get();
142145
private EventPublisher eventPublisher = InMemoryEvents.get();
143146
private RuntimeDescriptorFactory descriptorFactory =
@@ -236,12 +239,23 @@ public WorkflowDefinition workflowDefinition(Workflow workflow) {
236239

237240
@Override
238241
public void close() {
242+
safeClose(executorFactory);
243+
safeClose(eventPublisher);
244+
safeClose(eventConsumer);
239245
for (WorkflowDefinition definition : definitions.values()) {
240-
definition.close();
246+
safeClose(definition);
241247
}
242248
definitions.clear();
243249
}
244250

251+
private void safeClose(AutoCloseable closeable) {
252+
try {
253+
closeable.close();
254+
} catch (Exception ex) {
255+
logger.warn("Error closing resource {}", closeable.getClass().getName(), ex);
256+
}
257+
}
258+
245259
public WorkflowPositionFactory positionFactory() {
246260
return positionFactory;
247261
}

impl/core/src/main/java/io/serverlessworkflow/impl/events/EventConsumer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121
import java.util.Collection;
2222
import java.util.function.Consumer;
2323

24-
public interface EventConsumer<T extends EventRegistration, V extends EventRegistrationBuilder> {
24+
public interface EventConsumer<T extends EventRegistration, V extends EventRegistrationBuilder>
25+
extends AutoCloseable {
2526

2627
V listen(EventFilter filter, WorkflowApplication workflowApplication);
2728

@@ -30,4 +31,6 @@ public interface EventConsumer<T extends EventRegistration, V extends EventRegis
3031
T register(V builder, Consumer<CloudEvent> consumer);
3132

3233
void unregister(T register);
34+
35+
void close();
3336
}

impl/core/src/main/java/io/serverlessworkflow/impl/events/EventPublisher.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import io.cloudevents.CloudEvent;
1919
import java.util.concurrent.CompletableFuture;
2020

21-
public interface EventPublisher {
21+
public interface EventPublisher extends AutoCloseable {
2222
CompletableFuture<Void> publish(CloudEvent event);
23+
24+
void close();
2325
}

impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
package io.serverlessworkflow.impl.events;
1717

1818
import io.cloudevents.CloudEvent;
19-
import io.serverlessworkflow.impl.DefaultExecutorServiceFactory;
19+
import io.serverlessworkflow.impl.ExecutorServiceFactory;
20+
import io.serverlessworkflow.impl.ExecutorServiceHolder;
2021
import java.util.Map;
2122
import java.util.concurrent.CompletableFuture;
2223
import java.util.concurrent.ConcurrentHashMap;
@@ -37,6 +38,8 @@ public static InMemoryEvents get() {
3738
return instance;
3839
}
3940

41+
private ExecutorServiceFactory serviceFactory = new ExecutorServiceHolder();
42+
4043
private Map<String, Consumer<CloudEvent>> topicMap = new ConcurrentHashMap<>();
4144

4245
private AtomicReference<Consumer<CloudEvent>> allConsumerRef = new AtomicReference<>();
@@ -64,7 +67,7 @@ public CompletableFuture<Void> publish(CloudEvent ce) {
6467
consumer.accept(ce);
6568
}
6669
},
67-
DefaultExecutorServiceFactory.instance().get());
70+
serviceFactory.get());
6871
}
6972

7073
@Override
@@ -76,4 +79,9 @@ protected void registerToAll(Consumer<CloudEvent> consumer) {
7679
protected void unregisterFromAll() {
7780
allConsumerRef.set(null);
7881
}
82+
83+
@Override
84+
public void close() {
85+
serviceFactory.close();
86+
}
7987
}

0 commit comments

Comments
 (0)