-
Notifications
You must be signed in to change notification settings - Fork 0
KAFKA-17259: Support to override serverProperties and restart cluster in ClusterTestExtensions #20
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -58,11 +58,13 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import java.util.Collection; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import java.util.Collections; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import java.util.HashMap; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import java.util.HashSet; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import java.util.List; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import java.util.Map; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import java.util.Map.Entry; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import java.util.Optional; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import java.util.Properties; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import java.util.Set; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import java.util.concurrent.CompletableFuture; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import java.util.concurrent.ExecutionException; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import java.util.concurrent.ExecutorService; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -314,8 +316,9 @@ private static void setupNodeDirectories(File baseDirectory, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private final Map<Integer, BrokerServer> brokers; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private final File baseDirectory; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private final SimpleFaultHandlerFactory faultHandlerFactory; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private final PreboundSocketFactoryManager socketFactoryManager; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private PreboundSocketFactoryManager socketFactoryManager; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private final String controllerListenerName; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private Map<Integer, Set<String>> nodeIdToListeners = new HashMap<>(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private KafkaClusterTestKit( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
TestKitNodes nodes, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -437,6 +440,130 @@ public void startup() throws ExecutionException, InterruptedException { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
public void shutdown() throws Exception { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
List<Entry<String, Future<?>>> futureEntries = new ArrayList<>(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Note the shutdown order here is chosen to be consistent with | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// `KafkaRaftServer`. See comments in that class for an explanation. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
int brokerId = entry.getKey(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
BrokerServer broker = entry.getValue(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Set<String> listeners = nodeIdToListeners.get(brokerId); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (!broker.socketServer().controlPlaneAcceptorOpt().isEmpty()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
listeners.add(broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" + | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" + | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
broker.socketServer().controlPlaneAcceptorOpt().get().localPort()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+451
to
+460
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider extracting this logic for collecting listeners into a separate, well-named method to improve readability and maintainability. This would also reduce code duplication, as the same logic is repeated for brokers and controllers. private Set<String> collectListeners(SharedServer server, int nodeId) {
Set<String> listeners = nodeIdToListeners.computeIfAbsent(nodeId, __ -> new HashSet<>());
server.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!server.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(server.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
server.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
server.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
return listeners;
}
// Usage in shutdown method:
int brokerId = entry.getKey();
BrokerServer broker = entry.getValue();
Set<String> listeners = collectListeners(broker.sharedServer(), brokerId);
nodeIdToListeners.put(brokerId, listeners); |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
nodeIdToListeners.put(brokerId, listeners); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
futureEntries.add(new SimpleImmutableEntry<>("broker" + brokerId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
executorService.submit((Runnable) broker::shutdown))); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
waitForAllFutures(futureEntries); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
futureEntries.clear(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
int controllerId = entry.getKey(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ControllerServer controller = entry.getValue(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
nodeIdToListeners.computeIfAbsent(controllerId, __ -> new HashSet<>()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Set<String> listeners = nodeIdToListeners.get(controllerId); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
controller.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (!controller.socketServer().controlPlaneAcceptorOpt().isEmpty()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
listeners.add(controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" + | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" + | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
controller.socketServer().controlPlaneAcceptorOpt().get().localPort()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
nodeIdToListeners.put(controllerId, listeners); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
futureEntries.add(new SimpleImmutableEntry<>("controller" + controllerId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+452
to
+481
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Duplicated Listener CollectionIdentical listener collection logic is duplicated for both brokers and controllers. This violates DRY principle and increases maintenance burden when modifying listener collection logic.
Suggested change
Standards
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
executorService.submit(controller::shutdown))); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
waitForAllFutures(futureEntries); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
futureEntries.clear(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
socketFactoryManager.close(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+443
to
+486
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Either bail out early when the servers were never started, or initialise - nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
+ nodeIdToListeners
+ .computeIfAbsent(brokerId, __ ->
+ new HashSet<>(List.of(broker.config()
+ .originals()
+ .getOrDefault(SocketServerConfigs.LISTENERS_CONFIG, "")
+ .toString().split(",")))); A similar fix is needed for the controller loop below. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Resource Leak RisksocketFactoryManager.close() called without error handling. If close() throws an exception, the shutdown process terminates prematurely, potentially leaving resources unclosed and causing resource leaks.
Suggested change
Standards
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} catch (Exception e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for (Entry<String, Future<?>> entry : futureEntries) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
entry.getValue().cancel(true); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
throw e; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
public void restart(Map<Integer, Map<String, Object>> perServerOverriddenConfig) throws Exception { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's crucial to ensure that the public void restart(Map<Integer, Map<String, Object>> perServerOverriddenConfig) throws Exception {
try {
shutdown();
} catch (Exception e) {
log.error("Exception during shutdown: {}", e.getMessage(), e);
throw e; // Re-throw the exception to prevent restart
} |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
shutdown(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+495
to
+497
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Resource Cleanup RiskIf shutdown() throws an exception, the restart method will abort without completing initialization of new servers. This creates a state where the system is partially shut down but not restarted, potentially causing test failures.
Suggested change
Standards
Comment on lines
+496
to
+497
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unvalidated Configuration OverrideConfiguration overrides are applied without validation, allowing potentially insecure settings. An attacker could inject malicious configurations that compromise security controls or expose sensitive information.
Suggested change
Standards
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Map<Integer, SharedServer> jointServers = new HashMap<>(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+496
to
+498
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Resource Leak RiskThe restart() method creates a new socketFactoryManager without properly closing the previous one. If an exception occurs during restart, the old socketFactoryManager resources may leak. The shutdown() method closes it but exception paths don't ensure closure.
Suggested change
Standards
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
socketFactoryManager = new PreboundSocketFactoryManager(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reinitializing if (socketFactoryManager != null) {
try {
socketFactoryManager.close();
} catch (Exception e) {
log.warn("Exception while closing socketFactoryManager: {}", e.getMessage(), e);
}
}
socketFactoryManager = new PreboundSocketFactoryManager(); |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
controllers.forEach((id, controller) -> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Map<String, Object> config = controller.config().originals(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
config.putAll(perServerOverriddenConfig.getOrDefault(-1, Collections.emptyMap())); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
config.putAll(perServerOverriddenConfig.getOrDefault(id, Collections.emptyMap())); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+502
to
+504
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Duplicate Null CheckMissing null check for perServerConfigOverrides parameter in controller configuration. If null is passed to restart(), a NullPointerException will occur when attempting to call getOrDefault(), breaking cluster restart functionality.
Suggested change
Standards
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", nodeIdToListeners.get(id))); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This line uses Set<String> listenersForNode = nodeIdToListeners.get(id);
if (listenersForNode == null || listenersForNode.isEmpty()) {
log.warn("No listeners found for node {}", id);
// Handle the case where there are no listeners, possibly by using a default listener
}
config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", listenersForNode != null ? listenersForNode : Collections.emptySet())); |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+495
to
+506
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Port reservations are lost on restart – risk of “Address already in use”
Consider feeding the cached listener URIs back into the new manager before constructing servers, e.g.: socketFactoryManager = new PreboundSocketFactoryManager();
nodeIdToListeners.forEach((id, listeners) ->
listeners.forEach(l -> socketFactoryManager.reserve(id, l))); (or expose a helper in |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
TestKitNode node = nodes.controllerNodes().get(id); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
KafkaConfig nodeConfig = new KafkaConfig(config, false); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
SharedServer sharedServer = new SharedServer( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
nodeConfig, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
node.initialMetaPropertiesEnsemble(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Time.SYSTEM, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
new Metrics(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(nodeConfig.quorumConfig().voters())), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Collections.emptyList(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
faultHandlerFactory, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
socketFactoryManager.getOrCreateSocketFactory(node.id()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
controller = new ControllerServer( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
sharedServer, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
KafkaRaftServer.configSchema(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
nodes.bootstrapMetadata()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} catch (Throwable e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
log.error("Error creating controller {}", node.id(), e); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Utils.swallow(log, Level.WARN, "sharedServer.stopForController error", sharedServer::stopForController); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
throw e; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+524
to
+527
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The error handling here uses } catch (Throwable e) {
log.error("Error creating controller {}", node.id(), e);
try {
sharedServer.stopForController();
} catch (Throwable e2) {
log.warn("sharedServer.stopForController error", e2);
}
throw new RuntimeException("Error creating controller " + node.id(), e);
} |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
controllers.put(node.id(), controller); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
jointServers.put(node.id(), sharedServer); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
brokers.forEach((id, broker) -> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Map<String, Object> config = broker.config().originals(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
config.putAll(perServerOverriddenConfig.getOrDefault(-1, Collections.emptyMap())); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+533
to
+535
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing Null CheckMissing null check for perServerConfigOverrides parameter. If null is passed to restart(), a NullPointerException will occur when attempting to call getOrDefault(), breaking cluster restart functionality.
Suggested change
Standards
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
config.putAll(perServerOverriddenConfig.getOrDefault(id, Collections.emptyMap())); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", nodeIdToListeners.get(id))); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
TestKitNode node = nodes.brokerNodes().get(id); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
KafkaConfig nodeConfig = new KafkaConfig(config); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
SharedServer sharedServer = jointServers.computeIfAbsent( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
node.id(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+533
to
+542
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mutating the live config map may have side-effects
Use a defensive copy before modifications: - Map<String, Object> config = broker.config().originals();
+ Map<String, Object> config = new HashMap<>(broker.config().originals()); Apply the same change to the controller block above. 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
nodeId -> new SharedServer( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
nodeConfig, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
node.initialMetaPropertiesEnsemble(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Time.SYSTEM, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
new Metrics(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(nodeConfig.quorumConfig().voters())), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Collections.emptyList(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
faultHandlerFactory, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
socketFactoryManager.getOrCreateSocketFactory(node.id()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
broker = new BrokerServer(sharedServer); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} catch (Throwable e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
log.error("Error creating broker {}", node.id(), e); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Utils.swallow(log, Level.WARN, "sharedServer.stopForBroker error", sharedServer::stopForBroker); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+524
to
+558
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unchecked Exception HandlingRaw exceptions are rethrown without wrapping in both controller and broker creation. This loses context about the failure source and can lead to confusing error messages during test failures.
Suggested change
Standards
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
throw e; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+556
to
+559
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to the controller creation, the error handling here uses } catch (Throwable e) {
log.error("Error creating broker {}", node.id(), e);
try {
sharedServer.stopForBroker();
} catch (Throwable e2) {
log.warn("sharedServer.stopForBroker error", e2);
}
throw new RuntimeException("Error creating broker " + node.id(), e);
} |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
brokers.put(node.id(), broker); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
startup(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* Wait for a controller to mark all the brokers as ready (registered and unfenced). | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* And also wait for the metadata cache up-to-date in each broker server. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -193,6 +193,11 @@ public void stop() { | |||||||||||||||||||||||
} | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
@Override | ||||||||||||||||||||||||
public void restart(Map<Integer, Map<String, Object>> perServerConfigOverrides) throws Exception { | ||||||||||||||||||||||||
clusterTestKit.restart(perServerConfigOverrides); | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
Comment on lines
+196
to
+199
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Guard against restarting an instance that has never been started If a test calls Add a fast-fail guard: @@
@Override
public void restart(Map<Integer, Map<String, Object>> perServerConfigOverrides) throws Exception {
- clusterTestKit.restart(perServerConfigOverrides);
+ if (!started.get()) {
+ throw new IllegalStateException("Cannot restart a cluster that has not been started");
+ }
+ clusterTestKit.restart(perServerConfigOverrides);
} 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||
|
||||||||||||||||||||||||
@Override | ||||||||||||||||||||||||
public void shutdownBroker(int brokerId) { | ||||||||||||||||||||||||
findBrokerOrThrow(brokerId).shutdown(); | ||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Resource Leak Risk
Multiple HashSet allocations occur when nodeIdToListeners.get() is called immediately after computeIfAbsent. This creates unnecessary object allocation and increases GC pressure during cluster restarts.
Standards