Skip to content

Commit bf42924

Browse files
authored
KAFKA-19572: Added check to prevent NPE logs during ShareConsumer::close (#20290)
*What* https://issues.apache.org/jira/browse/KAFKA-19572 - If a `ShareConsumer` constructor failed due to any exception, then we call `close()` in the catch block. - If there were uninitialized members accessed during `close()`, then it would throw a NPE. Currently there are no null checks, hence we were attempting to use these fields during `close()` execution. - To avoid this, PR adds null checks in the `close()` function before we access fields which possibly could be null. Reviewers: Apoorv Mittal <[email protected]>, Lianet Magrans <[email protected]>
1 parent 9d5dd07 commit bf42924

File tree

2 files changed

+20
-0
lines changed

2 files changed

+20
-0
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -928,6 +928,9 @@ private void close(final Duration timeout, final boolean swallowException) {
928928
}
929929

930930
private void stopFindCoordinatorOnClose() {
931+
if (applicationEventHandler == null) {
932+
return;
933+
}
931934
log.debug("Stop finding coordinator during consumer close");
932935
applicationEventHandler.add(new StopFindCoordinatorOnCloseEvent());
933936
}
@@ -944,6 +947,10 @@ private Timer createTimerForCloseRequests(Duration timeout) {
944947
* 2. leave the group
945948
*/
946949
private void sendAcknowledgementsAndLeaveGroup(final Timer timer, final AtomicReference<Throwable> firstException) {
950+
if (applicationEventHandler == null || backgroundEventProcessor == null ||
951+
backgroundEventReaper == null || backgroundEventQueue == null) {
952+
return;
953+
}
947954
completeQuietly(
948955
() -> applicationEventHandler.addAndGet(new ShareAcknowledgeOnCloseEvent(acknowledgementsToSend(), calculateDeadlineMs(timer))),
949956
"Failed to send pending acknowledgements with a timeout(ms)=" + timer.timeoutMs(), firstException);
@@ -1035,6 +1042,9 @@ private void maybeThrowInvalidGroupIdException() {
10351042
* If the acknowledgement commit callback throws an exception, this method will throw an exception.
10361043
*/
10371044
private void handleCompletedAcknowledgements(boolean onClose) {
1045+
if (backgroundEventQueue == null || backgroundEventReaper == null || backgroundEventProcessor == null) {
1046+
return;
1047+
}
10381048
// If the user gets any fatal errors, they will get these exceptions in the background queue.
10391049
// While closing, we ignore these exceptions so that the consumers close successfully.
10401050
processBackgroundEvents(onClose ? e -> (e instanceof GroupAuthorizationException

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.kafka.common.metrics.Metrics;
4444
import org.apache.kafka.common.protocol.Errors;
4545
import org.apache.kafka.common.serialization.StringDeserializer;
46+
import org.apache.kafka.common.utils.LogCaptureAppender;
4647
import org.apache.kafka.common.utils.LogContext;
4748
import org.apache.kafka.common.utils.MockTime;
4849
import org.apache.kafka.common.utils.Time;
@@ -77,6 +78,7 @@
7778
import static java.util.Collections.singletonList;
7879
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
7980
import static org.junit.jupiter.api.Assertions.assertEquals;
81+
import static org.junit.jupiter.api.Assertions.assertFalse;
8082
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
8183
import static org.junit.jupiter.api.Assertions.assertNull;
8284
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -207,11 +209,19 @@ public void testFailConstructor() {
207209
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
208210
props.put(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "an.invalid.class");
209211
final ConsumerConfig config = new ConsumerConfig(props);
212+
213+
LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
210214
KafkaException ce = assertThrows(
211215
KafkaException.class,
212216
() -> newConsumer(config));
213217
assertTrue(ce.getMessage().contains("Failed to construct Kafka share consumer"), "Unexpected exception message: " + ce.getMessage());
214218
assertTrue(ce.getCause().getMessage().contains("Class an.invalid.class cannot be found"), "Unexpected cause: " + ce.getCause());
219+
220+
boolean npeLogged = appender.getEvents().stream()
221+
.flatMap(event -> event.getThrowableInfo().stream())
222+
.anyMatch(str -> str.contains("NullPointerException"));
223+
224+
assertFalse(npeLogged, "Unexpected NullPointerException during consumer construction");
215225
}
216226

217227
@Test

0 commit comments

Comments
 (0)