Skip to content

Commit 6ba65fa

Browse files
committed
[pinpoint-apm#8341] It is possible to specify the znode path of zookeeper used in cluster
1. Apply ZNode path config to ZookeeperClusterConfiguration 2. Support ZNode path config for collector and web module
1 parent d8ef3a7 commit 6ba65fa

File tree

20 files changed

+170
-59
lines changed

20 files changed

+170
-59
lines changed

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/flink/FlinkClusterService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public FlinkClusterService(FlinkConfiguration config, FlinkClusterConnectionMana
5252
this.config = Objects.requireNonNull(config, "config");
5353
this.serviceState = new CommonStateContext();
5454
this.clusterConnectionManager = Objects.requireNonNull(clusterConnectionManager, "clusterConnectionManager");
55-
this.pinpointFlinkClusterPath = config.getFlinkClusterZookeeperPath();
55+
this.pinpointFlinkClusterPath = config.getFlinkZNodePath();
5656
}
5757

5858
@PostConstruct

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/zookeeper/ZookeeperClusterService.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,13 @@
2828
import com.navercorp.pinpoint.collector.util.CollectorUtils;
2929
import com.navercorp.pinpoint.common.server.cluster.zookeeper.CuratorZookeeperClient;
3030
import com.navercorp.pinpoint.common.server.cluster.zookeeper.ZookeeperClient;
31-
import com.navercorp.pinpoint.common.server.cluster.zookeeper.ZookeeperConstants;
3231
import com.navercorp.pinpoint.common.server.cluster.zookeeper.ZookeeperEventWatcher;
3332
import com.navercorp.pinpoint.common.server.util.concurrent.CommonState;
3433
import com.navercorp.pinpoint.common.server.util.concurrent.CommonStateContext;
35-
3634
import com.navercorp.pinpoint.common.util.Assert;
35+
3736
import org.apache.commons.lang3.StringUtils;
37+
import org.apache.curator.utils.ZKPaths;
3838
import org.apache.zookeeper.WatchedEvent;
3939
import org.apache.zookeeper.Watcher.Event.EventType;
4040
import org.slf4j.Logger;
@@ -52,6 +52,8 @@ public class ZookeeperClusterService implements ClusterService {
5252
private final Logger logger = LoggerFactory.getLogger(this.getClass());
5353

5454
private final CollectorClusterConfig config;
55+
private final String webZNodePath;
56+
5557
private final ClusterPointRouter clusterPointRouter;
5658

5759
// represented as pid@hostname (identifiers may overlap for services hosted on localhost if pids are identical)
@@ -74,6 +76,8 @@ public ZookeeperClusterService(CollectorClusterConfig config, ClusterPointRouter
7476
this.config = Objects.requireNonNull(config, "config");
7577
Assert.isTrue(config.isClusterEnable(), "clusterEnable is false");
7678

79+
this.webZNodePath = Objects.requireNonNull(config.getWebZNodePath(), "webZNodePath");
80+
7781
this.clusterPointRouter = Objects.requireNonNull(clusterPointRouter, "clusterPointRouter");
7882

7983
CollectorClusterConnectionRepository clusterRepository = new CollectorClusterConnectionRepository();
@@ -110,10 +114,12 @@ public void setUp() throws IOException {
110114
this.client = new CuratorZookeeperClient(config.getClusterAddress(), config.getClusterSessionTimeout(), watcher);
111115
this.client.connect();
112116

113-
this.profilerClusterManager = new ZookeeperProfilerClusterManager(client, serverIdentifier, clusterPointRouter.getTargetClusterPointRepository());
117+
final String connectedAgentZNodePath = ZKPaths.makePath(config.getCollectorZNodePath(), serverIdentifier);
118+
119+
this.profilerClusterManager = new ZookeeperProfilerClusterManager(client, connectedAgentZNodePath, clusterPointRouter.getTargetClusterPointRepository());
114120
this.profilerClusterManager.start();
115121

116-
this.webClusterManager = new ZookeeperClusterManager(client, ZookeeperConstants.PINPOINT_WEB_CLUSTER_PATH, clusterConnectionManager);
122+
this.webClusterManager = new ZookeeperClusterManager(client, webZNodePath, clusterConnectionManager);
117123
this.webClusterManager.start();
118124

119125
this.serviceState.changeStateStarted();
@@ -196,7 +202,7 @@ public void process(WatchedEvent event) {
196202
if (eventType == EventType.NodeChildrenChanged) {
197203
String path = event.getPath();
198204

199-
if (ZookeeperConstants.PINPOINT_WEB_CLUSTER_PATH.equals(path)) {
205+
if (webZNodePath.equals(path)) {
200206
webClusterManager.handleAndRegisterWatcher(path);
201207
} else {
202208
logger.warn("Unknown Path ChildrenChanged {}.", path);
@@ -209,7 +215,7 @@ public void process(WatchedEvent event) {
209215
public boolean handleConnected() {
210216
if (serviceState.isStarted()) {
211217
profilerClusterManager.refresh();
212-
webClusterManager.handleAndRegisterWatcher(ZookeeperConstants.PINPOINT_WEB_CLUSTER_PATH);
218+
webClusterManager.handleAndRegisterWatcher(webZNodePath);
213219
return true;
214220
} else {
215221
return false;

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/zookeeper/ZookeeperJobWorker.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,26 +17,25 @@
1717
package com.navercorp.pinpoint.collector.cluster.zookeeper;
1818

1919
import com.navercorp.pinpoint.collector.cluster.zookeeper.job.ZookeeperJob;
20+
import com.navercorp.pinpoint.common.profiler.concurrent.PinpointThreadFactory;
2021
import com.navercorp.pinpoint.common.server.cluster.zookeeper.CreateNodeMessage;
2122
import com.navercorp.pinpoint.common.server.cluster.zookeeper.ZookeeperClient;
22-
import com.navercorp.pinpoint.common.server.cluster.zookeeper.ZookeeperConstants;
2323
import com.navercorp.pinpoint.common.server.cluster.zookeeper.exception.PinpointZookeeperException;
2424
import com.navercorp.pinpoint.common.server.util.concurrent.CommonStateContext;
2525
import com.navercorp.pinpoint.common.util.BytesUtils;
2626
import com.navercorp.pinpoint.common.util.CollectionUtils;
27-
import com.navercorp.pinpoint.common.profiler.concurrent.PinpointThreadFactory;
2827
import com.navercorp.pinpoint.rpc.util.ClassUtils;
2928
import com.navercorp.pinpoint.rpc.util.ListUtils;
3029

3130
import org.apache.commons.lang3.StringUtils;
32-
import org.apache.curator.utils.ZKPaths;
3331
import org.slf4j.Logger;
3432
import org.slf4j.LoggerFactory;
3533

3634
import java.util.ArrayList;
3735
import java.util.Arrays;
3836
import java.util.Collections;
3937
import java.util.List;
38+
import java.util.Objects;
4039
import java.util.concurrent.ConcurrentLinkedDeque;
4140
import java.util.concurrent.ThreadFactory;
4241

@@ -59,12 +58,12 @@ public class ZookeeperJobWorker implements Runnable {
5958
private final ConcurrentLinkedDeque<ZookeeperJob> zookeeperJobDeque = new ConcurrentLinkedDeque<>();
6059
private Thread workerThread;
6160

62-
public ZookeeperJobWorker(ZookeeperClient zookeeperClient, String serverIdentifier) {
61+
public ZookeeperJobWorker(ZookeeperClient zookeeperClient, String connectedAgentZNodePath) {
6362
this.zookeeperClient = zookeeperClient;
6463

6564
this.workerState = new CommonStateContext();
6665

67-
this.collectorUniqPath = ZKPaths.makePath(ZookeeperConstants.PINPOINT_COLLECTOR_CLUSTER_PATH, serverIdentifier);
66+
this.collectorUniqPath = Objects.requireNonNull(connectedAgentZNodePath, "connectedAgentZNodePath");
6867
}
6968

7069
public void start() {

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/zookeeper/ZookeeperProfilerClusterManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,10 @@ public class ZookeeperProfilerClusterManager implements ProfilerClusterManager {
4747

4848
// keep it simple - register on RUN, remove on FINISHED, skip otherwise
4949
// should only be instantiated when cluster is enabled.
50-
public ZookeeperProfilerClusterManager(ZookeeperClient client, String serverIdentifier, ClusterPointRepository<ClusterPoint<?>> profileCluster) {
50+
public ZookeeperProfilerClusterManager(ZookeeperClient client, String connectedAgentZNodePath, ClusterPointRepository<ClusterPoint<?>> profileCluster) {
5151
this.profileCluster = Objects.requireNonNull(profileCluster, "profileCluster");
5252

53-
this.worker = new ZookeeperJobWorker(client, serverIdentifier);
53+
this.worker = new ZookeeperJobWorker(client, connectedAgentZNodePath);
5454
}
5555

5656
@Override

collector/src/main/java/com/navercorp/pinpoint/collector/config/CollectorClusterConfig.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ public class CollectorClusterConfig {
4242
@Value("${cluster.listen.port:-1}")
4343
private int clusterListenPort;
4444

45-
4645
public boolean isClusterEnable() {
4746
return clusterConfiguration.isEnable();
4847
}
@@ -51,6 +50,14 @@ public String getClusterAddress() {
5150
return clusterConfiguration.getAddress();
5251
}
5352

53+
public String getWebZNodePath() {
54+
return clusterConfiguration.getWebZNodePath();
55+
}
56+
57+
public String getCollectorZNodePath() {
58+
return clusterConfiguration.getCollectorZNodePath();
59+
}
60+
5461
public int getClusterSessionTimeout() {
5562
return clusterConfiguration.getSessionTimeout();
5663
}
@@ -80,12 +87,15 @@ public void log() {
8087

8188
@Override
8289
public String toString() {
83-
return "ClusterConfig{" +
84-
"clusterEnable=" + isClusterEnable() +
85-
", clusterAddress='" + getClusterAddress() + '\'' +
86-
", clusterSessionTimeout=" + getClusterSessionTimeout() +
87-
", clusterListenIp='" + clusterListenIp + '\'' +
88-
", clusterListenPort=" + clusterListenPort +
89-
'}';
90+
final StringBuilder sb = new StringBuilder("CollectorClusterConfig{");
91+
sb.append("clusterEnable").append(isClusterEnable());
92+
sb.append(", clusterAddress='").append(getClusterAddress()).append('\'');
93+
sb.append(", webZNodePath='").append(getCollectorZNodePath()).append('\'');
94+
sb.append(", collectorZNodePath='").append(getWebZNodePath()).append('\'');
95+
sb.append(", clusterSessionTimeout=").append(getClusterSessionTimeout());
96+
sb.append(", clusterListenPort=").append(clusterListenPort);
97+
sb.append('}');
98+
return sb.toString();
9099
}
100+
91101
}

collector/src/main/java/com/navercorp/pinpoint/collector/config/FlinkConfiguration.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,19 +43,16 @@ public boolean isFlinkClusterEnable() {
4343
return clusterConfiguration.isEnable();
4444
}
4545

46-
@Value("${flink.cluster.zookeeper.path:}")
47-
protected String flinkClusterZookeeperPath;
48-
4946
public String getFlinkClusterZookeeperAddress() {
5047
return clusterConfiguration.getAddress();
5148
}
5249

53-
public int getFlinkClusterSessionTimeout() {
54-
return clusterConfiguration.getSessionTimeout();
50+
public String getFlinkZNodePath() {
51+
return clusterConfiguration.getFlinkZNodePath();
5552
}
5653

57-
public String getFlinkClusterZookeeperPath() {
58-
return flinkClusterZookeeperPath;
54+
public int getFlinkClusterSessionTimeout() {
55+
return clusterConfiguration.getSessionTimeout();
5956
}
6057

6158
@PostConstruct
@@ -73,7 +70,7 @@ public String toString() {
7370
return "FlinkConfiguration{" +
7471
"flinkClusterEnable=" + isFlinkClusterEnable() +
7572
", flinkClusterZookeeperAddress='" + getFlinkClusterZookeeperAddress() + '\'' +
76-
", flinkClusterZookeeperPath='" + flinkClusterZookeeperPath + '\'' +
73+
", flinkZNodePath='" + getFlinkZNodePath() + '\'' +
7774
", flinkClusterSessionTimeout=" + getFlinkClusterSessionTimeout() +
7875
'}';
7976
}

collector/src/main/resources/pinpoint-collector-root.properties

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ collector.statistics.agent-state.enable=true
119119
# In this case, you must set cluster.connect.address (pinpoint-web.properties); and cluster.listen.ip, cluster.listen.port (pinpoint-collector-root.properties) accordingly.
120120
cluster.enable=true
121121
cluster.zookeeper.address=${pinpoint.zookeeper.address}
122+
cluster.zookeeper.znode_root=/pinpoint-cluster
122123
cluster.zookeeper.sessiontimeout=30000
123124
cluster.listen.ip=
124125
cluster.listen.port=-1
@@ -141,5 +142,5 @@ collector.map-link.max.enable=true
141142
# Flink configuration
142143
flink.cluster.enable=false
143144
flink.cluster.zookeeper.address=${pinpoint.zookeeper.address}
144-
flink.cluster.zookeeper.path=/pinpoint-cluster/flink
145+
flink.cluster.zookeeper.znode_root=/pinpoint-cluster
145146
flink.cluster.zookeeper.sessiontimeout=3000

collector/src/test/java/com/navercorp/pinpoint/collector/cluster/zookeeper/ZookeeperJobWorkerTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import com.navercorp.pinpoint.test.utils.TestAwaitUtils;
3333

3434
import org.apache.curator.utils.ZKPaths;
35+
import org.apache.hadoop.mapreduce.ID;
3536
import org.junit.Assert;
3637
import org.junit.Test;
3738
import static org.mockito.Mockito.mock;
@@ -53,7 +54,7 @@ public class ZookeeperJobWorkerTest {
5354

5455
private static final String IDENTIFIER = "ZookeeperJobWorkerTest";
5556
private static final String PATH =
56-
ZKPaths.makePath(ZookeeperConstants.PINPOINT_COLLECTOR_CLUSTER_PATH, IDENTIFIER);
57+
ZKPaths.makePath(ZookeeperConstants.DEFAULT_CLUSTER_ZNODE_ROOT_PATH, ZookeeperConstants.COLLECTOR_LEAF_PATH, IDENTIFIER);
5758

5859
private final Logger logger = LoggerFactory.getLogger(this.getClass());
5960

@@ -65,7 +66,7 @@ public void test1() throws Exception {
6566
InMemoryZookeeperClient zookeeperClient = new InMemoryZookeeperClient(true);
6667
zookeeperClient.connect();
6768

68-
ZookeeperProfilerClusterManager manager = new ZookeeperProfilerClusterManager(zookeeperClient, IDENTIFIER, new ClusterPointRepository());
69+
ZookeeperProfilerClusterManager manager = new ZookeeperProfilerClusterManager(zookeeperClient, PATH, new ClusterPointRepository());
6970
manager.start();
7071

7172
ClusterPointStateChangedEventHandler clusterPointStateChangedEventHandler = new ClusterPointStateChangedEventHandler(channelPropertiesFactory, manager);
@@ -89,7 +90,7 @@ public void test2() throws Exception {
8990
InMemoryZookeeperClient zookeeperClient = new InMemoryZookeeperClient(true);
9091
zookeeperClient.connect();
9192

92-
ZookeeperProfilerClusterManager manager = new ZookeeperProfilerClusterManager(zookeeperClient, IDENTIFIER, new ClusterPointRepository());
93+
ZookeeperProfilerClusterManager manager = new ZookeeperProfilerClusterManager(zookeeperClient, PATH, new ClusterPointRepository());
9394
manager.start();
9495

9596
ClusterPointStateChangedEventHandler clusterPointStateChangedEventHandler = new ClusterPointStateChangedEventHandler(channelPropertiesFactory, manager);
@@ -113,7 +114,7 @@ public void test3() throws Exception {
113114
InMemoryZookeeperClient zookeeperClient = new InMemoryZookeeperClient(true);
114115
zookeeperClient.connect();
115116

116-
ZookeeperProfilerClusterManager manager = new ZookeeperProfilerClusterManager(zookeeperClient, IDENTIFIER, new ClusterPointRepository());
117+
ZookeeperProfilerClusterManager manager = new ZookeeperProfilerClusterManager(zookeeperClient, PATH, new ClusterPointRepository());
117118
manager.start();
118119

119120
ClusterPointStateChangedEventHandler clusterPointStateChangedEventHandler = new ClusterPointStateChangedEventHandler(channelPropertiesFactory, manager);
@@ -152,7 +153,7 @@ public void test4() throws Exception {
152153
InMemoryZookeeperClient zookeeperClient = new InMemoryZookeeperClient(true);
153154
zookeeperClient.connect();
154155

155-
ZookeeperProfilerClusterManager manager = new ZookeeperProfilerClusterManager(zookeeperClient, IDENTIFIER, new ClusterPointRepository());
156+
ZookeeperProfilerClusterManager manager = new ZookeeperProfilerClusterManager(zookeeperClient, PATH, new ClusterPointRepository());
156157
manager.start();
157158

158159
ClusterPointStateChangedEventHandler clusterPointStateChangedEventHandler = new ClusterPointStateChangedEventHandler(channelPropertiesFactory, manager);

collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/command/GrpcCommandServiceTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.navercorp.pinpoint.collector.cluster.zookeeper.ZookeeperProfilerClusterManager;
2323
import com.navercorp.pinpoint.collector.receiver.grpc.RecordedStreamObserver;
2424
import com.navercorp.pinpoint.collector.receiver.grpc.service.command.GrpcCommandService;
25+
import com.navercorp.pinpoint.common.server.cluster.zookeeper.ZookeeperConstants;
2526
import com.navercorp.pinpoint.common.trace.ServiceType;
2627
import com.navercorp.pinpoint.grpc.Header;
2728
import com.navercorp.pinpoint.grpc.server.DefaultTransportMetadata;
@@ -40,6 +41,7 @@
4041
import io.grpc.Context;
4142
import io.grpc.stub.ServerCallStreamObserver;
4243
import io.grpc.stub.StreamObserver;
44+
import org.apache.curator.utils.ZKPaths;
4345
import org.junit.Assert;
4446
import org.junit.Test;
4547
import org.mockito.Mockito;
@@ -140,7 +142,10 @@ private ZookeeperProfilerClusterManager creteMemoryClusterManager() throws IOExc
140142
InMemoryZookeeperClient zookeeperClient = new InMemoryZookeeperClient();
141143
zookeeperClient.connect();
142144

143-
ZookeeperProfilerClusterManager manager = new ZookeeperProfilerClusterManager(zookeeperClient, this.getClass().getSimpleName(), new ClusterPointRepository());
145+
String path
146+
= ZKPaths.makePath(ZookeeperConstants.DEFAULT_CLUSTER_ZNODE_ROOT_PATH, ZookeeperConstants.COLLECTOR_LEAF_PATH, this.getClass().getSimpleName());
147+
148+
ZookeeperProfilerClusterManager manager = new ZookeeperProfilerClusterManager(zookeeperClient, path, new ClusterPointRepository());
144149
manager.start();
145150
return manager;
146151
}

0 commit comments

Comments
 (0)