Skip to content

Commit a18a200

Browse files
Use a singleton for DynamoDbMasterMonitor (#737)
We encountered an issue where the MasterMonitor's `shutdown` method was called somewhere. This led to clients relying on updates to the MasterMonitor stopping receiving updates but they had no mechanism of knowing that the monitor they were subscribed to was no longer updating because the observable was not completed and getting the latest version would continue to return the last known value (even though that value was not updated). We were deciding between either using a Singleton or tracking down `shutdown` calls to `MasterMonitor` and ensuring that usages could resubscribe if necessary. The latter seemed like it would be prone to breakage in the future (e.g. particularly with the master observable). Additionally, a Singleton pattern seemed fitting given that almost every reasonable use case of Mantis relies on a sustained connection to the Mantis Master. So there's no reason to expect that shutting down the executor pool for updating the master is necessary. Additionally, using a Singleton avoids any unbounded resource leakage (e.g. it is still safe to create N MasterMonitors).
1 parent 419c15f commit a18a200

File tree

3 files changed

+225
-166
lines changed

3 files changed

+225
-166
lines changed

mantis-control-plane/mantis-control-plane-dynamodb/src/main/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitor.java

Lines changed: 6 additions & 154 deletions
Original file line numberDiff line numberDiff line change
@@ -15,180 +15,33 @@
1515
*/
1616
package io.mantisrx.extensions.dynamodb;
1717

18-
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
19-
import com.amazonaws.services.dynamodbv2.LockItem;
20-
import io.mantisrx.common.metrics.Counter;
21-
import io.mantisrx.common.metrics.Metrics;
22-
import io.mantisrx.common.metrics.MetricsRegistry;
2318
import io.mantisrx.server.core.BaseService;
24-
import io.mantisrx.server.core.json.DefaultObjectMapper;
2519
import io.mantisrx.server.core.master.MasterDescription;
2620
import io.mantisrx.server.core.master.MasterMonitor;
27-
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper;
28-
import java.io.IOException;
29-
import java.nio.ByteBuffer;
30-
import java.time.Duration;
31-
import java.util.Optional;
32-
import java.util.concurrent.Executors;
33-
import java.util.concurrent.ScheduledExecutorService;
34-
import java.util.concurrent.ThreadFactory;
35-
import java.util.concurrent.TimeUnit;
36-
import javax.annotation.Nullable;
3721
import lombok.extern.slf4j.Slf4j;
38-
import org.slf4j.Logger;
39-
import org.slf4j.LoggerFactory;
4022
import rx.Observable;
41-
import rx.subjects.BehaviorSubject;
4223

4324

4425
@Slf4j
4526
public class DynamoDBMasterMonitor extends BaseService implements MasterMonitor {
46-
47-
private static final Logger logger = LoggerFactory.getLogger(DynamoDBMasterMonitor.class);
48-
49-
50-
private final ThreadFactory monitorThreadFactory = r -> {
51-
Thread thread = new Thread(r);
52-
thread.setName("dynamodb-monitor-" + System.currentTimeMillis());
53-
thread.setDaemon(true); // allow JVM to shutdown if monitor is still running
54-
thread.setPriority(Thread.NORM_PRIORITY);
55-
thread.setUncaughtExceptionHandler((t, e) -> logger.error("thread: {} failed with {}", t.getName(), e.getMessage(), e) );
56-
return thread;
57-
};
58-
private final ScheduledExecutorService leaderMonitor =
59-
Executors.newScheduledThreadPool(1, monitorThreadFactory);
60-
61-
// Assuming your lock client's options are in a variable named options
62-
private final AmazonDynamoDBLockClient lockClient;
63-
64-
private final String partitionKey;
65-
66-
private final Duration pollInterval;
67-
68-
private final Duration gracefulShutdown;
69-
70-
private final BehaviorSubject<MasterDescription> masterSubject;
71-
72-
private final ObjectMapper jsonMapper = DefaultObjectMapper.getInstance();
73-
74-
private final Metrics metrics;
75-
76-
private final Counter noLockPresentCounter;
77-
private final Counter lockDecodeFailedCounter;
78-
private final Counter nullNextLeaderCounter;
27+
private final DynamoDBMasterMonitorSingleton singleton;
7928

8029
/**
8130
* Creates a MasterMonitor backed by DynamoDB. This should be used if you are using a {@link DynamoDBLeaderElector}
8231
*/
8332
public DynamoDBMasterMonitor() {
84-
this(DynamoDBClientSingleton.getLockClient(),
85-
DynamoDBClientSingleton.getPartitionKey(),
86-
Duration.parse(DynamoDBClientSingleton.getDynamoDBConf().getDynamoDBLeaderHeartbeatDuration()),
87-
Duration.parse(DynamoDBClientSingleton.getDynamoDBConf().getDynamoDBMonitorGracefulShutdownDuration()));
88-
}
89-
90-
public DynamoDBMasterMonitor(
91-
AmazonDynamoDBLockClient lockClient,
92-
String partitionKey,
93-
Duration pollInterval,
94-
Duration gracefulShutdown) {
95-
masterSubject = BehaviorSubject.create(MasterDescription.MASTER_NULL);
96-
this.lockClient = lockClient;
97-
this.partitionKey = partitionKey;
98-
this.pollInterval = pollInterval;
99-
this.gracefulShutdown = gracefulShutdown;
100-
101-
Metrics m = new Metrics.Builder()
102-
.id("DynamoDBMasterMonitor")
103-
.addCounter("no_lock_present")
104-
.addCounter("lock_decode_failed")
105-
.addCounter("null_next_leader")
106-
.build();
107-
this.metrics = MetricsRegistry.getInstance().registerAndGet(m);
108-
109-
this.noLockPresentCounter = metrics.getCounter("no_lock_present");
110-
this.lockDecodeFailedCounter = metrics.getCounter("lock_decode_failed");
111-
this.nullNextLeaderCounter = metrics.getCounter("null_next_leader");
33+
this.singleton = DynamoDBMasterMonitorSingleton.getInstance();
11234
}
11335

11436
@Override
115-
@SuppressWarnings("FutureReturnValueIgnored")
116-
public void start() {
117-
leaderMonitor.scheduleAtFixedRate(
118-
this::getCurrentLeader, 0, pollInterval.toMillis(), TimeUnit.MILLISECONDS);
119-
}
37+
public void start() {}
12038

12139
@Override
122-
public void shutdown() {
123-
logger.info("close the lock client");
124-
try {
125-
lockClient.close();
126-
} catch (IOException e) {
127-
logger.error("error closing the dynamodb lock client", e);
128-
}
129-
130-
try {
131-
final boolean isTerminated =
132-
leaderMonitor.awaitTermination(gracefulShutdown.toMillis(), TimeUnit.MILLISECONDS);
133-
if (!isTerminated) {
134-
leaderMonitor.shutdownNow();
135-
}
136-
} catch (InterruptedException e) {
137-
logger.error("error timeout waiting on leader monitor to terminate executor", e);
138-
}
139-
logger.info("leader monitor shutdown");
140-
}
141-
142-
@SuppressWarnings("FutureReturnValueIgnored")
143-
private void getCurrentLeader() {
144-
logger.info("attempting leader lookup");
145-
final Optional<LockItem> optionalLock = lockClient.getLock(partitionKey, Optional.empty());
146-
final MasterDescription nextDescription;
147-
if (optionalLock.isPresent()) {
148-
final LockItem lock = optionalLock.get();
149-
nextDescription = lock.getData().map(this::bytesToMaster).orElse(null);
150-
} else {
151-
nextDescription = null;
152-
logger.warn("no leader found");
153-
this.noLockPresentCounter.increment();
154-
}
155-
156-
if (nextDescription != null) {
157-
updateLeader(nextDescription);
158-
} else {
159-
this.nullNextLeaderCounter.increment();
160-
}
161-
}
162-
163-
private void updateLeader(@Nullable MasterDescription nextDescription) {
164-
final MasterDescription prev = Optional.ofNullable(masterSubject.getValue()).orElse(MasterDescription.MASTER_NULL);
165-
final MasterDescription next = (nextDescription == null) ? MasterDescription.MASTER_NULL : nextDescription;
166-
if (!prev.equals(next)) {
167-
logger.info("leader changer information previous {} and next {}", prev.getHostname(), next.getHostname());
168-
masterSubject.onNext(next);
169-
}
170-
}
171-
172-
private MasterDescription bytesToMaster(ByteBuffer data) {
173-
// It is possible that the underlying buffer is read more than once,
174-
// so if the offset of the buffer is at the end, rewind, so we can read it.
175-
if (!data.hasRemaining()) {
176-
data.rewind();
177-
}
178-
final byte[] bytes = new byte[data.remaining()];
179-
data.get(bytes);
180-
try {
181-
return jsonMapper.readValue(bytes, MasterDescription.class);
182-
} catch (IOException e) {
183-
logger.error("unable to parse master description bytes: {}", data, e);
184-
this.lockDecodeFailedCounter.increment();
185-
}
186-
return MasterDescription.MASTER_NULL;
187-
}
40+
public void shutdown() {}
18841

18942
@Override
19043
public Observable<MasterDescription> getMasterObservable() {
191-
return masterSubject;
44+
return singleton.getMasterSubject();
19245
}
19346

19447
/**
@@ -198,8 +51,7 @@ public Observable<MasterDescription> getMasterObservable() {
19851
* @return Latest description of the master
19952
*/
20053
@Override
201-
@Nullable
20254
public MasterDescription getLatestMaster() {
203-
return Optional.ofNullable(masterSubject.getValue()).orElse(MasterDescription.MASTER_NULL);
55+
return singleton.getMasterSubject().getValue();
20456
}
20557
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
/*
2+
* Copyright 2024 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.mantisrx.extensions.dynamodb;
18+
19+
20+
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
21+
import com.amazonaws.services.dynamodbv2.LockItem;
22+
import io.mantisrx.common.metrics.Counter;
23+
import io.mantisrx.common.metrics.Metrics;
24+
import io.mantisrx.common.metrics.MetricsRegistry;
25+
import io.mantisrx.server.core.json.DefaultObjectMapper;
26+
import io.mantisrx.server.core.master.MasterDescription;
27+
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper;
28+
import java.io.IOException;
29+
import java.nio.ByteBuffer;
30+
import java.time.Duration;
31+
import java.util.Optional;
32+
import java.util.concurrent.Executors;
33+
import java.util.concurrent.ScheduledExecutorService;
34+
import java.util.concurrent.ThreadFactory;
35+
import java.util.concurrent.TimeUnit;
36+
import org.slf4j.Logger;
37+
import org.slf4j.LoggerFactory;
38+
import rx.subjects.BehaviorSubject;
39+
40+
class DynamoDBMasterMonitorSingleton {
41+
private static final Logger logger = LoggerFactory.getLogger(DynamoDBMasterMonitor.class);
42+
43+
44+
private final ThreadFactory monitorThreadFactory = r -> {
45+
Thread thread = new Thread(r);
46+
thread.setName("dynamodb-monitor-" + System.currentTimeMillis());
47+
thread.setDaemon(true); // allow JVM to shut down if monitor is still running
48+
thread.setPriority(Thread.NORM_PRIORITY);
49+
thread.setUncaughtExceptionHandler((t, e) -> logger.error("thread: {} failed with {}", t.getName(), e.getMessage(), e) );
50+
return thread;
51+
};
52+
private final ScheduledExecutorService leaderMonitor =
53+
Executors.newScheduledThreadPool(1, monitorThreadFactory);
54+
55+
// Assuming your lock client's options are in a variable named options
56+
private final AmazonDynamoDBLockClient lockClient;
57+
58+
private final String partitionKey;
59+
60+
private final Duration gracefulShutdown;
61+
62+
private final BehaviorSubject<MasterDescription> masterSubject;
63+
64+
private final ObjectMapper jsonMapper = DefaultObjectMapper.getInstance();
65+
66+
private final Duration pollInterval;
67+
68+
private final Counter noLockPresentCounter;
69+
private final Counter lockDecodeFailedCounter;
70+
private final Counter nullNextLeaderCounter;
71+
private final Counter leaderChangedCounter;
72+
private final Counter refreshedLeaderCounter;
73+
74+
private static volatile DynamoDBMasterMonitorSingleton instance = null;
75+
76+
public static synchronized DynamoDBMasterMonitorSingleton getInstance() {
77+
if (instance == null) {
78+
instance = new DynamoDBMasterMonitorSingleton();
79+
Runtime.getRuntime()
80+
.addShutdownHook(new Thread(instance::shutdown, "dynamodb-monitor-shutdown-" + instance.hashCode()));
81+
instance.start();
82+
}
83+
return instance;
84+
}
85+
86+
/**
87+
* Creates a MasterMonitor backed by DynamoDB. This should be used if you are using a {@link DynamoDBLeaderElector}
88+
*/
89+
DynamoDBMasterMonitorSingleton() {
90+
this(DynamoDBClientSingleton.getLockClient(),
91+
DynamoDBClientSingleton.getPartitionKey(),
92+
Duration.parse(DynamoDBClientSingleton.getDynamoDBConf().getDynamoDBLeaderHeartbeatDuration()),
93+
Duration.parse(DynamoDBClientSingleton.getDynamoDBConf().getDynamoDBMonitorGracefulShutdownDuration()));
94+
}
95+
96+
DynamoDBMasterMonitorSingleton(
97+
AmazonDynamoDBLockClient lockClient,
98+
String partitionKey,
99+
Duration pollInterval,
100+
Duration gracefulShutdown) {
101+
masterSubject = BehaviorSubject.create(MasterDescription.MASTER_NULL);
102+
this.lockClient = lockClient;
103+
this.partitionKey = partitionKey;
104+
this.pollInterval = pollInterval;
105+
this.gracefulShutdown = gracefulShutdown;
106+
107+
Metrics m = new Metrics.Builder()
108+
.id("DynamoDBMasterMonitor")
109+
.addCounter("no_lock_present")
110+
.addCounter("lock_decode_failed")
111+
.addCounter("null_next_leader")
112+
.addCounter("refreshed_leader")
113+
.addCounter("leader_changed")
114+
.build();
115+
Metrics metrics = MetricsRegistry.getInstance().registerAndGet(m);
116+
117+
this.noLockPresentCounter = metrics.getCounter("no_lock_present");
118+
this.lockDecodeFailedCounter = metrics.getCounter("lock_decode_failed");
119+
this.nullNextLeaderCounter = metrics.getCounter("null_next_leader");
120+
this.refreshedLeaderCounter = metrics.getCounter("refreshed_leader");
121+
this.leaderChangedCounter = metrics.getCounter("leader_changed");
122+
}
123+
124+
public void start() {
125+
logger.info("starting leader monitor");
126+
leaderMonitor.scheduleAtFixedRate(
127+
this::getCurrentLeader, 0, pollInterval.toMillis(), TimeUnit.MILLISECONDS);
128+
}
129+
130+
void shutdown() {
131+
logger.info("close the lock client");
132+
try {
133+
lockClient.close();
134+
} catch (IOException e) {
135+
logger.error("error closing the dynamodb lock client", e);
136+
}
137+
138+
try {
139+
final boolean isTerminated =
140+
leaderMonitor.awaitTermination(gracefulShutdown.toMillis(), TimeUnit.MILLISECONDS);
141+
if (!isTerminated) {
142+
leaderMonitor.shutdownNow();
143+
}
144+
} catch (InterruptedException e) {
145+
logger.error("error timeout waiting on leader monitor to terminate executor", e);
146+
}
147+
masterSubject.onCompleted();
148+
logger.info("leader monitor shutdown");
149+
}
150+
151+
@SuppressWarnings("FutureReturnValueIgnored")
152+
private void getCurrentLeader() {
153+
logger.info("attempting leader lookup");
154+
final Optional<LockItem> optionalLock = lockClient.getLock(partitionKey, Optional.empty());
155+
final MasterDescription nextDescription;
156+
if (optionalLock.isPresent()) {
157+
final LockItem lock = optionalLock.get();
158+
nextDescription = lock.getData().map(this::bytesToMaster).orElse(null);
159+
} else {
160+
nextDescription = null;
161+
logger.warn("no leader found");
162+
this.noLockPresentCounter.increment();
163+
}
164+
165+
if (nextDescription != null) {
166+
updateLeader(nextDescription);
167+
} else {
168+
this.nullNextLeaderCounter.increment();
169+
}
170+
}
171+
172+
private void updateLeader(MasterDescription nextDescription) {
173+
this.refreshedLeaderCounter.increment();
174+
final MasterDescription prev = Optional.ofNullable(masterSubject.getValue()).orElse(MasterDescription.MASTER_NULL);
175+
if (!prev.equals(nextDescription)) {
176+
this.leaderChangedCounter.increment();
177+
logger.info("leader changer information previous {} and next {}", prev.getHostname(), nextDescription.getHostname());
178+
masterSubject.onNext(nextDescription);
179+
}
180+
}
181+
182+
private MasterDescription bytesToMaster(ByteBuffer data) {
183+
// It is possible that the underlying buffer is read more than once,
184+
// so if the offset of the buffer is at the end, rewind, so we can read it.
185+
if (!data.hasRemaining()) {
186+
data.rewind();
187+
}
188+
final byte[] bytes = new byte[data.remaining()];
189+
data.get(bytes);
190+
try {
191+
return jsonMapper.readValue(bytes, MasterDescription.class);
192+
} catch (IOException e) {
193+
logger.error("unable to parse master description bytes: {}", data, e);
194+
this.lockDecodeFailedCounter.increment();
195+
}
196+
return MasterDescription.MASTER_NULL;
197+
}
198+
199+
BehaviorSubject<MasterDescription> getMasterSubject() {
200+
return masterSubject;
201+
}
202+
}

0 commit comments

Comments
 (0)