diff --git a/docs/upgrade.html b/docs/upgrade.html index f0f3f5407388d..481ae166237b4 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -40,6 +40,18 @@
Notable changes in 4
  • The PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG in ProducerConfig was deprecated and will be removed in Kafka 5.0. Please use the PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG instead.
  • +
  • + The metrics org.apache.kafka.server:type=AssignmentsManager.QueuedReplicaToDirAssignments, + org.apache.kafka.storage.internals.log:type=RemoteStorageThreadPool.RemoteLogReaderTaskQueueSize, and + org.apache.kafka.storage.internals.log:type=RemoteStorageThreadPool.RemoteLogReaderAvgIdlePercent + have been deprecated and will be removed in Kafka 5.0. + + As replacements, the following metrics have been introduced, which report the same information: + kafka.server:type=AssignmentsManager.QueuedReplicaToDirAssignments, + kafka.log.remote:type=RemoteStorageThreadPool.RemoteLogReaderTaskQueueSize, and + kafka.log.remote:type=RemoteStorageThreadPool.RemoteLogReaderAvgIdlePercent. + For further details, please refer to KIP-1100. +
  • Upgrading to 4.1.0

    @@ -60,9 +72,6 @@
    Notable changes in 4 The logger class name for LogCleaner has been updated from kafka.log.LogCleaner to org.apache.kafka.storage.internals.log.LogCleaner in the log4j2.yaml configuration file. Added loggers for org.apache.kafka.storage.internals.log.LogCleaner$CleanerThread and org.apache.kafka.storage.internals.log.Cleaner classes to CleanerAppender. -
  • - The filename for rotated state-change.log files has been updated from stage-change.log.[date] to state-change.log.[date] in the log4j2.yaml configuration file. -
  • Broker @@ -74,8 +83,6 @@
    Notable changes in 4
  • The KIP-966 part 1: Eligible Leader Replicas(ELR) will be enabled by default on the new clusters. - After the ELR feature enabled, the previously set min.insync.replicas value at the broker-level config will be removed. - Please set at the cluster-level if necessary. For further details, please refer to here.
  • @@ -110,9 +117,9 @@
    Notable changes in 4 -

    Upgrading to 4.0.1

    +

    Upgrading to 4.0.0

    -
    Upgrading Clients to 4.0.1
    +
    Upgrading Clients to 4.0.0

    For a rolling upgrade:

    @@ -123,7 +130,7 @@
    Upgrading Client or KIP-1124. -
    Upgrading Servers to 4.0.1 from any version 3.3.x through 3.9.x
    +
    Upgrading Servers to 4.0.0 from any version 3.3.x through 3.9.x

    Note: Apache Kafka 4.0 only supports KRaft mode - ZooKeeper mode has been removed. As such, broker upgrades to 4.0.0 (and higher) require KRaft mode and the software and metadata versions must be at least 3.3.x (the first version when KRaft mode was deemed production ready). For clusters in KRaft mode @@ -147,13 +154,7 @@

    Upgrading Servers to 4.0.1 from has a boolean parameter that indicates if there are metadata changes (i.e. IBP_4_0_IV1(23, "4.0", "IV1", true) means this version has metadata changes). Given your current and target versions, a downgrade is only possible if there are no metadata changes in the versions between. -
    Notable changes in 4.0.1
    - +
    Notable changes in 4.0.0
    -
  • - The filename for rotated state-change.log files incorrectly rotates to stage-change.log.[date] (changing state to stage). This issue is corrected in 4.0.1. - See KAFKA-19576 for details. -
  • diff --git a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java index 5b20e1475a203..8235ec80c9543 100644 --- a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java +++ b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java @@ -74,6 +74,10 @@ public final class AssignmentsManager { /** * The metric reflecting the number of pending assignments. */ + @Deprecated(since = "4.2") + static final MetricName DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC = + deprecatedMetricName("QueuedReplicaToDirAssignments"); + static final MetricName QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC = metricName("QueuedReplicaToDirAssignments"); @@ -142,10 +146,15 @@ public final class AssignmentsManager { */ private final KafkaEventQueue eventQueue; - static MetricName metricName(String name) { + @Deprecated(since = "4.2") + static MetricName deprecatedMetricName(String name) { return KafkaYammerMetrics.getMetricName("org.apache.kafka.server", "AssignmentsManager", name); } + static MetricName metricName(String name) { + return KafkaYammerMetrics.getMetricName("kafka.server", "AssignmentsManager", name); + } + public AssignmentsManager( Time time, NodeToControllerChannelManager channelManager, @@ -182,12 +191,18 @@ public AssignmentsManager( this.ready = new ConcurrentHashMap<>(); this.inflight = Map.of(); this.metricsRegistry = metricsRegistry; - this.metricsRegistry.newGauge(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC, new Gauge() { + this.metricsRegistry.newGauge(DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC, new Gauge() { @Override public Integer value() { return numPending(); } }); + this.metricsRegistry.newGauge(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC, new Gauge() { + @Override + public Integer value() { + return numPending(); + } + }); this.previousGlobalFailures = 0; this.eventQueue = new KafkaEventQueue(time, new LogContext("[AssignmentsManager id=" + nodeId + "]"), @@ -248,6 +263,7 @@ public void run() { log.error("Unexpected exception shutting down NodeToControllerChannelManager", e); } try { + metricsRegistry.removeMetric(DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC); metricsRegistry.removeMetric(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC); } catch (Exception e) { log.error("Unexpected exception removing metrics.", e); diff --git a/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java index 3397a7488ff6a..4c533dd5737d4 100644 --- a/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java +++ b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java @@ -250,6 +250,13 @@ int queuedReplicaToDirAssignments() { return queuedReplicaToDirAssignments.value(); } + @SuppressWarnings("unchecked") // do not warn about Gauge typecast. + int deprecatedQueuedReplicaToDirAssignments() { + Gauge queuedReplicaToDirAssignments = + (Gauge) findMetric(AssignmentsManager.DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC); + return queuedReplicaToDirAssignments.value(); + } + @Override public void close() throws Exception { try { @@ -279,10 +286,12 @@ public void testStartAndShutdown() throws Exception { public void testSuccessfulAssignment() throws Exception { try (TestEnv testEnv = new TestEnv()) { assertEquals(0, testEnv.queuedReplicaToDirAssignments()); + assertEquals(0, testEnv.deprecatedQueuedReplicaToDirAssignments()); testEnv.onAssignment(new TopicIdPartition(TOPIC_1, 0), DIR_1); TestUtils.retryOnExceptionWithTimeout(60_000, () -> { assertEquals(1, testEnv.assignmentsManager.numPending()); assertEquals(1, testEnv.queuedReplicaToDirAssignments()); + assertEquals(1, testEnv.deprecatedQueuedReplicaToDirAssignments()); }); assertEquals(0, testEnv.assignmentsManager.previousGlobalFailures()); assertEquals(1, testEnv.assignmentsManager.numInFlight()); @@ -290,6 +299,7 @@ public void testSuccessfulAssignment() throws Exception { TestUtils.retryOnExceptionWithTimeout(60_000, () -> { assertEquals(0, testEnv.assignmentsManager.numPending()); assertEquals(0, testEnv.queuedReplicaToDirAssignments()); + assertEquals(0, testEnv.deprecatedQueuedReplicaToDirAssignments()); assertEquals(1, testEnv.success(new TopicIdPartition(TOPIC_1, 0))); }); assertEquals(0, testEnv.assignmentsManager.previousGlobalFailures()); diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java index 7922d88d831a5..a033d400fd8d1 100644 --- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java +++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java @@ -90,10 +90,16 @@ public class RemoteStorageMetrics { "kafka.server", "BrokerTopicMetrics", REMOTE_DELETE_LAG_SEGMENTS); public static final MetricName REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC = getMetricName( "kafka.log.remote", "RemoteLogManager", REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT); - public static final MetricName REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC = getMetricName( + @Deprecated(since = "4.2") + private static final MetricName DEPRECATE_REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC = getMetricName( "org.apache.kafka.storage.internals.log", "RemoteStorageThreadPool", REMOTE_LOG_READER_TASK_QUEUE_SIZE); - public static final MetricName REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC = getMetricName( + @Deprecated(since = "4.2") + private static final MetricName DEPRECATE_REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC = getMetricName( "org.apache.kafka.storage.internals.log", "RemoteStorageThreadPool", REMOTE_LOG_READER_AVG_IDLE_PERCENT); + public static final MetricName REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC = getMetricName( + "kafka.log.remote", "RemoteStorageThreadPool", REMOTE_LOG_READER_TASK_QUEUE_SIZE); + public static final MetricName REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC = getMetricName( + "kafka.log.remote", "RemoteStorageThreadPool", REMOTE_LOG_READER_AVG_IDLE_PERCENT); public static final MetricName REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC = getMetricName( "kafka.log.remote", "RemoteLogManager", REMOTE_LOG_READER_FETCH_RATE_AND_TIME_MS); @@ -115,6 +121,8 @@ public static Set allMetrics() { metrics.add(REMOTE_DELETE_LAG_BYTES_METRIC); metrics.add(REMOTE_DELETE_LAG_SEGMENTS_METRIC); metrics.add(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC); + metrics.add(DEPRECATE_REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC); + metrics.add(DEPRECATE_REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC); metrics.add(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC); metrics.add(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC); metrics.add(REMOTE_LOG_METADATA_COUNT_METRIC); diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java index a09b558b1247a..cf4e66e373e3c 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java @@ -32,7 +32,8 @@ public final class RemoteStorageThreadPool extends ThreadPoolExecutor { private static final Logger LOGGER = LoggerFactory.getLogger(RemoteStorageThreadPool.class); - private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(this.getClass()); + private final KafkaMetricsGroup internalsLogMetricsGroup = new KafkaMetricsGroup(this.getClass()); + private final KafkaMetricsGroup logRemoteMetricsGroup = new KafkaMetricsGroup("kafka.log.remote", "RemoteStorageThreadPool"); public RemoteStorageThreadPool(String threadNamePattern, int numThreads, @@ -45,9 +46,13 @@ public RemoteStorageThreadPool(String threadNamePattern, ThreadUtils.createThreadFactory(threadNamePattern, false, (t, e) -> LOGGER.error("Uncaught exception in thread '{}':", t.getName(), e)) ); - metricsGroup.newGauge(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(), + internalsLogMetricsGroup.newGauge(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(), () -> getQueue().size()); - metricsGroup.newGauge(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(), + internalsLogMetricsGroup.newGauge(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(), + () -> 1 - (double) getActiveCount() / (double) getCorePoolSize()); + logRemoteMetricsGroup.newGauge(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(), + () -> getQueue().size()); + logRemoteMetricsGroup.newGauge(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(), () -> 1 - (double) getActiveCount() / (double) getCorePoolSize()); } @@ -59,6 +64,7 @@ protected void afterExecute(Runnable runnable, Throwable th) { } public void removeMetrics() { - REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(metricsGroup::removeMetric); + REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(internalsLogMetricsGroup::removeMetric); + REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(logRemoteMetricsGroup::removeMetric); } }