Skip to content

Commit f9ccf83

Browse files
KAFKA-18066: Fix mismatched StreamThread ID in log messages (#19517)
This PR fixes an issue where the thread name shown in log messages did not match the actual execution context. Previously, log entries displayed the context of the newly created thread, while the logger reflected the current executing thread. This mismatch led to confusion and made log tracing more difficult. Changes: - Use logger without context to not have context - Updated log messages to explicitly describe the thread being created - Fixed instances where the log context reflected the current thread instead of the newly created one Reviewers: Matthias J. Sax <[email protected]>
1 parent de2adb6 commit f9ccf83

File tree

5 files changed

+16
-19
lines changed

5 files changed

+16
-19
lines changed

streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ class ActiveTaskCreator {
7777
final String threadId,
7878
final int threadIdx,
7979
final UUID processId,
80-
final Logger log,
80+
final LogContext logContext,
8181
final boolean stateUpdaterEnabled,
8282
final boolean processingThreadsEnabled) {
8383
this.topologyMetadata = topologyMetadata;
@@ -91,15 +91,12 @@ class ActiveTaskCreator {
9191
this.threadId = threadId;
9292
this.threadIdx = threadIdx;
9393
this.processId = processId;
94-
this.log = log;
94+
this.log = logContext.logger(getClass());
9595
this.stateUpdaterEnabled = stateUpdaterEnabled;
9696
this.processingThreadsEnabled = processingThreadsEnabled;
9797

9898
createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics);
9999

100-
final String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName());
101-
final LogContext logContext = new LogContext(threadIdPrefix);
102-
103100
streamsProducer = new StreamsProducer(
104101
producer(),
105102
processingMode(applicationConfig),

streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,20 +52,20 @@ class StandbyTaskCreator {
5252
final StateDirectory stateDirectory,
5353
final ChangelogReader storeChangelogReader,
5454
final String threadId,
55-
final Logger log,
55+
final LogContext logContext,
5656
final boolean stateUpdaterEnabled) {
5757
this.topologyMetadata = topologyMetadata;
5858
this.applicationConfig = applicationConfig;
5959
this.streamsMetrics = streamsMetrics;
6060
this.stateDirectory = stateDirectory;
6161
this.storeChangelogReader = storeChangelogReader;
62-
this.log = log;
62+
this.log = logContext.logger(getClass());
6363
this.stateUpdaterEnabled = stateUpdaterEnabled;
6464

6565
createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics);
6666

6767
dummyCache = new ThreadCache(
68-
new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())),
68+
logContext,
6969
0,
7070
streamsMetrics
7171
);

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import org.apache.kafka.streams.state.internals.ThreadCache;
7474

7575
import org.slf4j.Logger;
76+
import org.slf4j.LoggerFactory;
7677

7778
import java.time.Duration;
7879
import java.util.Arrays;
@@ -397,15 +398,15 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
397398
final String logPrefix = String.format("stream-thread [%s] ", threadId);
398399
final LogContext logContext = new LogContext(logPrefix);
399400
final LogContext restorationLogContext = stateUpdaterEnabled ? new LogContext(String.format("state-updater [%s] ", restorationThreadId)) : logContext;
400-
final Logger log = logContext.logger(StreamThread.class);
401+
final Logger log = LoggerFactory.getLogger(StreamThread.class);
401402

402403
final ReferenceContainer referenceContainer = new ReferenceContainer();
403404
referenceContainer.adminClient = adminClient;
404405
referenceContainer.streamsMetadataState = streamsMetadataState;
405406
referenceContainer.time = time;
406407
referenceContainer.clientTags = config.getClientTags();
407408

408-
log.info("Creating restore consumer client");
409+
log.info("Creating restore consumer client for thread {}", threadId);
409410
final Map<String, Object> restoreConsumerConfigs = config.getRestoreConsumerConfigs(restoreConsumerClientId(restorationThreadId));
410411
final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
411412

@@ -434,7 +435,7 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
434435
threadId,
435436
threadIdx,
436437
processId,
437-
log,
438+
logContext,
438439
stateUpdaterEnabled,
439440
proceessingThreadsEnabled
440441
);
@@ -445,10 +446,10 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
445446
stateDirectory,
446447
changelogReader,
447448
threadId,
448-
log,
449+
logContext,
449450
stateUpdaterEnabled);
450451

451-
final Tasks tasks = new Tasks(new LogContext(logPrefix));
452+
final Tasks tasks = new Tasks(logContext);
452453
final boolean processingThreadsEnabled =
453454
InternalConfig.processingThreadsEnabled(config.originals());
454455

@@ -483,7 +484,7 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
483484
);
484485
referenceContainer.taskManager = taskManager;
485486

486-
log.info("Creating consumer client");
487+
log.info("Creating consumer client for thread {}", threadId);
487488
final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
488489
final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs(applicationId, consumerClientId(threadId), threadIdx);
489490
consumerConfigs.put(StreamsConfig.InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer);
@@ -494,7 +495,7 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
494495
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
495496
}
496497

497-
final MainConsumerSetup mainConsumerSetup = setupMainConsumer(topologyMetadata, config, clientSupplier, processId, log, consumerConfigs);
498+
final MainConsumerSetup mainConsumerSetup = setupMainConsumer(topologyMetadata, config, clientSupplier, processId, log, threadId, consumerConfigs);
498499

499500
taskManager.setMainConsumer(mainConsumerSetup.mainConsumer);
500501
referenceContainer.mainConsumer = mainConsumerSetup.mainConsumer;
@@ -535,6 +536,7 @@ private static MainConsumerSetup setupMainConsumer(final TopologyMetadata topolo
535536
final KafkaClientSupplier clientSupplier,
536537
final UUID processId,
537538
final Logger log,
539+
final String threadId,
538540
final Map<String, Object> consumerConfigs) {
539541
if (config.getString(StreamsConfig.GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.STREAMS.name)) {
540542
if (topologyMetadata.hasNamedTopologies()) {

streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ private void createTasks() {
279279
"clientId-StreamThread-0",
280280
0,
281281
uuid,
282-
new LogContext().logger(ActiveTaskCreator.class),
282+
new LogContext(),
283283
false,
284284
false);
285285

streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,6 @@
117117
import org.mockito.junit.jupiter.MockitoExtension;
118118
import org.mockito.junit.jupiter.MockitoSettings;
119119
import org.mockito.quality.Strictness;
120-
import org.slf4j.Logger;
121120

122121
import java.io.File;
123122
import java.io.IOException;
@@ -4139,7 +4138,6 @@ private void setupInternalTopologyWithoutState(final StreamsConfig config) {
41394138
// TODO: change return type to `StandbyTask`
41404139
private Collection<Task> createStandbyTask(final StreamsConfig config) {
41414140
final LogContext logContext = new LogContext("test");
4142-
final Logger log = logContext.logger(StreamThreadTest.class);
41434141
final StreamsMetricsImpl streamsMetrics =
41444142
new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime);
41454143
final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(
@@ -4149,7 +4147,7 @@ private Collection<Task> createStandbyTask(final StreamsConfig config) {
41494147
stateDirectory,
41504148
new MockChangelogReader(),
41514149
CLIENT_ID,
4152-
log,
4150+
logContext,
41534151
false);
41544152
return standbyTaskCreator.createTasks(singletonMap(new TaskId(1, 2), emptySet()));
41554153
}

0 commit comments

Comments
 (0)