Skip to content

Commit f988bd8

Browse files
Fix snappy related Kafka errors and relax thread leak checks
Signed-off-by: Varun Bharadwaj <[email protected]>
1 parent 6fb0c1b commit f988bd8

File tree

3 files changed

+33
-26
lines changed

3 files changed

+33
-26
lines changed

plugins/ingestion-kafka/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@ versions << [
2121
'docker': '3.3.6',
2222
'testcontainers': '1.19.7',
2323
'ducttape': '1.0.8',
24+
'snappy': '1.1.10.7',
2425
]
2526

2627
dependencies {
2728
// kafka
2829
api "org.slf4j:slf4j-api:${versions.slf4j}"
2930
api "org.apache.kafka:kafka-clients:${versions.kafka}"
31+
api "org.xerial.snappy:snappy-java:${versions.snappy}"
3032

3133
// test
3234
testImplementation "com.github.docker-java:docker-java-api:${versions.docker}"

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
package org.opensearch.plugin.kafka;
1010

11-
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
11+
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
1212

1313
import org.apache.kafka.clients.producer.KafkaProducer;
1414
import org.apache.kafka.clients.producer.Producer;
@@ -45,7 +45,7 @@
4545
/**
4646
* Integration test for Kafka ingestion
4747
*/
48-
@ThreadLeakLingering(linger = 15000) // wait for container pull thread to die
48+
@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
4949
public class IngestFromKafkaIT extends OpenSearchIntegTestCase {
5050
static final String topicName = "test";
5151

@@ -75,29 +75,31 @@ public void testPluginsAreInstalled() {
7575
}
7676

7777
public void testKafkaIngestion() {
78-
setupKafka();
79-
// create an index with ingestion source from kafka
80-
createIndex(
81-
"test",
82-
Settings.builder()
83-
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
84-
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
85-
.put("ingestion_source.type", "kafka")
86-
.put("ingestion_source.pointer.init.reset", "earliest")
87-
.put("ingestion_source.param.topic", "test")
88-
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
89-
.build(),
90-
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
91-
);
92-
93-
RangeQueryBuilder query = new RangeQueryBuilder("age").gte(21);
94-
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
95-
refresh("test");
96-
SearchResponse response = client().prepareSearch("test").setQuery(query).get();
97-
assertThat(response.getHits().getTotalHits().value(), is(1L));
98-
});
99-
100-
stopKafka();
78+
try {
79+
setupKafka();
80+
// create an index with ingestion source from kafka
81+
createIndex(
82+
"test",
83+
Settings.builder()
84+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
85+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
86+
.put("ingestion_source.type", "kafka")
87+
.put("ingestion_source.pointer.init.reset", "earliest")
88+
.put("ingestion_source.param.topic", "test")
89+
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
90+
.build(),
91+
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
92+
);
93+
94+
RangeQueryBuilder query = new RangeQueryBuilder("age").gte(21);
95+
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
96+
refresh("test");
97+
SearchResponse response = client().prepareSearch("test").setQuery(query).get();
98+
assertThat(response.getHits().getTotalHits().value(), is(1L));
99+
});
100+
} finally {
101+
stopKafka();
102+
}
101103
}
102104

103105
private void setupKafka() {

plugins/ingestion-kafka/src/main/plugin-metadata/plugin-security.policy

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,8 @@ grant {
1717
// Allow host/ip name service lookups
1818
permission java.net.SocketPermission "*", "connect";
1919
permission java.net.SocketPermission "*", "resolve";
20-
};
2120

21+
// Needed for Kafka consumer to load native snappy library
22+
permission java.lang.RuntimePermission "loadLibrary.*";
23+
permission java.io.FilePermission "${java.io.tmpdir}${/}snappy-*", "read,write";
24+
};

0 commit comments

Comments
 (0)