Skip to content

Commit 18a3ff8

Browse files
committed
user LeaderLatch instead of LeaderSelector
1 parent e237eee commit 18a3ff8

File tree

3 files changed

+51
-103
lines changed

3 files changed

+51
-103
lines changed

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 1 addition & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import com.alibaba.fluss.server.coordinator.event.watcher.CoordinatorServerChangeWatcher;
6363
import com.alibaba.fluss.server.coordinator.event.watcher.TableChangeWatcher;
6464
import com.alibaba.fluss.server.coordinator.event.watcher.TabletServerChangeWatcher;
65+
import com.alibaba.fluss.server.coordinator.statemachine.ReplicaState;
6566
import com.alibaba.fluss.server.coordinator.statemachine.ReplicaStateMachine;
6667
import com.alibaba.fluss.server.coordinator.statemachine.TableBucketStateMachine;
6768
import com.alibaba.fluss.server.entity.AdjustIsrResultForBucket;
@@ -139,14 +140,6 @@ public class CoordinatorEventProcessor implements EventProcessor {
139140

140141
private final CompletedSnapshotStoreManager completedSnapshotStoreManager;
141142

142-
// metrics
143-
private volatile int aliveCoordinatorServerCount;
144-
private volatile int tabletServerCount;
145-
private volatile int offlineBucketCount;
146-
private volatile int tableCount;
147-
private volatile int bucketCount;
148-
private volatile int replicasToDeleteCount;
149-
150143
public CoordinatorEventProcessor(
151144
ZooKeeperClient zooKeeperClient,
152145
CoordinatorMetadataCache serverMetadataCache,
@@ -204,20 +197,6 @@ public CoordinatorEventProcessor(
204197
this.lakeTableTieringManager = lakeTableTieringManager;
205198
this.coordinatorMetricGroup = coordinatorMetricGroup;
206199
this.internalListenerName = conf.getString(ConfigOptions.INTERNAL_LISTENER_NAME);
207-
registerMetrics();
208-
}
209-
210-
private void registerMetrics() {
211-
coordinatorMetricGroup.gauge(MetricNames.ACTIVE_COORDINATOR_COUNT, () -> 1);
212-
coordinatorMetricGroup.gauge(
213-
MetricNames.ALIVE_COORDINATOR_COUNT, () -> aliveCoordinatorServerCount);
214-
coordinatorMetricGroup.gauge(
215-
MetricNames.ACTIVE_TABLET_SERVER_COUNT, () -> tabletServerCount);
216-
coordinatorMetricGroup.gauge(MetricNames.OFFLINE_BUCKET_COUNT, () -> offlineBucketCount);
217-
coordinatorMetricGroup.gauge(MetricNames.BUCKET_COUNT, () -> bucketCount);
218-
coordinatorMetricGroup.gauge(MetricNames.TABLE_COUNT, () -> tableCount);
219-
coordinatorMetricGroup.gauge(
220-
MetricNames.REPLICAS_TO_DELETE_COUNT, () -> replicasToDeleteCount);
221200
}
222201

223202
public CoordinatorEventManager getCoordinatorEventManager() {
@@ -517,33 +496,6 @@ public void process(CoordinatorEvent event) {
517496
}
518497
}
519498

520-
private void updateMetrics() {
521-
aliveCoordinatorServerCount = coordinatorContext.getLiveCoordinatorServers().size();
522-
tabletServerCount = coordinatorContext.getLiveTabletServers().size();
523-
tableCount = coordinatorContext.allTables().size();
524-
bucketCount = coordinatorContext.bucketLeaderAndIsr().size();
525-
offlineBucketCount = coordinatorContext.getOfflineBucketCount();
526-
527-
int replicasToDeletes = 0;
528-
// for replica in partitions to be deleted
529-
for (TablePartition tablePartition : coordinatorContext.getPartitionsToBeDeleted()) {
530-
for (TableBucketReplica replica :
531-
coordinatorContext.getAllReplicasForPartition(
532-
tablePartition.getTableId(), tablePartition.getPartitionId())) {
533-
replicasToDeletes =
534-
isReplicaToDelete(replica) ? replicasToDeletes + 1 : replicasToDeletes;
535-
}
536-
}
537-
// for replica in tables to be deleted
538-
for (long tableId : coordinatorContext.getTablesToBeDeleted()) {
539-
for (TableBucketReplica replica : coordinatorContext.getAllReplicasForTable(tableId)) {
540-
replicasToDeletes =
541-
isReplicaToDelete(replica) ? replicasToDeletes + 1 : replicasToDeletes;
542-
}
543-
}
544-
this.replicasToDeleteCount = replicasToDeletes;
545-
}
546-
547499
private boolean isReplicaToDelete(TableBucketReplica replica) {
548500
ReplicaState replicaState = coordinatorContext.getReplicaState(replica);
549501
return replicaState != null && replicaState != ReplicaDeletionSuccessful;

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorLeaderElection.java

Lines changed: 47 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -20,78 +20,71 @@
2020

2121
import com.alibaba.fluss.server.zk.data.ZkData;
2222
import com.alibaba.fluss.shaded.curator5.org.apache.curator.framework.CuratorFramework;
23-
import com.alibaba.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderSelector;
24-
import com.alibaba.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
25-
import com.alibaba.fluss.shaded.curator5.org.apache.curator.framework.state.ConnectionState;
26-
import com.alibaba.fluss.utils.concurrent.ExecutorThreadFactory;
23+
import com.alibaba.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch;
24+
import com.alibaba.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatchListener;
2725

2826
import org.slf4j.Logger;
2927
import org.slf4j.LoggerFactory;
3028

31-
import java.util.concurrent.Executors;
32-
import java.util.concurrent.ScheduledExecutorService;
33-
import java.util.concurrent.TimeUnit;
29+
import java.io.IOException;
30+
import java.util.concurrent.atomic.AtomicBoolean;
3431

3532
/** Using by coordinator server. Coordinator servers listen ZK node and elect leadership. */
36-
public class CoordinatorLeaderElection {
33+
public class CoordinatorLeaderElection implements AutoCloseable {
3734
private static final Logger LOG = LoggerFactory.getLogger(CoordinatorLeaderElection.class);
3835

39-
private final CuratorFramework zkClient;
4036
private final int serverId;
41-
private final ScheduledExecutorService executor;
37+
private final LeaderLatch leaderLatch;
38+
private final AtomicBoolean isLeader = new AtomicBoolean(false);
4239

4340
public CoordinatorLeaderElection(CuratorFramework zkClient, int serverId) {
44-
this(
45-
zkClient,
46-
serverId,
47-
Executors.newSingleThreadScheduledExecutor(
48-
new ExecutorThreadFactory("fluss-coordinator-leader-election")));
49-
}
50-
51-
protected CoordinatorLeaderElection(
52-
CuratorFramework zkClient, int serverId, ScheduledExecutorService executor) {
53-
this.zkClient = zkClient;
5441
this.serverId = serverId;
55-
this.executor = executor;
42+
this.leaderLatch =
43+
new LeaderLatch(
44+
zkClient, ZkData.CoordinatorElectionZNode.path(), String.valueOf(serverId));
5645
}
5746

5847
public void startElectLeader(Runnable initLeaderServices) {
59-
executor.schedule(() -> electLeader(initLeaderServices), 0, TimeUnit.MILLISECONDS);
60-
}
48+
leaderLatch.addListener(
49+
new LeaderLatchListener() {
50+
@Override
51+
public void isLeader() {
52+
LOG.info("Coordinator server {} has become the leader.", serverId);
53+
isLeader.set(true);
54+
}
55+
56+
@Override
57+
public void notLeader() {
58+
LOG.warn("Coordinator server {} has lost the leadership.", serverId);
59+
isLeader.set(false);
60+
}
61+
});
6162

62-
private void electLeader(Runnable initLeaderServices) {
63-
LeaderSelector leaderSelector =
64-
new LeaderSelector(
65-
zkClient,
66-
ZkData.CoordinatorElectionZNode.path(),
67-
new LeaderSelectorListener() {
68-
@Override
69-
public void takeLeadership(CuratorFramework client) {
70-
LOG.info(
71-
"Coordinator server {} win the leader in election now.",
72-
serverId);
73-
initLeaderServices.run();
63+
try {
64+
leaderLatch.start();
65+
LOG.info("Coordinator server {} started leader election.", serverId);
7466

75-
// Do not return, otherwise the leader will be released immediately.
76-
while (true) {
77-
try {
78-
Thread.sleep(1000);
79-
} catch (InterruptedException e) {
80-
}
81-
}
82-
}
67+
// todo: Currently, we await the leader latch and do nothing until it becomes leader.
68+
// Later we can make it as a hot backup server to continuously synchronize metadata from
69+
// Zookeeper, which save time from initializing context
70+
leaderLatch.await();
71+
initLeaderServices.run();
8372

84-
@Override
85-
public void stateChanged(
86-
CuratorFramework client, ConnectionState newState) {
87-
if (newState == ConnectionState.LOST) {
88-
LOG.info("Coordinator leader {} lost connection", serverId);
89-
}
90-
}
91-
});
73+
} catch (Exception e) {
74+
LOG.error("Failed to start LeaderLatch for server {}", serverId, e);
75+
throw new RuntimeException("Leader election start failed", e);
76+
}
77+
}
78+
79+
@Override
80+
public void close() throws IOException {
81+
LOG.info("Closing LeaderLatch for server {}.", serverId);
82+
if (leaderLatch != null) {
83+
leaderLatch.close();
84+
}
85+
}
9286

93-
// allow reelection
94-
leaderSelector.autoRequeue();
95-
leaderSelector.start();
87+
public boolean isLeader() {
88+
return this.isLeader.get();
9689
}
9790
}

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/event/CoordinatorEventManager.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public final class CoordinatorEventManager implements EventManager {
6262
private Histogram eventQueueTime;
6363

6464
// Coordinator metrics moved from CoordinatorEventProcessor
65+
private volatile int aliveCoordinatorServerCount;
6566
private volatile int tabletServerCount;
6667
private volatile int offlineBucketCount;
6768
private volatile int tableCount;
@@ -93,6 +94,8 @@ private void registerMetrics() {
9394

9495
// Register coordinator metrics
9596
coordinatorMetricGroup.gauge(MetricNames.ACTIVE_COORDINATOR_COUNT, () -> 1);
97+
coordinatorMetricGroup.gauge(
98+
MetricNames.ALIVE_COORDINATOR_COUNT, () -> aliveCoordinatorServerCount);
9699
coordinatorMetricGroup.gauge(
97100
MetricNames.ACTIVE_TABLET_SERVER_COUNT, () -> tabletServerCount);
98101
coordinatorMetricGroup.gauge(MetricNames.OFFLINE_BUCKET_COUNT, () -> offlineBucketCount);

0 commit comments

Comments
 (0)