Skip to content

Commit 5c15eb5

Browse files
Support custom kafka consumer configs
Signed-off-by: Varun Bharadwaj <[email protected]>
1 parent 8ee5eeb commit 5c15eb5

File tree

4 files changed

+25
-7
lines changed

4 files changed

+25
-7
lines changed

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ public void testSegmentReplicationWithRemoteStore() throws Exception {
5757
.put("ingestion_source.param.topic", topicName)
5858
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
5959
.put("index.replication.type", "SEGMENT")
60+
// set custom kafka consumer properties
61+
.put("ingestion_source.param.fetch.min.bytes", 30000)
62+
.put("ingestion_source.param.enable.auto.commit", false)
6063
.build(),
6164
mapping
6265
);

plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
package org.opensearch.plugin.kafka;
1010

1111
import org.apache.kafka.clients.consumer.Consumer;
12-
import org.apache.kafka.clients.consumer.ConsumerConfig;
1312
import org.apache.kafka.clients.consumer.ConsumerRecord;
1413
import org.apache.kafka.clients.consumer.ConsumerRecords;
1514
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -99,9 +98,10 @@ protected static Consumer<byte[], byte[]> createConsumer(String clientId, KafkaS
9998
Properties consumerProp = new Properties();
10099
consumerProp.put("bootstrap.servers", config.getBootstrapServers());
101100
consumerProp.put("client.id", clientId);
102-
if (config.getAutoOffsetResetConfig() != null && !config.getAutoOffsetResetConfig().isEmpty()) {
103-
consumerProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, config.getAutoOffsetResetConfig());
104-
}
101+
102+
logger.info("Kafka consumer properties for topic {}: {}", config.getTopic(), config.getConsumerConfigurations());
103+
consumerProp.putAll(config.getConsumerConfigurations());
104+
105105
// TODO: why Class org.apache.kafka.common.serialization.StringDeserializer could not be found if set the deserializer as prop?
106106
// consumerProp.put("key.deserializer",
107107
// "org.apache.kafka.common.serialization.StringDeserializer");

plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaSourceConfig.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import org.opensearch.core.util.ConfigurationUtils;
1212

13+
import java.util.HashMap;
1314
import java.util.Map;
1415

1516
/**
@@ -18,21 +19,27 @@
1819
public class KafkaSourceConfig {
1920
private final String PROP_TOPIC = "topic";
2021
private final String PROP_BOOTSTRAP_SERVERS = "bootstrap_servers";
21-
// TODO: support pass any generic kafka configs
2222
private final String PROP_AUTO_OFFSET_RESET = "auto.offset.reset";
2323

2424
private final String topic;
2525
private final String bootstrapServers;
2626
private final String autoOffsetResetConfig;
2727

28+
private final Map<String, Object> consumerConfigsMap;
29+
2830
/**
29-
* Constructor
31+
* Extracts and look for required and optional kafka consumer configurations.
3032
* @param params the configuration parameters
3133
*/
3234
public KafkaSourceConfig(Map<String, Object> params) {
3335
this.topic = ConfigurationUtils.readStringProperty(params, PROP_TOPIC);
3436
this.bootstrapServers = ConfigurationUtils.readStringProperty(params, PROP_BOOTSTRAP_SERVERS);
3537
this.autoOffsetResetConfig = ConfigurationUtils.readOptionalStringProperty(params, PROP_AUTO_OFFSET_RESET);
38+
this.consumerConfigsMap = new HashMap<>(params);
39+
40+
// remove above configurations
41+
consumerConfigsMap.remove(PROP_TOPIC);
42+
consumerConfigsMap.remove(PROP_BOOTSTRAP_SERVERS);
3643
}
3744

3845
/**
@@ -60,4 +67,8 @@ public String getBootstrapServers() {
6067
public String getAutoOffsetResetConfig() {
6168
return autoOffsetResetConfig;
6269
}
70+
71+
public Map<String, Object> getConsumerConfigurations() {
72+
return consumerConfigsMap;
73+
}
6374
}

plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaSourceConfigTests.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@
1616

1717
public class KafkaSourceConfigTests extends OpenSearchTestCase {
1818

19-
public void testConstructorAndGetters() {
19+
public void testKafkaSourceConfig() {
2020
Map<String, Object> params = new HashMap<>();
2121
params.put("topic", "topic");
2222
params.put("bootstrap_servers", "bootstrap");
23+
params.put("fetch.min.bytes", 30000);
24+
params.put("enable.auto.commit", false);
2325

2426
KafkaSourceConfig config = new KafkaSourceConfig(params);
2527

@@ -29,5 +31,7 @@ public void testConstructorAndGetters() {
2931
"bootstrap",
3032
config.getBootstrapServers()
3133
);
34+
Assert.assertEquals("Incorrect fetch.min.bytes", 30000, config.getConsumerConfigurations().get("fetch.min.bytes"));
35+
Assert.assertEquals("Incorrect enable.auto.commit", false, config.getConsumerConfigurations().get("enable.auto.commit"));
3236
}
3337
}

0 commit comments

Comments
 (0)