-
Notifications
You must be signed in to change notification settings - Fork 0
KAFKA-18894: Add KIP-877 support for ConfigProvider #23
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
WalkthroughRefactors config provider handling to use Plugin wrappers across clients and connect modules. Updates ConfigTransformer API and call sites, adjusts provider loading (Plugins.newConfigProvider) to return wrapped plugins with metrics support, updates resource cleanup, and adds/updates tests including a new MonitorableConfigProvider test utility and metrics assertions. Changes
Sequence Diagram(s)sequenceDiagram
participant C as Caller (Worker/AbstractConfig/MirrorMakerConfig)
participant PLS as Plugins
participant Pl as Plugin<ConfigProvider>
participant CP as ConfigProvider
participant CT as ConfigTransformer
C->>PLS: newConfigProvider(config, providerName, classLoaderUsage, metrics)
PLS->>PLS: load class, instantiate, configure
PLS->>Pl: wrapInstance(provider, metrics, {provider})
PLS-->>C: Plugin<ConfigProvider>
C->>CT: new ConfigTransformer(Map<String, Plugin<ConfigProvider>>)
C->>CT: transform(data, ...)
CT->>Pl: get provider plugin by name
CT->>CP: plugin.get().get(path, keys)
CP-->>CT: ConfigData
CT-->>C: Transformed result
sequenceDiagram
participant W as Worker
participant PLS as Plugins
participant Pl as Plugin<ConfigProvider>
participant WCT as WorkerConfigTransformer
participant CT as ConfigTransformer
W->>PLS: newConfigProvider(..., providerName, ..., metrics)
PLS-->>W: Plugin<ConfigProvider>
W->>WCT: construct with Map<name, Plugin<ConfigProvider>>
WCT->>CT: construct with same map
WCT->>CT: transform(...)
CT->>Pl: resolve provider
Pl-->>CT: underlying ConfigProvider
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 ast-grep (0.38.6)connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Summary of Changes
Hello @arvi18, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request implements KIP-877 support for Kafka's ConfigProvider
interface, enhancing observability by enabling metric reporting for config provider plugins. It introduces a Plugin
wrapper for ConfigProvider
instances, standardizing their lifecycle management and providing a hook for metric integration. This change necessitates updates across various core and Connect modules to adapt to the new Plugin
-wrapped ConfigProvider
objects, ensuring that config providers can now expose valuable operational metrics.
Highlights
- Introduction of Plugin Wrapper for ConfigProviders: The core change involves wrapping
ConfigProvider
instances within a newPlugin
class. ThisPlugin
wrapper now manages the lifecycle and provides a standardized mechanism for integrating features like metric reporting (KIP-877) forConfigProvider
implementations. - Enabling Metric Support for ConfigProviders: The
ConfigProvider
interface's Javadoc has been updated to explicitly state that implementations can now implement theMonitorable
interface to expose metrics. This change is crucial for enabling KIP-877 support, allowing operators to gain deeper insights into the behavior of their config providers. - Refactoring of Plugin Instantiation Logic: The
Plugins.newConfigProvider
method, responsible for instantiating config providers, has been refactored. It now returns aPlugin<ConfigProvider>
and accepts aMetrics
object, which is then passed to the wrappedConfigProvider
if it implementsMonitorable
, facilitating metric registration. - Widespread Adaption of New Plugin Type: Numerous internal classes such as
AbstractConfig
,ConfigTransformer
,MirrorMakerConfig
,Worker
, andWorkerConfigTransformer
have been updated to correctly handle the newPlugin<ConfigProvider>
type, ensuring seamless integration and proper resource management (e.g., closing plugins). - Comprehensive Test Coverage for New Functionality: New and updated test cases, including the addition of
MonitorableConfigProvider
andCustomMonitorableConfigProvider
, have been introduced to validate the correct behavior of metric registration and the overall integration of thePlugin
wrapper forConfigProvider
instances.
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command>
or @gemini-code-assist <command>
. Below is a summary of the supported commands.
Feature | Command | Description |
---|---|---|
Code Review | /gemini review |
Performs a code review for the current pull request in its current state. |
Pull Request Summary | /gemini summary |
Provides a summary of the current pull request in its current state. |
Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in issue comments and review comments. |
Help | /gemini help |
Displays a list of available commands. |
Customization
To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/
folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.
You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Footnotes
-
Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution. ↩
KAFKA-18894: Add KIP-877 support for ConfigProviderTL;DR: Adds monitoring capabilities to ConfigProvider implementations by supporting the Monitorable interface for metrics collection. Refacto PR SummaryEnhances ConfigProvider to support metrics collection through the Monitorable interface implementation. Change HighlightsClick to expand
Sequence DiagramsequenceDiagram
participant App as Application
participant Config as AbstractConfig
participant Transformer as ConfigTransformer
participant Plugin as Plugin<ConfigProvider>
participant Provider as ConfigProvider
participant Metrics as Metrics
App->>Config: instantiateConfigProviders()
Config->>Provider: create instance
Config->>Provider: configure(properties)
Config->>Plugin: wrapInstance(provider, metrics)
Plugin->>Provider: withPluginMetrics() if Monitorable
Config->>Transformer: new ConfigTransformer(providerPlugins)
App->>Transformer: transform(configs)
Transformer->>Plugin: get().get(path, keys)
Provider->>Metrics: report metrics
Testing GuideClick to expand
|
/refacto-test |
Refacto is reviewing this PR. Please wait for the review comments to be posted. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request successfully adds metrics support for ConfigProvider
as per KIP-877. The core of the change is wrapping ConfigProvider
instances in a Plugin
object, which manages metrics registration. This is applied consistently, for instance in AbstractConfig
and more robustly in the Kafka Connect Worker
where a Metrics
context is available. The changes are well-tested, with the addition of MonitorableConfigProvider
and related test cases to validate the new functionality. The implementation is clean and effective. I have one minor suggestion to improve the consistency of metric tagging.
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
Show resolved
Hide resolved
Code Review: ConfigProvider Plugin Integration👍 Well Done
📌 Files Processed
📝 Additional Comments
|
ConfigTransformerResult result = configTransformer.transform(indirectVariables); | ||
if (!result.data().isEmpty()) { | ||
resolvedOriginals.putAll(result.data()); | ||
} | ||
} | ||
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider")); | ||
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Plugin Closure Risk
The code attempts to close ConfigProvider instances directly, but now needs to close Plugin instances. This mismatch could cause resource leaks when providers aren't properly closed.
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); | |
providerPlugins.values().forEach(x -> Utils.closeQuietly(x.get(), "config provider")); |
Standards
- ISO-IEC-25010-Reliability-Recoverability
- ISO-IEC-25010-Functional-Correctness-Appropriateness
- DbC-Resource-Mgmt
- SRE-Resource-Management
@@ -97,6 +98,6 @@ private void scheduleReload(String connectorName, String path, long ttl) { | |||
|
|||
@Override | |||
public void close() { | |||
configProviders.values().forEach(x -> Utils.closeQuietly(x, "config provider")); | |||
configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inconsistent Provider Closing
The code closes Plugin objects directly instead of accessing the underlying ConfigProvider instances. This inconsistency with other fixes could lead to resource leaks if Plugin.get() isn't called.
configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); | |
configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x.get(), "config provider")); |
Standards
- ISO-IEC-25010-Reliability-Recoverability
- ISO-IEC-25010-Functional-Correctness-Consistency
- DbC-Resource-Mgmt
- SRE-Resource-Management
for (Map.Entry<String, Set<String>> pathWithKeys : keysByPath.entrySet()) { | ||
String path = pathWithKeys.getKey(); | ||
Set<String> keys = new HashSet<>(pathWithKeys.getValue()); | ||
ConfigData configData = provider.get(path, keys); | ||
ConfigData configData = providerPlugin.get().get(path, keys); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing Null Check
No null check on providerPlugin.get() before invoking get(path, keys). If the plugin returns null, this will cause a NullPointerException and service disruption.
ConfigData configData = providerPlugin.get().get(path, keys); | |
ConfigProvider provider = providerPlugin.get(); | |
if (provider == null) { | |
throw new ConfigException("ConfigProvider plugin returned null provider for " + providerName); | |
} | |
ConfigData configData = provider.get(path, keys); |
Standards
- ISO-IEC-25010-Reliability-Fault-Tolerance
- ISO-IEC-25010-Functional-Correctness-Appropriateness
- DbC-Precondition
- SRE-Defensive-Programming
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider( | ||
config, | ||
providerName, | ||
ClassLoaderUsage.PLUGINS, | ||
metrics.metrics() | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Resource Leak Risk
The plugin instance is created but not properly closed if an exception occurs during initialization. If any provider fails to initialize, previous successfully initialized providers won't be closed, causing resource leaks.
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider( | |
config, | |
providerName, | |
ClassLoaderUsage.PLUGINS, | |
metrics.metrics() | |
); | |
Map<String, Plugin<ConfigProvider>> providerPluginMap = new HashMap<>(); | |
List<Plugin<ConfigProvider>> initializedPlugins = new ArrayList<>(); | |
try { | |
for (String providerName : providerNames) { | |
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider( | |
config, | |
providerName, | |
ClassLoaderUsage.PLUGINS, | |
metrics.metrics() | |
); | |
initializedPlugins.add(configProviderPlugin); | |
providerPluginMap.put(providerName, configProviderPlugin); | |
} | |
return new WorkerConfigTransformer(this, providerPluginMap); | |
} catch (Exception e) { | |
initializedPlugins.forEach(plugin -> Utils.closeQuietly(plugin, "config provider plugin")); | |
throw e; | |
} |
Standards
- Resource-Management-Completeness
- Logic-Verification-Error-Handling
- Algorithm-Correctness-Exception-Safety
@@ -643,7 +646,7 @@ public ConfigProvider newConfigProvider(AbstractConfig config, String providerPr | |||
try (LoaderSwap loaderSwap = safeLoaderSwapper().apply(plugin.getClass().getClassLoader())) { | |||
plugin.configure(configProviderConfig); | |||
} | |||
return plugin; | |||
return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Resource Leak Risk
ConfigProvider resource is wrapped but not closed if configuration fails. If configure() throws exception, the plugin instance leaks resources as close() is never called.
return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName)); | |
try { | |
plugin.configure(configProviderConfig); | |
return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName)); | |
} catch (Exception e) { | |
Utils.closeQuietly(plugin, "config provider"); | |
throw e; | |
} |
Standards
- CWE-772
- OWASP-A06
@@ -56,15 +57,15 @@ public class ConfigTransformer { | |||
public static final Pattern DEFAULT_PATTERN = Pattern.compile("\\$\\{([^}]*?):(([^}]*?):)?([^}]*?)\\}"); | |||
private static final String EMPTY_PATH = ""; | |||
|
|||
private final Map<String, ConfigProvider> configProviders; | |||
private final Map<String, Plugin<ConfigProvider>> configProviderPlugins; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Abstraction Leakage
Implementation detail (Plugin wrapper) leaks into ConfigTransformer API. This creates unnecessary coupling between ConfigTransformer and Plugin implementation, making future changes to plugin architecture more difficult.
private final Map<String, Plugin<ConfigProvider>> configProviderPlugins; | |
private final Map<String, ConfigProvider> configProviders; | |
public ConfigTransformer(Map<String, Plugin<ConfigProvider>> configProviderPlugins) { | |
Map<String, ConfigProvider> providers = new HashMap<>(); | |
configProviderPlugins.forEach((name, plugin) -> providers.put(name, plugin.get())); | |
this.configProviders = providers; | |
} |
Standards
- SOLID-DIP
- Clean-Code-Abstraction
- Design-Pattern-Facade
Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG); | ||
configProviderPluginInstances.put(entry.getKey(), providerPlugin); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inefficient Map Creation
Creating a provider map without specifying initial capacity causes rehashing overhead. With multiple providers, this leads to unnecessary memory allocations and CPU overhead.
Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG); | |
configProviderPluginInstances.put(entry.getKey(), providerPlugin); | |
Map<String, Plugin<ConfigProvider>> configProviderPluginInstances = new HashMap<>(providerMap.size()); | |
for (Map.Entry<String, String> entry : providerMap.entrySet()) { |
Standards
- ISO-IEC-25010-Performance-Resource-Utilization
- Algorithm-Opt-Memory-Allocation
- Google-Performance-Best-Practices
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🔭 Outside diff range comments (2)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (1)
632-650
: Close partially-initialized providers on configure failure to avoid leaksIf
plugin.configure(...)
throws, the created provider instance will not be wrapped or closed, potentially leaking resources (ConfigProvider is Closeable). Wrap the configure call and close the provider on failure.Apply this diff:
try (LoaderSwap loaderSwap = safeLoaderSwapper().apply(plugin.getClass().getClassLoader())) { - plugin.configure(configProviderConfig); + try { + plugin.configure(configProviderConfig); + } catch (RuntimeException e) { + // Avoid leaking a partially-initialized provider + Utils.closeQuietly(plugin, "config provider plugin"); + throw e; + } } return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (1)
598-640
: Mirror leak-proof configure semantics for providers instantiated hereJust like in Plugins.newConfigProvider, if
provider.configure(...)
throws, the instantiated provider is not closed, which can leak resources (providers are Closeable). Close on failure.Apply this diff:
- ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class); - provider.configure(configProperties); - Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG); + ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class); + try { + provider.configure(configProperties); + } catch (RuntimeException e) { + Utils.closeQuietly(provider, "config provider plugin"); + throw e; + } + Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG); configProviderPluginInstances.put(entry.getKey(), providerPlugin);
🧹 Nitpick comments (6)
clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java (1)
32-35
: Clarify when metrics and tags are actually registeredMetrics are only attached when the provider implements Monitorable and is wrapped with non-null Metrics. The "provider" tag is guaranteed when loaded via Connect’s Plugins.newConfigProvider; in other paths (e.g., clients code), only "config" and "class" may be present or metrics may not be registered at all. Suggest tightening the Javadoc to avoid overpromising.
- * <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the config provider to register metrics. - * The following tags are automatically added to all metrics registered: <code>config</code> set to - * <code>config.providers</code>, <code>class</code> set to the ConfigProvider class name, - * and <code>provider</code> set to the provider name. + * <p>To enable metrics for a config provider, implement {@link org.apache.kafka.common.metrics.Monitorable} + * and ensure the provider is loaded via a plugin wrapper with a non-null {@code Metrics} instance + * (for example, via Kafka Connect’s {@code Plugins.newConfigProvider}). + * When metrics are enabled, the following tags are added: + * <code>config</code> (typically {@code config.providers}), <code>class</code> (the provider’s simple class name), + * and, when loaded via Connect, <code>provider</code> (the configured provider name).connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (1)
48-52
: Prevent accidental external mutation of provider mapMake a defensive, unmodifiable copy to avoid surprises if the caller mutates the map after construction.
- this.configProviderPlugins = configProviderPlugins; + this.configProviderPlugins = Map.copyOf(configProviderPlugins);connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (1)
150-158
: Minor NPE safety in test providerUse constant-first equals to avoid potential NPEs if path were null in future edits.
- if (path.equals(TEST_PATH)) { + if (TEST_PATH.equals(path)) {clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
29-58
: Consider returning empty ConfigData instead of null in get() methodsReturning null from
get(...)
can force callers/tests to add null checks. Returning an emptyConfigData
would be safer for general reuse as a test utility.connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (1)
2904-2927
: Stabilize the test by asserting provider instantiation (and optionally starting the worker)This test assumes the worker constructs both providers during initialization. To harden it:
- Verify both providers were requested from
plugins
with the correct names.- Optionally call
worker.start()
to guarantee provider initialization if future refactors defer it.Apply this diff to assert interactions:
worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); Metrics metrics = worker.metrics().metrics(); + // Verify both providers were instantiated via Plugins + verify(plugins).newConfigProvider(any(AbstractConfig.class), eq("monitorable"), any(ClassLoaderUsage.class), any(Metrics.class)); + verify(plugins).newConfigProvider(any(AbstractConfig.class), eq("monitorable2"), any(ClassLoaderUsage.class), any(Metrics.class));Optionally, if acceptable for the suite, call
worker.start()
andworker.stop()
around the assertions to ensure deterministic provider setup and teardown.clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (1)
98-105
: Minor cleanup: remove redundant null-check and avoid repeated providerPlugin.get()keysByPath is non-null (from the entry itself), and calling get() on each iteration is unnecessary.
Apply this diff:
- Plugin<ConfigProvider> providerPlugin = configProviderPlugins.get(providerName); - Map<String, Set<String>> keysByPath = entry.getValue(); - if (providerPlugin != null && keysByPath != null) { - for (Map.Entry<String, Set<String>> pathWithKeys : keysByPath.entrySet()) { + Plugin<ConfigProvider> providerPlugin = configProviderPlugins.get(providerName); + Map<String, Set<String>> keysByPath = entry.getValue(); + if (providerPlugin != null) { + ConfigProvider provider = providerPlugin.get(); + for (Map.Entry<String, Set<String>> pathWithKeys : keysByPath.entrySet()) { String path = pathWithKeys.getKey(); Set<String> keys = new HashSet<>(pathWithKeys.getValue()); - ConfigData configData = providerPlugin.get().get(path, keys); + ConfigData configData = provider.get(path, keys);
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these settings in your CodeRabbit configuration.
📒 Files selected for processing (12)
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
(4 hunks)clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
(3 hunks)clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java
(1 hunks)clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java
(5 hunks)clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java
(1 hunks)connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
(2 hunks)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
(1 hunks)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
(3 hunks)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
(3 hunks)connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
(5 hunks)connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
(5 hunks)connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
(4 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (10)
clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin
(33-92)
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (3)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin
(33-92)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (1)
Plugins
(59-696)clients/src/main/java/org/apache/kafka/common/utils/Utils.java (1)
Utils
(93-1700)
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin
(33-92)clients/src/main/java/org/apache/kafka/common/utils/Utils.java (1)
Utils
(93-1700)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin
(33-92)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java (1)
WorkerConfig
(58-463)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (1)
withPluginMetrics
(6299-6301)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin
(33-92)
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin
(33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin
(33-92)clients/src/main/java/org/apache/kafka/common/utils/Utils.java (1)
Utils
(93-1700)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (3)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
MonitorableConfigProvider
(29-58)clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin
(33-92)core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (1)
withPluginMetrics
(6299-6301)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (3)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
MonitorableConfigProvider
(29-58)clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin
(33-92)clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java (1)
Metrics
(70-696)
🔇 Additional comments (15)
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (2)
20-20
: Import addition aligns with Plugin-wrapped providersSwitch to org.apache.kafka.common.internals.Plugin is correct and consistent with the updated transformer APIs.
49-49
: Tests correctly updated to use Plugin-wrapped providers
- Wrapping TestConfigProvider via Plugin.wrapInstance and passing a Map<String, Plugin> into ConfigTransformer matches the new API.
- Using Map.of for brevity/readability is fine.
Also applies to: 54-54, 63-63, 72-72, 79-79, 86-86, 93-93, 102-102
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (1)
26-26
: Import is correctImporting org.apache.kafka.common.internals.Plugin is required with the new provider wrapping.
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (2)
46-52
: Constructor and field refactor to Plugin-wrapped providers looks soundHolding Plugin and passing them directly into ConfigTransformer matches the new design.
101-102
: Properly closing provider pluginsClosing the Plugin wrappers (not just the underlying providers) is correct; this also ensures pluginMetrics are closed.
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (2)
69-69
: Test wiring updated to Plugin-wrapped providerWrapping TestConfigProvider with Plugin.wrapInstance and injecting into WorkerConfigTransformer aligns with the production code path.
75-75
: Map.of usage and transform calls are correctThe updated calls succinctly exercise the transformer with the right key/values and reflect the TTL scheduling scenarios.
Also applies to: 100-100, 115-115, 122-122
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (1)
550-560
: Good resource hygiene: wrappers are closed after useUsing Plugin-wrapped providers and closing them via
Utils.closeQuietly
is the right lifecycle management given the newPlugin<ConfigProvider>
API. No issues here.connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (2)
382-390
: API adaptation looks correctStubbing the new 4-arg
Plugins.newConfigProvider
and returning a wrapped plugin with the expected extra tags aligns with the updated API and metrics tagging scheme.
2929-2951
: Metric assertions are precise and future-proofedMatching the full tags map and checking both name and description ensures we’re validating the correct plugin metrics without relying on ordering or incidental metrics. Looks good.
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (3)
381-403
: Correct migration to Plugin-wrapped providers in testAsserting on
plugin.get()
and continuing to validate classloader behavior keeps the test intent intact after the API change.
404-412
: Nice validation of withPluginMetrics invocation orderOverriding
withPluginMetrics
to assertconfigured
is a solid way to lock in the contract that metrics are attached after configuration.
810-816
: LGTM: minimal CustomMonitorableConfigProvider for order assertionThe override is scoped and clear; no issues.
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java (1)
212-223
: Provider plugins: wrapping and lifecycle verified — no action requiredVerified that WorkerConfigTransformer holds Plugin instances and closes them; Plugins.newConfigProvider wraps providers with metrics.
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
- field: line ~46 — private final Map<String, Plugin> configProviderPlugins;
- close(): lines ~100–102 — public void close() { configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); }
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
- newConfigProvider(): lines ~632–650 — signature accepts Metrics and returns Plugin; uses Plugin.wrapInstance(plugin, metrics, ...)
No change required; the plugin wrappers and their PluginMetrics are released on close.
clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (1)
60-69
: Adoption of Plugin-wrapped providers is correctSwitching to Map<String, Plugin> and adjusting the constructor aligns with the new plugin/metrics model. No issues here.
Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>(); | ||
for (String name : providerNames) { | ||
ConfigProvider configProvider = plugins.newConfigProvider( | ||
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider( | ||
this, | ||
CONFIG_PROVIDERS_CONFIG + "." + name, | ||
Plugins.ClassLoaderUsage.PLUGINS | ||
name, | ||
Plugins.ClassLoaderUsage.PLUGINS, | ||
null | ||
); | ||
providers.put(name, configProvider); | ||
providerPlugins.put(name, configProviderPlugin); | ||
} | ||
ConfigTransformer transformer = new ConfigTransformer(providers); | ||
ConfigTransformer transformer = new ConfigTransformer(providerPlugins); | ||
Map<String, String> transformed = transformer.transform(props).data(); | ||
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider")); | ||
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); | ||
return transformed; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Guard against null providers and ensure plugins are always closed on error
- Plugins.newConfigProvider may return null (e.g., missing
<name>.class
); adding these to the map risks NPE at transform time. - If transform throws, provider plugins currently aren’t closed, leaking resources.
Apply both a null-guard and a try/finally to guarantee closure.
- Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
- for (String name : providerNames) {
- Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
- this,
- name,
- Plugins.ClassLoaderUsage.PLUGINS,
- null
- );
- providerPlugins.put(name, configProviderPlugin);
- }
- ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
- Map<String, String> transformed = transformer.transform(props).data();
- providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
- return transformed;
+ Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
+ for (String name : providerNames) {
+ Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
+ this,
+ name,
+ Plugins.ClassLoaderUsage.PLUGINS,
+ null
+ );
+ if (configProviderPlugin != null) {
+ providerPlugins.put(name, configProviderPlugin);
+ } else {
+ // Missing config.providers.<name>.class; skip to avoid NPE during transform
+ }
+ }
+ try {
+ ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
+ return transformer.transform(props).data();
+ } finally {
+ providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
+ }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>(); | |
for (String name : providerNames) { | |
ConfigProvider configProvider = plugins.newConfigProvider( | |
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider( | |
this, | |
CONFIG_PROVIDERS_CONFIG + "." + name, | |
Plugins.ClassLoaderUsage.PLUGINS | |
name, | |
Plugins.ClassLoaderUsage.PLUGINS, | |
null | |
); | |
providers.put(name, configProvider); | |
providerPlugins.put(name, configProviderPlugin); | |
} | |
ConfigTransformer transformer = new ConfigTransformer(providers); | |
ConfigTransformer transformer = new ConfigTransformer(providerPlugins); | |
Map<String, String> transformed = transformer.transform(props).data(); | |
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider")); | |
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); | |
return transformed; | |
Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>(); | |
for (String name : providerNames) { | |
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider( | |
this, | |
name, | |
Plugins.ClassLoaderUsage.PLUGINS, | |
null | |
); | |
if (configProviderPlugin != null) { | |
providerPlugins.put(name, configProviderPlugin); | |
} else { | |
// Missing config.providers.<name>.class; skip to avoid NPE during transform | |
} | |
} | |
try { | |
ConfigTransformer transformer = new ConfigTransformer(providerPlugins); | |
return transformer.transform(props).data(); | |
} finally { | |
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); | |
} |
🤖 Prompt for AI Agents
In
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
around lines 273-286, newConfigProvider can return null and provider plugins
aren’t closed if transform throws; change the code to (1) declare
providerPlugins before creating plugins, (2) wrap the plugin creation loop and
the transform call in a try/finally so all created plugins are closed in the
finally block, and (3) null-guard each Plugin returned from
plugins.newConfigProvider — if it is null throw an informative ConfigException
(or otherwise fail fast) instead of inserting null into the map so transform
cannot later hit an NPE. Ensure the finally closes only the plugins actually
created.
Summary by CodeRabbit