Skip to content

KAFKA-17876/ KAFKA-19150 Rename AssignmentsManager and RemoteStorageThreadPool metrics #20265

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 17 additions & 20 deletions docs/upgrade.html
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,18 @@ <h5><a id="upgrade_420_notable" href="#upgrade_420_notable">Notable changes in 4
<li>
The <code>PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG</code> in <code>ProducerConfig</code> was deprecated and will be removed in Kafka 5.0. Please use the <code>PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG</code> instead.
</li>
<li>
The metrics <code>org.apache.kafka.server:type=AssignmentsManager.QueuedReplicaToDirAssignments</code>,
<code>org.apache.kafka.storage.internals.log:type=RemoteStorageThreadPool.RemoteLogReaderTaskQueueSize</code>, and
<code>org.apache.kafka.storage.internals.log:type=RemoteStorageThreadPool.RemoteLogReaderAvgIdlePercent</code>
have been deprecated and will be removed in Kafka 5.0.

As replacements, the following metrics have been introduced, which report the same information:
<code>kafka.server:type=AssignmentsManager.QueuedReplicaToDirAssignments</code>,
<code>kafka.log.remote:type=RemoteStorageThreadPool.RemoteLogReaderTaskQueueSize</code>, and
<code>kafka.log.remote:type=RemoteStorageThreadPool.RemoteLogReaderAvgIdlePercent</code>.
For further details, please refer to <a href="https://cwiki.apache.org/confluence/x/6oqMEw">KIP-1100</a>.
</li>
</ul>

<h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0</a></h4>
Expand All @@ -60,9 +72,6 @@ <h5><a id="upgrade_410_notable" href="#upgrade_410_notable">Notable changes in 4
The logger class name for LogCleaner has been updated from <code>kafka.log.LogCleaner</code> to <code>org.apache.kafka.storage.internals.log.LogCleaner</code> in the log4j2.yaml configuration file.
Added loggers for <code>org.apache.kafka.storage.internals.log.LogCleaner$CleanerThread</code> and <code>org.apache.kafka.storage.internals.log.Cleaner</code> classes to CleanerAppender.
</li>
<li>
The filename for rotated <code>state-change.log</code> files has been updated from <code>stage-change.log.[date]</code> to <code>state-change.log.[date]</code> in the log4j2.yaml configuration file.
</li>
</ul>
</li>
<li><b>Broker</b>
Expand All @@ -74,8 +83,6 @@ <h5><a id="upgrade_410_notable" href="#upgrade_410_notable">Notable changes in 4
</li>
<li>
The KIP-966 part 1: Eligible Leader Replicas(ELR) will be enabled by default on the new clusters.
After the ELR feature enabled, the previously set <code>min.insync.replicas</code> value at the broker-level config will be removed.
Please set at the cluster-level if necessary.
For further details, please refer to <a href="/{{version}}/documentation.html#eligible_leader_replicas">here</a>.
</li>
</ul>
Expand Down Expand Up @@ -110,9 +117,9 @@ <h5><a id="upgrade_410_notable" href="#upgrade_410_notable">Notable changes in 4
</li>
</ul>

<h4><a id="upgrade_4_0_1_from" href="#upgrade_4_0_1_from">Upgrading to 4.0.1</a></h4>
<h4><a id="upgrade_4_0_0_from" href="#upgrade_4_0_0_from">Upgrading to 4.0.0</a></h4>

<h5><a id="upgrade_clients_4_0_1" href="#upgrade_clients_4_0_1">Upgrading Clients to 4.0.1</a></h5>
<h5><a id="upgrade_clients_4_0_0" href="#upgrade_clients_4_0_0">Upgrading Clients to 4.0.0</a></h5>

<p><b>For a rolling upgrade:</b></p>

Expand All @@ -123,7 +130,7 @@ <h5><a id="upgrade_clients_4_0_1" href="#upgrade_clients_4_0_1">Upgrading Client
or <a href="https://cwiki.apache.org/confluence/x/y4kgF">KIP-1124</a>.</li>
</ol>

<h5><a id="upgrade_4_0_1" href="#upgrade_4_0_1">Upgrading Servers to 4.0.1 from any version 3.3.x through 3.9.x</a></h5>
<h5><a id="upgrade_4_0_0" href="#upgrade_4_0_0">Upgrading Servers to 4.0.0 from any version 3.3.x through 3.9.x</a></h5>

<p>Note: Apache Kafka 4.0 only supports KRaft mode - ZooKeeper mode has been removed. As such, <b>broker upgrades to 4.0.0 (and higher) require KRaft mode and
the software and metadata versions must be at least 3.3.x</b> (the first version when KRaft mode was deemed production ready). For clusters in KRaft mode
Expand All @@ -147,13 +154,7 @@ <h5><a id="upgrade_4_0_1" href="#upgrade_4_0_1">Upgrading Servers to 4.0.1 from
has a boolean parameter that indicates if there are metadata changes (i.e. <code>IBP_4_0_IV1(23, "4.0", "IV1", true)</code> means this version has metadata changes).
Given your current and target versions, a downgrade is only possible if there are no metadata changes in the versions between.</li>
</ol>
<h5><a id="upgrade_servers_401_notable" href="#upgrade_servers_401_notable">Notable changes in 4.0.1</a></h5>
<ul>
<li>
The filename for rotated <code>state-change.log</code> files has been updated from <code>stage-change.log.[date]</code> to <code>state-change.log.[date]</code> in the log4j2.yaml configuration file.
See <a href="https://issues.apache.org/jira/browse/KAFKA-19576">KAFKA-19576</a> for details.
</li>
</ul>

<h5><a id="upgrade_servers_400_notable" href="#upgrade_servers_400_notable">Notable changes in 4.0.0</a></h5>
<ul>
<li>
Expand All @@ -165,7 +166,7 @@ <h5><a id="upgrade_servers_400_notable" href="#upgrade_servers_400_notable">Nota
</li>
<li>
Apache Kafka 4.0 only supports KRaft mode - ZooKeeper mode has been removed. About version upgrade,
check <a href="/{{version}}/documentation.html#upgrade_4_0_1">Upgrading to 4.0.1 from any version 3.3.x through 3.9.x</a> for more info.
check <a href="/{{version}}/documentation.html#upgrade_4_0_0">Upgrading to 4.0.0 from any version 3.3.x through 3.9.x</a> for more info.
</li>
<li>
Apache Kafka 4.0 ships with a brand-new group coordinator implementation (See <a href="https://cwiki.apache.org/confluence/x/HhD1D">here</a>).
Expand Down Expand Up @@ -508,10 +509,6 @@ <h5><a id="upgrade_servers_400_notable" href="#upgrade_servers_400_notable">Nota
<li> See <a href="https://cwiki.apache.org/confluence/x/B40ODg">KIP-890</a> and
<a href="https://cwiki.apache.org/confluence/x/8ItyEg">KIP-1050</a> for more details </li>
</ul>
<li>
The filename for rotated <code>state-change.log</code> files incorrectly rotates to <code>stage-change.log.[date]</code> (changing state to stage). This issue is corrected in 4.0.1.
See <a href="https://issues.apache.org/jira/browse/KAFKA-19576">KAFKA-19576</a> for details.
</li>
</ul>
</li>
</ul>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ public final class AssignmentsManager {
/**
* The metric reflecting the number of pending assignments.
*/
@Deprecated(since = "4.2")
static final MetricName DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC =
deprecatedMetricName("QueuedReplicaToDirAssignments");

static final MetricName QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC =
metricName("QueuedReplicaToDirAssignments");

Expand Down Expand Up @@ -142,10 +146,15 @@ public final class AssignmentsManager {
*/
private final KafkaEventQueue eventQueue;

static MetricName metricName(String name) {
@Deprecated(since = "4.2")
static MetricName deprecatedMetricName(String name) {
return KafkaYammerMetrics.getMetricName("org.apache.kafka.server", "AssignmentsManager", name);
}

static MetricName metricName(String name) {
return KafkaYammerMetrics.getMetricName("kafka.server", "AssignmentsManager", name);
}

public AssignmentsManager(
Time time,
NodeToControllerChannelManager channelManager,
Expand Down Expand Up @@ -182,12 +191,18 @@ public AssignmentsManager(
this.ready = new ConcurrentHashMap<>();
this.inflight = Map.of();
this.metricsRegistry = metricsRegistry;
this.metricsRegistry.newGauge(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC, new Gauge<Integer>() {
this.metricsRegistry.newGauge(DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC, new Gauge<Integer>() {
@Override
public Integer value() {
return numPending();
}
});
this.metricsRegistry.newGauge(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC, new Gauge<Integer>() {
@Override
public Integer value() {
return numPending();
}
});
this.previousGlobalFailures = 0;
this.eventQueue = new KafkaEventQueue(time,
new LogContext("[AssignmentsManager id=" + nodeId + "]"),
Expand Down Expand Up @@ -248,6 +263,7 @@ public void run() {
log.error("Unexpected exception shutting down NodeToControllerChannelManager", e);
}
try {
metricsRegistry.removeMetric(DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC);
metricsRegistry.removeMetric(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC);
} catch (Exception e) {
log.error("Unexpected exception removing metrics.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,13 @@ int queuedReplicaToDirAssignments() {
return queuedReplicaToDirAssignments.value();
}

@SuppressWarnings("unchecked") // do not warn about Gauge typecast.
int deprecatedQueuedReplicaToDirAssignments() {
Gauge<Integer> queuedReplicaToDirAssignments =
(Gauge<Integer>) findMetric(AssignmentsManager.DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC);
return queuedReplicaToDirAssignments.value();
}

@Override
public void close() throws Exception {
try {
Expand Down Expand Up @@ -279,17 +286,20 @@ public void testStartAndShutdown() throws Exception {
public void testSuccessfulAssignment() throws Exception {
try (TestEnv testEnv = new TestEnv()) {
assertEquals(0, testEnv.queuedReplicaToDirAssignments());
assertEquals(0, testEnv.deprecatedQueuedReplicaToDirAssignments());
testEnv.onAssignment(new TopicIdPartition(TOPIC_1, 0), DIR_1);
TestUtils.retryOnExceptionWithTimeout(60_000, () -> {
assertEquals(1, testEnv.assignmentsManager.numPending());
assertEquals(1, testEnv.queuedReplicaToDirAssignments());
assertEquals(1, testEnv.deprecatedQueuedReplicaToDirAssignments());
});
assertEquals(0, testEnv.assignmentsManager.previousGlobalFailures());
assertEquals(1, testEnv.assignmentsManager.numInFlight());
testEnv.successfullyCompleteCallbackOfRequestAssigningTopic1ToDir1();
TestUtils.retryOnExceptionWithTimeout(60_000, () -> {
assertEquals(0, testEnv.assignmentsManager.numPending());
assertEquals(0, testEnv.queuedReplicaToDirAssignments());
assertEquals(0, testEnv.deprecatedQueuedReplicaToDirAssignments());
assertEquals(1, testEnv.success(new TopicIdPartition(TOPIC_1, 0)));
});
assertEquals(0, testEnv.assignmentsManager.previousGlobalFailures());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,16 @@ public class RemoteStorageMetrics {
"kafka.server", "BrokerTopicMetrics", REMOTE_DELETE_LAG_SEGMENTS);
public static final MetricName REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC = getMetricName(
"kafka.log.remote", "RemoteLogManager", REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
public static final MetricName REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC = getMetricName(
@Deprecated(since = "4.2")
private static final MetricName DEPRECATE_REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC = getMetricName(
"org.apache.kafka.storage.internals.log", "RemoteStorageThreadPool", REMOTE_LOG_READER_TASK_QUEUE_SIZE);
public static final MetricName REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC = getMetricName(
@Deprecated(since = "4.2")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private static final MetricName DEPRECATE_REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC = getMetricName(
"org.apache.kafka.storage.internals.log", "RemoteStorageThreadPool", REMOTE_LOG_READER_AVG_IDLE_PERCENT);
public static final MetricName REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC = getMetricName(
"kafka.log.remote", "RemoteStorageThreadPool", REMOTE_LOG_READER_TASK_QUEUE_SIZE);
public static final MetricName REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC = getMetricName(
"kafka.log.remote", "RemoteStorageThreadPool", REMOTE_LOG_READER_AVG_IDLE_PERCENT);
public static final MetricName REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC = getMetricName(
"kafka.log.remote", "RemoteLogManager", REMOTE_LOG_READER_FETCH_RATE_AND_TIME_MS);

Expand All @@ -115,6 +121,8 @@ public static Set<MetricName> allMetrics() {
metrics.add(REMOTE_DELETE_LAG_BYTES_METRIC);
metrics.add(REMOTE_DELETE_LAG_SEGMENTS_METRIC);
metrics.add(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC);
metrics.add(DEPRECATE_REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC);
metrics.add(DEPRECATE_REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC);
metrics.add(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC);
metrics.add(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC);
metrics.add(REMOTE_LOG_METADATA_COUNT_METRIC);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@

public final class RemoteStorageThreadPool extends ThreadPoolExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteStorageThreadPool.class);
private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(this.getClass());
private final KafkaMetricsGroup internalsLogMetricsGroup = new KafkaMetricsGroup(this.getClass());
private final KafkaMetricsGroup logRemoteMetricsGroup = new KafkaMetricsGroup("kafka.log.remote", "RemoteStorageThreadPool");

public RemoteStorageThreadPool(String threadNamePattern,
int numThreads,
Expand All @@ -45,9 +46,13 @@ public RemoteStorageThreadPool(String threadNamePattern,
ThreadUtils.createThreadFactory(threadNamePattern, false,
(t, e) -> LOGGER.error("Uncaught exception in thread '{}':", t.getName(), e))
);
metricsGroup.newGauge(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(),
internalsLogMetricsGroup.newGauge(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(),
() -> getQueue().size());
metricsGroup.newGauge(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(),
internalsLogMetricsGroup.newGauge(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(),
() -> 1 - (double) getActiveCount() / (double) getCorePoolSize());
logRemoteMetricsGroup.newGauge(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(),
() -> getQueue().size());
logRemoteMetricsGroup.newGauge(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(),
() -> 1 - (double) getActiveCount() / (double) getCorePoolSize());
}

Expand All @@ -59,6 +64,7 @@ protected void afterExecute(Runnable runnable, Throwable th) {
}

public void removeMetrics() {
REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(metricsGroup::removeMetric);
REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(internalsLogMetricsGroup::removeMetric);
REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(logRemoteMetricsGroup::removeMetric);
}
}