diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java index 3998d672006a3..eec41c6d3b4f1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java @@ -36,6 +36,7 @@ import java.util.Collections; import static org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult.EMPTY; +import static org.apache.kafka.clients.consumer.internals.RequestState.RETRY_BACKOFF_JITTER; /** *

Manages the request creation and response handling for the heartbeat. The module creates a @@ -113,7 +114,7 @@ public abstract class AbstractHeartbeatRequestManagerManages the request creation and response handling for the streams group heartbeat. The class creates a @@ -330,7 +331,7 @@ public StreamsGroupHeartbeatRequestManager(final LogContext logContext, 0, retryBackoffMs, retryBackoffMaxMs, - maxPollIntervalMs + RETRY_BACKOFF_JITTER ); this.pollTimer = time.timer(maxPollIntervalMs); } diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java b/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java index 0599448014717..73d68b6cf4450 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java @@ -19,6 +19,8 @@ import java.util.concurrent.ThreadLocalRandom; +import static java.lang.String.format; + /** * A utility class for keeping the parameters and providing the value of exponential * retry backoff, exponential reconnect backoff, exponential timeout, etc. @@ -42,6 +44,9 @@ public ExponentialBackoff(long initialInterval, int multiplier, long maxInterval this.initialInterval = Math.min(maxInterval, initialInterval); this.multiplier = multiplier; this.maxInterval = maxInterval; + if (jitter < 0 || jitter > 1) { + throw new IllegalArgumentException(format("jitter must be between 0 and 1, but got %s", jitter)); + } this.jitter = jitter; this.expMax = maxInterval > initialInterval ? Math.log(maxInterval / (double) Math.max(initialInterval, 1)) / Math.log(multiplier) : 0; diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffTest.java index 4e843863ab5c7..fff921db29b22 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffTest.java @@ -20,6 +20,7 @@ import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class ExponentialBackoffTest { @@ -54,4 +55,14 @@ public void testExponentialBackoffWithoutJitter() { assertEquals(400, exponentialBackoff.backoff(2)); assertEquals(400, exponentialBackoff.backoff(3)); } + + @Test + public void testExponentialBackoffWithInvalidJitter() { + assertEquals("jitter must be between 0 and 1, but got -1.0", + assertThrows(IllegalArgumentException.class, + () -> new ExponentialBackoff(100, 2, 400, -1)).getMessage()); + assertEquals("jitter must be between 0 and 1, but got 3000.0", + assertThrows(IllegalArgumentException.class, + () -> new ExponentialBackoff(100, 2, 400, 3000)).getMessage()); + } }