diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/hamcrest/KafkaMatchers.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/hamcrest/KafkaMatchers.java index 25dd7ab9e7..985afa133c 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/hamcrest/KafkaMatchers.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/hamcrest/KafkaMatchers.java @@ -22,6 +22,7 @@ import org.hamcrest.Matcher; /** + * The Hamcrest {@link Matcher}s utilities. * * @author Gary Russell * diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java index 9acdb4495a..a723320b27 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java @@ -63,6 +63,8 @@ import scala.collection.Set; /** + * The {@link KafkaRule} implementation for the embedded Kafka Broker and Zookeeper. + * * @author Marius Bogoevici * @author Artem Bilan * @author Gary Russell diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java index b0ab3acac5..6abcd8e35f 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java @@ -25,6 +25,8 @@ import org.springframework.kafka.test.rule.KafkaEmbedded; /** + * The Kafka specific testing utilities. + * * @author Gary Russell * */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index 1c1bcca756..48acb3799c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -73,6 +73,9 @@ * factory or for fine-grained control over endpoints registration. See * {@link EnableKafka} Javadoc for complete usage details. * + * @param the key type. + * @param the value type. + * * @author Stephane Nicoll * @author Juergen Hoeller * @author Gary Russell @@ -485,7 +488,8 @@ private Object resolveExpression(String value) { /** * Resolve the specified value if possible. - * + * @param value the value to resolve + * @return the resolved value * @see ConfigurableBeanFactory#resolveEmbeddedValue */ private String resolve(String value) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/TopicPartition.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/TopicPartition.java index 26abf1951e..d9054b7ecc 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/TopicPartition.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/TopicPartition.java @@ -31,6 +31,7 @@ public @interface TopicPartition { /** + * The topic to listen on. * @return the topic to listen on. Property place holders * and SpEL expressions are supported, which must resolve * to a String. @@ -38,6 +39,7 @@ String topic(); /** + * The partitions within the topic. * @return the partitions within the topic. Property place * holders and SpEL expressions are supported, which must * resolve to Integers (or Strings that can be parsed as diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java index 200ffc1e00..6c8c384913 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java @@ -27,6 +27,10 @@ /** * Base {@link KafkaListenerContainerFactory} for Spring's base container implementation. * + * @param the {@link AbstractMessageListenerContainer} implementation type. + * @param the key type. + * @param the value type. + * * @author Stephane Nicoll * * @see AbstractMessageListenerContainer @@ -51,6 +55,7 @@ public abstract class AbstractKafkaListenerContainerFactory consumerFactory) { @@ -62,6 +67,7 @@ public ConsumerFactory getConsumerFactory() { } /** + * Specify an {@link ErrorHandler} to use. * @param errorHandler The error handler. * @see AbstractMessageListenerContainer#setErrorHandler(ErrorHandler) */ @@ -70,6 +76,7 @@ public void setErrorHandler(ErrorHandler errorHandler) { } /** + * Specify an {@link Executor} to use. * @param taskExecutor the {@link Executor} to use. * @see AbstractKafkaListenerContainerFactory#setTaskExecutor */ @@ -78,6 +85,7 @@ public void setTaskExecutor(Executor taskExecutor) { } /** + * Specify an {@code autoStartup boolean} flag. * @param autoStartup true for auto startup. * @see AbstractMessageListenerContainer#setAutoStartup(boolean) */ @@ -86,6 +94,7 @@ public void setAutoStartup(Boolean autoStartup) { } /** + * Specify a {@code phase} to use. * @param phase The phase. * @see AbstractMessageListenerContainer#setPhase(int) */ @@ -94,6 +103,7 @@ public void setPhase(int phase) { } /** + * Specify an {@code ackCount} to use. * @param ackCount the ack count. * @see AbstractMessageListenerContainer#setAckCount(int) */ @@ -102,6 +112,7 @@ public void setAckCount(Integer ackCount) { } /** + * Specify an {@link AckMode} to use. * @param ackMode the ack mode. * @see AbstractMessageListenerContainer#setAckMode(AckMode) */ @@ -110,6 +121,7 @@ public void setAckMode(AckMode ackMode) { } /** + * Specify a {@code pollTimeout} to use. * @param pollTimeout the poll timeout * @see AbstractMessageListenerContainer#setPollTimeout(long) */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java index 739f71dd15..f736e28cf7 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java @@ -36,7 +36,10 @@ import org.springframework.util.Assert; /** - * Base model for a Kafka listener endpoint + * Base model for a Kafka listener endpoint. + * + * @param the key type. + * @param the value type. * * @author Stephane Nicoll * @author Gary Russell @@ -95,8 +98,7 @@ public String getId() { } /** - * Set the topics to use. Either these or 'topicPattern' - * or 'topicPartitions' + * Set the topics to use. Either these or 'topicPattern' or 'topicPartitions' * should be provided, but not a mixture. * @param topics to set. * @see #setTopicPartitions(TopicPartition...) @@ -109,6 +111,7 @@ public void setTopics(String... topics) { } /** + * Return the topics for this endpoint. * @return the topics for this endpoint. */ @Override @@ -131,6 +134,7 @@ public void setTopicPartitions(TopicPartition... topicPartitions) { } /** + * Return the topicPartitions for this endpoint. * @return the topicPartitions for this endpoint. */ @Override @@ -150,6 +154,7 @@ public void setTopicPattern(Pattern topicPattern) { } /** + * Return the topicPattern for this endpoint. * @return the topicPattern for this endpoint. */ @Override @@ -207,6 +212,7 @@ private void setupMessageListener(MessageListenerContainer container) { } /** + * Return a description for this endpoint. * @return a description for this endpoint. *

Available to subclasses, for inclusion in their {@code toString()} result. */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerContainerFactory.java index d97883fd10..62001748cb 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerContainerFactory.java @@ -16,13 +16,17 @@ package org.springframework.kafka.config; +import org.springframework.kafka.listener.AbstractMessageListenerContainer; import org.springframework.kafka.listener.MessageListenerContainer; /** * Factory of {@link MessageListenerContainer} based on a * {@link KafkaListenerEndpoint} definition. * + * @param the {@link AbstractMessageListenerContainer} implementation type. + * * @author Stephane Nicoll + * * @see KafkaListenerEndpoint */ public interface KafkaListenerContainerFactory { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java index 8884407db4..40f6aca0db 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java @@ -34,6 +34,7 @@ public interface KafkaListenerEndpoint { /** + * Return the id of this endpoint. * @return the id of this endpoint. The id can be further qualified * when the endpoint is resolved against its actual listener * container. @@ -42,21 +43,25 @@ public interface KafkaListenerEndpoint { String getId(); /** + * Return the group of this endpoint or null if not in a group. * @return the group of this endpoint or null if not in a group. */ String getGroup(); /** + * Return the topics for this endpoint. * @return the topics for this endpoint. */ Collection getTopics(); /** + * Return the topicPartitions for this endpoint. * @return the topicPartitions for this endpoint. */ Collection getTopicPartitions(); /** + * Return the topicPattern for this endpoint. * @return the topicPattern for this endpoint. */ Pattern getTopicPattern(); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java index 31783adc5a..7ee9b2a1e1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java @@ -63,6 +63,8 @@ public void setEndpointRegistry(KafkaListenerEndpointRegistry endpointRegistry) } /** + * Return the {@link KafkaListenerEndpointRegistry} instance for this + * registrar, may be {@code null}. * @return the {@link KafkaListenerEndpointRegistry} instance for this * registrar, may be {@code null}. */ @@ -84,6 +86,7 @@ public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory kafkaHand } /** + * Return the custom {@link MessageHandlerMethodFactory} to use, if any. * @return the custom {@link MessageHandlerMethodFactory} to use, if any. */ public MessageHandlerMethodFactory getMessageHandlerMethodFactory() { @@ -201,7 +204,8 @@ private static final class KafkaListenerEndpointDescriptor { private final KafkaListenerContainerFactory containerFactory; - private KafkaListenerEndpointDescriptor(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory containerFactory) { + private KafkaListenerEndpointDescriptor(KafkaListenerEndpoint endpoint, + KafkaListenerContainerFactory containerFactory) { this.endpoint = endpoint; this.containerFactory = containerFactory; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java index 91647facb0..24e1e262e6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java @@ -104,6 +104,7 @@ public Set getListenerContainerIds() { } /** + * Return the managed {@link MessageListenerContainer} instance(s). * @return the managed {@link MessageListenerContainer} instance(s). */ public Collection getListenerContainers() { @@ -262,6 +263,7 @@ public boolean isRunning() { /** * Start the specified {@link MessageListenerContainer} if it should be started * on startup. + * @param listenerContainer the listener container to start. * @see MessageListenerContainer#isAutoStartup() */ private static void startIfNecessary(MessageListenerContainer listenerContainer) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java index 4bed83de43..febbd3470c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java @@ -29,6 +29,9 @@ * A {@link KafkaListenerEndpoint} providing the method to invoke to process * an incoming message for this endpoint. * + * @param the key type. + * @param the value type. + * * @author Stephane Nicoll * @author Artem Bilan * @author Gary Russell @@ -77,6 +80,7 @@ public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory messageHa } /** + * Return the {@link MessageHandlerMethodFactory}. * @return the messageHandlerMethodFactory */ protected MessageHandlerMethodFactory getMessageHandlerMethodFactory() { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/MultiMethodKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/MultiMethodKafkaListenerEndpoint.java index 4d3e29e1ab..4b91f68f8e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/MultiMethodKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/MultiMethodKafkaListenerEndpoint.java @@ -26,8 +26,16 @@ import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; /** + * The {@link MethodKafkaListenerEndpoint} extension for several POJO methods + * based on the {@link org.springframework.kafka.annotation.KafkaHandler}. + * + * @param the key type. + * @param the value type. + * * @author Gary Russell * + * @see org.springframework.kafka.annotation.KafkaHandler + * @see DelegatingInvocableHandler */ public class MultiMethodKafkaListenerEndpoint extends MethodKafkaListenerEndpoint { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/SimpleKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/SimpleKafkaListenerContainerFactory.java index 63d89f3653..6a4eb5ca8f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/SimpleKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/SimpleKafkaListenerContainerFactory.java @@ -25,10 +25,13 @@ /** * A {@link KafkaListenerContainerFactory} implementation to build a regular * {@link ConcurrentMessageListenerContainer}. - * - *

This should be the default for most users and a good transition paths + *

+ * This should be the default for most users and a good transition paths * for those that are used to build such container definition manually. * + * @param the key type. + * @param the value type. + * * @author Stephane Nicoll * @author Gary Russell * @author Artem Bilan @@ -41,6 +44,7 @@ public class SimpleKafkaListenerContainerFactory private Long recentOffset; /** + * Specify the container concurrency. * @param concurrency the number of consumers to create. * @see ConcurrentMessageListenerContainer#setConcurrency(int) */ @@ -49,6 +53,7 @@ public void setConcurrency(Integer concurrency) { } /** + * Specify the offset lag from the end of commit. * @param recentOffset the recent offset. * @see ConcurrentMessageListenerContainer#setRecentOffset(long) */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/SimpleKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/SimpleKafkaListenerEndpoint.java index a5e7e92f52..1149a3e9f3 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/SimpleKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/SimpleKafkaListenerEndpoint.java @@ -24,6 +24,9 @@ * A {@link KafkaListenerEndpoint} simply providing the {@link MessageListener} to * invoke to process an incoming message for this endpoint. * + * @param the key type. + * @param the value type. + * * @author Stephane Nicoll * @author Gary Russell */ @@ -42,6 +45,8 @@ public void setMessageListener(MessageListener messageListener) { } /** + * Return the {@link MessageListener} to invoke when a message matching + * the endpoint is received. * @return the {@link MessageListener} to invoke when a message matching * the endpoint is received. */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ConsumerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ConsumerFactory.java index 8007506a51..2c6d0f8e65 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/ConsumerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/ConsumerFactory.java @@ -19,8 +19,12 @@ import org.apache.kafka.clients.consumer.Consumer; /** - * @author Gary Russell + * The strategy to produce a {@link Consumer} instance(s). + * + * @param the key type. + * @param the value type. * + * @author Gary Russell */ public interface ConsumerFactory { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java index 3ee8aaca78..dfee6ec1c1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java @@ -24,8 +24,14 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; /** - * @author Gary Russell + * The {@link ConsumerFactory} implementation to produce a new {@link Consumer} instance + * for provided {@link Map} {@code configs} on each {@link #createConsumer()} + * invocation. + * + * @param the key type. + * @param the value type. * + * @author Gary Russell */ public class DefaultKafkaConsumerFactory implements ConsumerFactory { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java index 152209f00e..3eaea3fbfc 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java @@ -37,8 +37,17 @@ import org.springframework.context.Lifecycle; /** - * @author Gary Russell + * The {@link ProducerFactory} implementation for the {@code singleton} shared {@link Producer} + * instance. + *

+ * The {@link Producer} instance is freed from the external {@link Producer#close()} invocation + * with the internal wrapper. The real {@link Producer#close()} is called on the target + * {@link Producer} during the {@link Lifecycle#stop()} or {@link DisposableBean#destroy()}. + * + * @param the key type. + * @param the value type. * + * @author Gary Russell */ public class DefaultKafkaProducerFactory implements ProducerFactory, Lifecycle, DisposableBean { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaException.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaException.java index 52d0e06940..7407a8f7a1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaException.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaException.java @@ -16,12 +16,16 @@ package org.springframework.kafka.core; +import org.springframework.core.NestedRuntimeException; + /** - * @author Gary Russell + * The Spring Kafka specif {@link NestedRuntimeException} implementation. * + * @author Gary Russell + * @author Artem Bilan */ @SuppressWarnings("serial") -public class KafkaException extends RuntimeException { +public class KafkaException extends NestedRuntimeException { public KafkaException(String message) { super(message); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java index caac080086..aeecc0a0d4 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java @@ -22,11 +22,13 @@ import org.apache.kafka.clients.producer.RecordMetadata; /** - * @author Marius Bogoevici - * @author Gary Russell + * The basic Kafka operation contract. * * @param the key type. * @param the value type. + * + * @author Marius Bogoevici + * @author Gary Russell */ public interface KafkaOperations { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index ec8a8ff947..7e306dd159 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -32,6 +32,9 @@ /** * A template for executing high-level operations. * + * @param the key type. + * @param the value type. + * * @author Marius Bogoevici * @author Gary Russell */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactory.java index 8d9e18895c..5a5b38f02c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactory.java @@ -19,8 +19,12 @@ import org.apache.kafka.clients.producer.Producer; /** - * @author Gary Russell + * The strategy to produce a {@link Producer} instance(s). + * + * @param the key type. + * @param the value type. * + * @author Gary Russell */ public interface ProducerFactory { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java index d431a1877c..1b0edcf43f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java @@ -27,6 +27,10 @@ import org.springframework.util.Assert; /** + * The base implementation for the {@link MessageListenerContainer}. + * + * @param the key type. + * @param the value type. * * @author Gary Russell */ @@ -35,6 +39,9 @@ public abstract class AbstractMessageListenerContainer protected final Log logger = LogFactory.getLog(this.getClass()); //NOSONAR + /** + * The offset commit behavior enumeration. + */ public enum AckMode { /** * Call {@link Consumer#commitAsync()} after each record is passed to the listener. @@ -156,6 +163,7 @@ public void setAckMode(AckMode ackMode) { } /** + * Return the {@link AckMode}. * @return the {@link AckMode} * @see #setAckMode(AckMode) */ @@ -172,6 +180,7 @@ public void setPollTimeout(long pollTimeout) { } /** + * Return the poll timeout. * @return the poll timeout. * @see #setPollTimeout(long) */ @@ -189,6 +198,7 @@ public void setAckCount(int count) { } /** + * Return the count. * @return the count. * @see #setAckCount(int) */ @@ -207,7 +217,8 @@ public void setAckTime(long millis) { } /** - * @return the time. + * Return the ack time. + * @return the ack time. * @see AbstractMessageListenerContainer#setAckTime(long) */ public long getAckTime() { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AcknowledgingMessageListener.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AcknowledgingMessageListener.java index 9d8d615451..ffcee55a54 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AcknowledgingMessageListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AcknowledgingMessageListener.java @@ -24,14 +24,16 @@ * Listener for handling incoming Kafka messages, propagating an acknowledgment handle that recipients * can invoke when the message has been processed. * + * @param the key type. + * @param the value type. + * * @author Marius Bogoevici * @author Gary Russell */ public interface AcknowledgingMessageListener { /** - * Executes when a Kafka message is received - * + * Executes when a Kafka message is received. * @param record the Kafka message to be processed * @param acknowledgment a handle for acknowledging the message processing */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java index 0e92bfaec9..bd8ce7f87a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java @@ -35,6 +35,9 @@ * constructor is used, the {@link TopicPartition}s are distributed evenly across the * instances. * + * @param the key type. + * @param the value type. + * * @author Marius Bogoevici * @author Gary Russell */ @@ -131,6 +134,8 @@ public void setConcurrency(int concurrency) { } /** + * Return the list of {@link KafkaMessageListenerContainer}s created by + * this container. * @return the list of {@link KafkaMessageListenerContainer}s created by * this container. */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandler.java index a383df4223..0613b5769a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandler.java @@ -19,7 +19,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; /** - * Handles errors thrown during the execution of a {@link MessageListener} + * Handles errors thrown during the execution of a {@link MessageListener}. * * @author Marius Bogoevici * @author Gary Russell diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 7c571e0417..ced5512532 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -52,8 +52,10 @@ *

* With the latter, initial partition offsets can be provided. * - * @author Gary Russell + * @param the key type. + * @param the value type. * + * @author Gary Russell */ public class KafkaMessageListenerContainer extends AbstractMessageListenerContainer { @@ -134,6 +136,8 @@ public void setRecentOffset(long recentOffset) { } /** + * Return the {@link TopicPartition}s currently assigned to this container, + * either explicitly or by Kafka; may be null if not assigned yet. * @return the {@link TopicPartition}s currently assigned to this container, * either explicitly or by Kafka; may be null if not assigned yet. */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerExecutionFailedException.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerExecutionFailedException.java index f39ba64d8c..cacbac14ed 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerExecutionFailedException.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerExecutionFailedException.java @@ -19,8 +19,9 @@ import org.springframework.kafka.core.KafkaException; /** - * @author Gary Russell + * The listener specif {@link KafkaException} extension. * + * @author Gary Russell */ @SuppressWarnings("serial") public class ListenerExecutionFailedException extends KafkaException { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/LoggingErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/LoggingErrorHandler.java index 4c78813e26..6e7987335c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/LoggingErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/LoggingErrorHandler.java @@ -23,6 +23,8 @@ import org.springframework.util.ObjectUtils; /** + * The {@link ErrorHandler} implementation for logging purpose. + * * @author Marius Bogoevici * @author Gary Russell */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListener.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListener.java index b5d5cdda4e..032bfe699d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListener.java @@ -19,7 +19,10 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; /** - * Listener for handling incoming Kafka messages + * Listener for handling incoming Kafka messages. + * + * @param the key type. + * @param the value type. * * @author Marius Bogoevici * @author Gary Russell diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AbstractAdaptableMessageListener.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AbstractAdaptableMessageListener.java index c80b051c09..06d3599ae9 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AbstractAdaptableMessageListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AbstractAdaptableMessageListener.java @@ -27,6 +27,9 @@ * An abstract {@link MessageListener} adapter providing the necessary infrastructure * to extract the payload of a {@link org.springframework.messaging.Message}. * + * @param the key type. + * @param the value type. + * * @author Stephane Nicoll * @author Gary Russell * @@ -36,16 +39,13 @@ public abstract class AbstractAdaptableMessageListener implements MessageListener, AcknowledgingMessageListener { - /** Logger available to subclasses */ protected final Log logger = LogFactory.getLog(getClass()); //NOSONAR /** * Kafka {@link MessageListener} entry point. - *

- * Delegates the message to the target listener method, with appropriate conversion of the message argument. + *

Delegates the message to the target listener method, with appropriate conversion of the message argument. * In case of an exception, the {@link #handleListenerException(Throwable)} method will be invoked. - *

* @param record the incoming Kafka {@link ConsumerRecord}. * @see #handleListenerException * @see AcknowledgingMessageListener#onMessage(ConsumerRecord, org.springframework.kafka.support.Acknowledgment) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java index 172a66f8ba..c5f62bf49f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java @@ -58,7 +58,8 @@ public DelegatingInvocableHandler(List handlers, Object } /** - * @return the bean + * Return the bean for this handler. + * @return the bean. */ public Object getBean() { return this.bean; @@ -68,9 +69,9 @@ public Object getBean() { * Invoke the method with the given message. * @param message the message. * @param providedArgs additional arguments. + * @return the result of the invocation. * @throws Exception raised if no suitable argument resolver can be found, * or the method raised an exception. - * @return the result of the invocation. */ public Object invoke(Message message, Object... providedArgs) throws Exception { //NOSONAR Class payloadClass = message.getPayload().getClass(); @@ -79,6 +80,7 @@ public Object invoke(Message message, Object... providedArgs) throws Exceptio } /** + * Determine the {@link InvocableHandlerMethod} for the provided type. * @param payloadClass the payload class. * @return the handler. */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java index 4903a8997b..e21c725e00 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java @@ -70,5 +70,4 @@ public Object getBean() { } } - } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index e1a96bd874..7d6fbf0a9c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -36,6 +36,9 @@ * the {@link Acknowledgment} are provided as additional arguments so that these can * be injected as method arguments if necessary. * + * @param the key type. + * @param the value type. + * * @author Stephane Nicoll * @author Gary Russell * @author Artem Bilan @@ -56,7 +59,7 @@ public void setHandlerMethod(HandlerAdapter handlerMethod) { } /** - * Set the MessageConverter + * Set the MessageConverter. * @param messageConverter the converter. */ public void setMessageConverter(MessageConverter messageConverter) { @@ -64,6 +67,8 @@ public void setMessageConverter(MessageConverter messageConverter) { } /** + * Return the {@link MessagingMessageConverter} for this listener, + * being able to convert {@link org.springframework.messaging.Message}. * @return the {@link MessagingMessageConverter} for this listener, * being able to convert {@link org.springframework.messaging.Message}. */ @@ -88,6 +93,10 @@ protected Message toMessagingMessage(ConsumerRecord record, Acknowledgm /** * Invoke the handler, wrapping any exception to a {@link ListenerExecutionFailedException} * with a dedicated error message. + * @param record the record to process during invocation. + * @param acknowledgment the acknowledgment to use if any. + * @param message the message to process. + * @return the result of invocation. */ private Object invokeHandler(ConsumerRecord record, Acknowledgment acknowledgment, Message message) { try { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaHeaders.java b/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaHeaders.java index fa364a8260..ed128b6e78 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaHeaders.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaHeaders.java @@ -17,6 +17,8 @@ package org.springframework.kafka.support; /** + * The Kafka specific message headers constants. + * * @author Artem Bilan * @author Marius Bogoevici */ @@ -24,14 +26,30 @@ public abstract class KafkaHeaders { private static final String PREFIX = "kafka_"; + /** + * The header for topic. + */ public static final String TOPIC = PREFIX + "topic"; + + /** + * The header for message key. + */ public static final String MESSAGE_KEY = PREFIX + "messageKey"; + /** + * The header for topic partition. + */ public static final String PARTITION_ID = PREFIX + "partitionId"; + /** + * The header for partition offset. + */ public static final String OFFSET = PREFIX + "offset"; + /** + * The header for {@link Acknowledgment}. + */ public static final String ACKNOWLEDGMENT = PREFIX + "acknowledgment"; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/LoggingProducerListener.java b/spring-kafka/src/main/java/org/springframework/kafka/support/LoggingProducerListener.java index cee3d03f3c..b20888cc0b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/LoggingProducerListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/LoggingProducerListener.java @@ -24,6 +24,9 @@ /** * The {@link ProducerListener} that logs exceptions thrown when sending messages. * + * @param the key type. + * @param the value type. + * * @author Marius Bogoevici * @author Gary Russell */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/ProducerListener.java b/spring-kafka/src/main/java/org/springframework/kafka/support/ProducerListener.java index 8309ec5fc9..7b8e3f8a0d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/ProducerListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/ProducerListener.java @@ -25,6 +25,9 @@ * Its main goal is to provide a stateless singleton delegate for {@link org.apache.kafka.clients.producer.Callback}s, * which, in all but the most trivial cases, requires creating a separate instance per message. * + * @param the key type. + * @param the value type. + * * @author Marius Bogoevici * @author Gary Russell * @@ -33,7 +36,7 @@ public interface ProducerListener { /** - * Invoked after the successful send of a message (that is, after it has been acknowledged by the broker) + * Invoked after the successful send of a message (that is, after it has been acknowledged by the broker). * @param topic the destination topic * @param partition the destination partition (could be null) * @param key the key of the outbound message @@ -43,7 +46,7 @@ public interface ProducerListener { void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata); /** - * Invoked after an attempt to send a message has failed + * Invoked after an attempt to send a message has failed. * @param topic the destination topic * @param partition the destination partition (could be null) * @param key the key of the outbound message diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/ProducerListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/ProducerListenerAdapter.java index 2af460f76f..6b2377bd79 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/ProducerListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/ProducerListenerAdapter.java @@ -21,6 +21,9 @@ /** * No-op implementation of {@link ProducerListener}, to be used as base class for other implementations. * + * @param the key type. + * @param the value type. + * * @author Marius Bogoevici * @author Gary Russell * @author Artem Bilan diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/ProducerListenerInvokingCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/support/ProducerListenerInvokingCallback.java index bc663bd8f7..1e37e3449d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/ProducerListenerInvokingCallback.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/ProducerListenerInvokingCallback.java @@ -25,6 +25,9 @@ * Adapts the {@link org.apache.kafka.clients.producer.Callback} interface of the * {@link org.apache.kafka.clients.producer.Producer} to a {@link ProducerListener}. * + * @param the key type. + * @param the value type. + * * @author Marius Bogoevici * @author Gary Russell */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java index 4231dfe1a0..4138d6a505 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java @@ -22,8 +22,12 @@ import org.springframework.messaging.Message; /** - * @author Gary Russell + * The Kafka specific {@link Message} converter strategy. + * + * @param the key type. + * @param the value type. * + * @author Gary Russell */ public interface MessageConverter { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java index da27a348a0..cca74f41da 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java @@ -27,6 +27,13 @@ import org.springframework.messaging.support.MessageBuilder; /** + * The Messaging specific {@link MessageConverter} implementation. + *

+ * Populates {@link KafkaHeaders} based on the {@link ConsumerRecord} onto the returned message. + * + * @param the key type. + * @param the value type. + * * @author Marius Bogoevici * @author Gary Russell */ diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java index aeddbeb510..aae8acb352 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java @@ -17,10 +17,9 @@ package org.springframework.kafka.listener; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.given; import static org.mockito.Matchers.anyLong; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import java.util.List; import java.util.Map; @@ -328,16 +327,17 @@ public void testConcurrencyWithPartitions() { }; ConsumerFactory cf = mock(ConsumerFactory.class); Consumer consumer = mock(Consumer.class); - when(cf.createConsumer()).thenReturn(consumer); - doAnswer(new Answer>() { + given(cf.createConsumer()).willReturn(consumer); + given(consumer.poll(anyLong())) + .willAnswer(new Answer>() { - @Override - public ConsumerRecords answer(InvocationOnMock invocation) throws Throwable { - Thread.sleep(100); - return null; - } + @Override + public ConsumerRecords answer(InvocationOnMock invocation) throws Throwable { + Thread.sleep(100); + return null; + } - }).when(consumer).poll(anyLong()); + }); ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer<>(cf, topic1PartitionS); container.setMessageListener(new MessageListener() { diff --git a/src/checkstyle/checkstyle.xml b/src/checkstyle/checkstyle.xml index c235458260..0b6ccaa997 100644 --- a/src/checkstyle/checkstyle.xml +++ b/src/checkstyle/checkstyle.xml @@ -82,6 +82,7 @@ org.hamcrest.CoreMatchers.*, org.hamcrest.Matchers.*, org.mockito.Mockito.*, + org.mockito.BDDMockito.*, org.mockito.Matchers.*, org.springframework.kafka.test.hamcrest.KafkaMatchers.*, org.springframework.kafka.test.assertj.KafkaConditions.*" /> @@ -100,31 +101,31 @@ - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + @@ -144,13 +145,13 @@ value="Line has leading space characters; indentation should be performed with tabs only." /> - - - - - - - + + + + + +