Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.utils.Utils;

import org.slf4j.Logger;
Expand Down Expand Up @@ -546,16 +547,16 @@ private Map<String, String> extractPotentialVariables(Map<?, ?> configMap) {
configProperties = configProviderProps;
classNameFilter = ignored -> true;
}
Map<String, ConfigProvider> providers = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
Map<String, Plugin<ConfigProvider>> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);

if (!providers.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providers);
if (!providerPlugins.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
ConfigTransformerResult result = configTransformer.transform(indirectVariables);
if (!result.data().isEmpty()) {
resolvedOriginals.putAll(result.data());
}
}
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Plugin Closure Risk

The code attempts to close ConfigProvider instances directly, but now needs to close Plugin instances. This mismatch could cause resource leaks when providers aren't properly closed.

Suggested change
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x.get(), "config provider"));
Standards
  • ISO-IEC-25010-Reliability-Recoverability
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Resource-Mgmt
  • SRE-Resource-Management

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resource Leak Risk in Multiple Components

The code attempts to close Plugin wrappers instead of the actual ConfigProvider instances they contain. This prevents proper resource cleanup of the underlying providers, potentially causing resource leaks in production environments.

Suggested change
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x.get(), "config provider"));
Standards
  • CWE-772
  • OWASP-A06


return new ResolvingMap<>(resolvedOriginals, originals);
}
Expand Down Expand Up @@ -594,7 +595,7 @@ private Map<String, Object> configProviderProperties(String configProviderPrefix
* @param classNameFilter Filter for config provider class names
* @return map of config provider name and its instance.
*/
private Map<String, ConfigProvider> instantiateConfigProviders(
private Map<String, Plugin<ConfigProvider>> instantiateConfigProviders(
Map<String, String> indirectConfigs,
Map<String, ?> providerConfigProperties,
Predicate<String> classNameFilter
Expand All @@ -620,21 +621,22 @@ private Map<String, ConfigProvider> instantiateConfigProviders(
}
}
// Instantiate Config Providers
Map<String, ConfigProvider> configProviderInstances = new HashMap<>();
Map<String, Plugin<ConfigProvider>> configProviderPluginInstances = new HashMap<>();
for (Map.Entry<String, String> entry : providerMap.entrySet()) {
try {
String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() + CONFIG_PROVIDERS_PARAM;
Map<String, ?> configProperties = configProviderProperties(prefix, providerConfigProperties);
ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class);
provider.configure(configProperties);
configProviderInstances.put(entry.getKey(), provider);
Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);
configProviderPluginInstances.put(entry.getKey(), providerPlugin);
Comment on lines +631 to +632
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inefficient Map Creation

Creating a provider map without specifying initial capacity causes rehashing overhead. With multiple providers, this leads to unnecessary memory allocations and CPU overhead.

Suggested change
Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);
configProviderPluginInstances.put(entry.getKey(), providerPlugin);
Map<String, Plugin<ConfigProvider>> configProviderPluginInstances = new HashMap<>(providerMap.size());
for (Map.Entry<String, String> entry : providerMap.entrySet()) {
Standards
  • ISO-IEC-25010-Performance-Resource-Utilization
  • Algorithm-Opt-Memory-Allocation
  • Google-Performance-Best-Practices

} catch (ClassNotFoundException e) {
log.error("Could not load config provider class {}", entry.getValue(), e);
throw new ConfigException(providerClassProperty(entry.getKey()), entry.getValue(), "Could not load config provider class or one of its dependencies");
}
}

return configProviderInstances;
return configProviderPluginInstances;
}

private static String providerClassProperty(String providerName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.config.provider.FileConfigProvider;
import org.apache.kafka.common.internals.Plugin;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -56,15 +57,15 @@ public class ConfigTransformer {
public static final Pattern DEFAULT_PATTERN = Pattern.compile("\\$\\{([^}]*?):(([^}]*?):)?([^}]*?)\\}");
private static final String EMPTY_PATH = "";

private final Map<String, ConfigProvider> configProviders;
private final Map<String, Plugin<ConfigProvider>> configProviderPlugins;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Abstraction Leakage

Implementation detail (Plugin wrapper) leaks into ConfigTransformer API. This creates unnecessary coupling between ConfigTransformer and Plugin implementation, making future changes to plugin architecture more difficult.

Suggested change
private final Map<String, Plugin<ConfigProvider>> configProviderPlugins;
private final Map<String, ConfigProvider> configProviders;
public ConfigTransformer(Map<String, Plugin<ConfigProvider>> configProviderPlugins) {
Map<String, ConfigProvider> providers = new HashMap<>();
configProviderPlugins.forEach((name, plugin) -> providers.put(name, plugin.get()));
this.configProviders = providers;
}
Standards
  • SOLID-DIP
  • Clean-Code-Abstraction
  • Design-Pattern-Facade

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dependency Inversion Violation

Direct replacement of ConfigProvider with Plugin creates tight coupling to Plugin implementation. This violates Dependency Inversion Principle by forcing ConfigTransformer to know about Plugin internals rather than depending on abstractions.

Suggested change
private final Map<String, Plugin<ConfigProvider>> configProviderPlugins;
private final Map<String, ConfigProviderWrapper> configProviders;
/**
* Interface to abstract access to ConfigProvider implementations
*/
private interface ConfigProviderWrapper {
ConfigData get(String path, Set<String> keys);
ConfigData get(String path);
void close();
}
Standards
  • SOLID-DIP
  • Clean-Code-Abstraction


/**
* Creates a ConfigTransformer with the default pattern, of the form <code>${provider:[path:]key}</code>.
*
* @param configProviders a Map of provider names and {@link ConfigProvider} instances.
* @param configProviderPlugins a Map of provider names and {@link ConfigProvider} instances.
*/
public ConfigTransformer(Map<String, ConfigProvider> configProviders) {
this.configProviders = configProviders;
public ConfigTransformer(Map<String, Plugin<ConfigProvider>> configProviderPlugins) {
this.configProviderPlugins = configProviderPlugins;
}

/**
Expand Down Expand Up @@ -94,13 +95,13 @@ public ConfigTransformerResult transform(Map<String, String> configs) {
Map<String, Long> ttls = new HashMap<>();
for (Map.Entry<String, Map<String, Set<String>>> entry : keysByProvider.entrySet()) {
String providerName = entry.getKey();
ConfigProvider provider = configProviders.get(providerName);
Plugin<ConfigProvider> providerPlugin = configProviderPlugins.get(providerName);
Map<String, Set<String>> keysByPath = entry.getValue();
if (provider != null && keysByPath != null) {
if (providerPlugin != null && keysByPath != null) {
for (Map.Entry<String, Set<String>> pathWithKeys : keysByPath.entrySet()) {
String path = pathWithKeys.getKey();
Set<String> keys = new HashSet<>(pathWithKeys.getValue());
ConfigData configData = provider.get(path, keys);
ConfigData configData = providerPlugin.get().get(path, keys);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Null Check

No null check on providerPlugin.get() before invoking get(path, keys). If the plugin returns null, this will cause a NullPointerException and service disruption.

Suggested change
ConfigData configData = providerPlugin.get().get(path, keys);
ConfigProvider provider = providerPlugin.get();
if (provider == null) {
throw new ConfigException("ConfigProvider plugin returned null provider for " + providerName);
}
ConfigData configData = provider.get(path, keys);
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Precondition
  • SRE-Defensive-Programming

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exception Handling Missing in Config Retrieval

The providerPlugin.get() call may throw an exception, but there's no exception handling. If an exception occurs during retrieval, it could lead to unhandled failures and service unavailability.

Suggested change
ConfigData configData = providerPlugin.get().get(path, keys);
ConfigData configData;
try {
configData = providerPlugin.get().get(path, keys);
} catch (Exception e) {
throw new ConfigException("Failed to retrieve configuration from provider '" + providerName + "'", e);
}
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • SRE-Error-Handling

Map<String, String> data = configData.data();
Long ttl = configData.ttl();
if (ttl != null && ttl >= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
* <p>Kafka Connect discovers implementations of this interface using the Java {@link java.util.ServiceLoader} mechanism.
* To support this, implementations of this interface should also contain a service provider configuration file in
* {@code META-INF/services/org.apache.kafka.common.config.provider.ConfigProvider}.
* <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the config provider to register metrics.
* The following tags are automatically added to all metrics registered: <code>config</code> set to
* <code>config.providers</code>, <code>class</code> set to the ConfigProvider class name,
* and <code>provider</code> set to the provider name.
*/
public interface ConfigProvider extends Configurable, Closeable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.common.config;

import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.internals.Plugin;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -45,12 +46,12 @@ public class ConfigTransformerTest {

@BeforeEach
public void setup() {
configTransformer = new ConfigTransformer(Collections.singletonMap("test", new TestConfigProvider()));
configTransformer = new ConfigTransformer(Map.of("test", Plugin.wrapInstance(new TestConfigProvider(), null, "config.providers")));
}

@Test
public void testReplaceVariable() {
ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testKey}"));
ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testKey}"));
Map<String, String> data = result.data();
Map<String, Long> ttls = result.ttls();
assertEquals(TEST_RESULT, data.get(MY_KEY));
Expand All @@ -59,7 +60,7 @@ public void testReplaceVariable() {

@Test
public void testReplaceVariableWithTTL() {
ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}"));
ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testKeyWithTTL}"));
Map<String, String> data = result.data();
Map<String, Long> ttls = result.ttls();
assertEquals(TEST_RESULT_WITH_TTL, data.get(MY_KEY));
Expand All @@ -68,28 +69,28 @@ public void testReplaceVariableWithTTL() {

@Test
public void testReplaceMultipleVariablesInValue() {
ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "hello, ${test:testPath:testKey}; goodbye, ${test:testPath:testKeyWithTTL}!!!"));
ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "hello, ${test:testPath:testKey}; goodbye, ${test:testPath:testKeyWithTTL}!!!"));
Map<String, String> data = result.data();
assertEquals("hello, testResult; goodbye, testResultWithTTL!!!", data.get(MY_KEY));
}

@Test
public void testNoReplacement() {
ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:missingKey}"));
ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:missingKey}"));
Map<String, String> data = result.data();
assertEquals("${test:testPath:missingKey}", data.get(MY_KEY));
}

@Test
public void testSingleLevelOfIndirection() {
ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testIndirection}"));
ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testIndirection}"));
Map<String, String> data = result.data();
assertEquals("${test:testPath:testResult}", data.get(MY_KEY));
}

@Test
public void testReplaceVariableNoPath() {
ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testKey}"));
ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testKey}"));
Map<String, String> data = result.data();
Map<String, Long> ttls = result.ttls();
assertEquals(TEST_RESULT_NO_PATH, data.get(MY_KEY));
Expand All @@ -98,7 +99,7 @@ public void testReplaceVariableNoPath() {

@Test
public void testReplaceMultipleVariablesWithoutPathInValue() {
ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "first ${test:testKey}; second ${test:testKey}"));
ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "first ${test:testKey}; second ${test:testKey}"));
Map<String, String> data = result.data();
assertEquals("first testResultNoPath; second testResultNoPath", data.get(MY_KEY));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.config.provider;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Monitorable;
import org.apache.kafka.common.metrics.PluginMetrics;

import java.io.IOException;
import java.util.Map;
import java.util.Set;

public class MonitorableConfigProvider implements ConfigProvider, Monitorable {
public static final String NAME = "name";
public static final String DESCRIPTION = "description";
protected boolean configured = false;

@Override
public void withPluginMetrics(PluginMetrics metrics) {
MetricName metricName = metrics.metricName(NAME, DESCRIPTION, Map.of());
metrics.addMetric(metricName, (Measurable) (config, now) -> 123);
}

@Override
public ConfigData get(String path) {
return null;
}

@Override
public ConfigData get(String path, Set<String> keys) {
return null;
}

@Override
public void close() throws IOException {
}

@Override
public void configure(Map<String, ?> configs) {
configured = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigTransformer;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.WorkerConfig;
Expand Down Expand Up @@ -269,18 +270,19 @@ List<String> configProviders() {
Map<String, String> transform(Map<String, String> props) {
// transform worker config according to config.providers
List<String> providerNames = configProviders();
Map<String, ConfigProvider> providers = new HashMap<>();
Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
for (String name : providerNames) {
ConfigProvider configProvider = plugins.newConfigProvider(
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
this,
CONFIG_PROVIDERS_CONFIG + "." + name,
Plugins.ClassLoaderUsage.PLUGINS
name,
Plugins.ClassLoaderUsage.PLUGINS,
null
);
providers.put(name, configProvider);
providerPlugins.put(name, configProviderPlugin);
}
ConfigTransformer transformer = new ConfigTransformer(providers);
ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
Map<String, String> transformed = transformer.transform(props).data();
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
return transformed;
Comment on lines +273 to 286
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Guard against null providers and ensure plugins are always closed on error

  • Plugins.newConfigProvider may return null (e.g., missing <name>.class); adding these to the map risks NPE at transform time.
  • If transform throws, provider plugins currently aren’t closed, leaking resources.

Apply both a null-guard and a try/finally to guarantee closure.

-        Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
-        for (String name : providerNames) {
-            Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
-                    this,
-                    name,
-                    Plugins.ClassLoaderUsage.PLUGINS,
-                    null
-            );
-            providerPlugins.put(name, configProviderPlugin);
-        }
-        ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
-        Map<String, String> transformed = transformer.transform(props).data();
-        providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
-        return transformed;
+        Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
+        for (String name : providerNames) {
+            Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
+                    this,
+                    name,
+                    Plugins.ClassLoaderUsage.PLUGINS,
+                    null
+            );
+            if (configProviderPlugin != null) {
+                providerPlugins.put(name, configProviderPlugin);
+            } else {
+                // Missing config.providers.<name>.class; skip to avoid NPE during transform
+            }
+        }
+        try {
+            ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
+            return transformer.transform(props).data();
+        } finally {
+            providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
+        }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
for (String name : providerNames) {
ConfigProvider configProvider = plugins.newConfigProvider(
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
this,
CONFIG_PROVIDERS_CONFIG + "." + name,
Plugins.ClassLoaderUsage.PLUGINS
name,
Plugins.ClassLoaderUsage.PLUGINS,
null
);
providers.put(name, configProvider);
providerPlugins.put(name, configProviderPlugin);
}
ConfigTransformer transformer = new ConfigTransformer(providers);
ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
Map<String, String> transformed = transformer.transform(props).data();
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
return transformed;
Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
for (String name : providerNames) {
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
this,
name,
Plugins.ClassLoaderUsage.PLUGINS,
null
);
if (configProviderPlugin != null) {
providerPlugins.put(name, configProviderPlugin);
} else {
// Missing config.providers.<name>.class; skip to avoid NPE during transform
}
}
try {
ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
return transformer.transform(props).data();
} finally {
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
}
🤖 Prompt for AI Agents
In
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
around lines 273-286, newConfigProvider can return null and provider plugins
aren’t closed if transform throws; change the code to (1) declare
providerPlugins before creating plugins, (2) wrap the plugin creation loop and
the transform call in a try/finally so all created plugins are closed in the
finally block, and (3) null-guard each Plugin returned from
plugins.newConfigProvider — if it is null throw an informative ConfigException
(or otherwise fail fast) instead of inserting null into the map so transform
cannot later hit an NPE. Ensure the finally closes only the plugins actually
created.

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,16 +209,17 @@ public Worker(

private WorkerConfigTransformer initConfigTransformer() {
final List<String> providerNames = config.getList(WorkerConfig.CONFIG_PROVIDERS_CONFIG);
Map<String, ConfigProvider> providerMap = new HashMap<>();
Map<String, Plugin<ConfigProvider>> providerPluginMap = new HashMap<>();
for (String providerName : providerNames) {
ConfigProvider configProvider = plugins.newConfigProvider(
config,
WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName,
ClassLoaderUsage.PLUGINS
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
config,
providerName,
ClassLoaderUsage.PLUGINS,
metrics.metrics()
);
Comment on lines +214 to 219
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resource Leak Risk

The plugin instance is created but not properly closed if an exception occurs during initialization. If any provider fails to initialize, previous successfully initialized providers won't be closed, causing resource leaks.

Suggested change
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
config,
providerName,
ClassLoaderUsage.PLUGINS,
metrics.metrics()
);
Map<String, Plugin<ConfigProvider>> providerPluginMap = new HashMap<>();
List<Plugin<ConfigProvider>> initializedPlugins = new ArrayList<>();
try {
for (String providerName : providerNames) {
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
config,
providerName,
ClassLoaderUsage.PLUGINS,
metrics.metrics()
);
initializedPlugins.add(configProviderPlugin);
providerPluginMap.put(providerName, configProviderPlugin);
}
return new WorkerConfigTransformer(this, providerPluginMap);
} catch (Exception e) {
initializedPlugins.forEach(plugin -> Utils.closeQuietly(plugin, "config provider plugin"));
throw e;
}
Standards
  • Resource-Management-Completeness
  • Logic-Verification-Error-Handling
  • Algorithm-Correctness-Exception-Safety

providerMap.put(providerName, configProvider);
providerPluginMap.put(providerName, configProviderPlugin);
}
return new WorkerConfigTransformer(this, providerMap);
return new WorkerConfigTransformer(this, providerPluginMap);
}

public WorkerConfigTransformer configTransformer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.common.config.ConfigTransformer;
import org.apache.kafka.common.config.ConfigTransformerResult;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.Herder.ConfigReloadAction;
import org.apache.kafka.connect.util.Callback;
Expand All @@ -42,12 +43,12 @@ public class WorkerConfigTransformer implements AutoCloseable {
private final Worker worker;
private final ConfigTransformer configTransformer;
private final ConcurrentMap<String, Map<String, HerderRequest>> requests = new ConcurrentHashMap<>();
private final Map<String, ConfigProvider> configProviders;
private final Map<String, Plugin<ConfigProvider>> configProviderPlugins;

public WorkerConfigTransformer(Worker worker, Map<String, ConfigProvider> configProviders) {
public WorkerConfigTransformer(Worker worker, Map<String, Plugin<ConfigProvider>> configProviderPlugins) {
this.worker = worker;
this.configProviders = configProviders;
this.configTransformer = new ConfigTransformer(configProviders);
this.configProviderPlugins = configProviderPlugins;
this.configTransformer = new ConfigTransformer(configProviderPlugins);
}

public Map<String, String> transform(Map<String, String> configs) {
Expand Down Expand Up @@ -97,6 +98,6 @@ private void scheduleReload(String connectorName, String path, long ttl) {

@Override
public void close() {
configProviders.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent Provider Closing

The code closes Plugin objects directly instead of accessing the underlying ConfigProvider instances. This inconsistency with other fixes could lead to resource leaks if Plugin.get() isn't called.

Suggested change
configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x.get(), "config provider"));
Standards
  • ISO-IEC-25010-Reliability-Recoverability
  • ISO-IEC-25010-Functional-Correctness-Consistency
  • DbC-Resource-Mgmt
  • SRE-Resource-Management

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resource Leak Risk in WorkerConfigTransformer

Similar to the AbstractConfig issue, this code closes Plugin wrappers instead of the actual ConfigProvider instances. This prevents proper resource cleanup, potentially causing resource leaks in production environments.

Suggested change
configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x.get(), "config provider"));
Standards
  • CWE-772
  • OWASP-A06

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.Connector;
Expand Down Expand Up @@ -627,7 +629,8 @@ private <U> U newVersionedPlugin(
return plugin;
}

public ConfigProvider newConfigProvider(AbstractConfig config, String providerPrefix, ClassLoaderUsage classLoaderUsage) {
public Plugin<ConfigProvider> newConfigProvider(AbstractConfig config, String providerName, ClassLoaderUsage classLoaderUsage, Metrics metrics) {
String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName;
String classPropertyName = providerPrefix + ".class";
Map<String, String> originalConfig = config.originalsStrings();
if (!originalConfig.containsKey(classPropertyName)) {
Expand All @@ -643,7 +646,7 @@ public ConfigProvider newConfigProvider(AbstractConfig config, String providerPr
try (LoaderSwap loaderSwap = safeLoaderSwapper().apply(plugin.getClass().getClassLoader())) {
plugin.configure(configProviderConfig);
}
return plugin;
return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resource Leak Risk

ConfigProvider resource is wrapped but not closed if configuration fails. If configure() throws exception, the plugin instance leaks resources as close() is never called.

Suggested change
return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
try {
plugin.configure(configProviderConfig);
return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
} catch (Exception e) {
Utils.closeQuietly(plugin, "config provider");
throw e;
}
Standards
  • CWE-772
  • OWASP-A06

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Null Check Missing in Plugin Instantiation

The plugin instance is wrapped without checking if it's null. This could lead to NullPointerException if plugin instantiation failed but didn't throw an exception.

Suggested change
return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
if (plugin == null) {
throw new ConnectException("Failed to instantiate config provider for " + providerName);
}
return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • SRE-Defensive-Programming

}

/**
Expand Down
Loading
Loading