Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.hamcrest.Matcher;

/**
* The Hamcrest {@link Matcher}s utilities.
*
* @author Gary Russell
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
import scala.collection.Set;

/**
* The {@link KafkaRule} implementation for the embedded Kafka Broker and Zookeeper.
*
* @author Marius Bogoevici
* @author Artem Bilan
* @author Gary Russell
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.springframework.kafka.test.rule.KafkaEmbedded;

/**
* The Kafka specific testing utilities.
*
* @author Gary Russell
*
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@
* factory or for fine-grained control over endpoints registration. See
* {@link EnableKafka} Javadoc for complete usage details.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Stephane Nicoll
* @author Juergen Hoeller
* @author Gary Russell
Expand Down Expand Up @@ -485,7 +488,8 @@ private Object resolveExpression(String value) {

/**
* Resolve the specified value if possible.
*
* @param value the value to resolve
* @return the resolved value
* @see ConfigurableBeanFactory#resolveEmbeddedValue
*/
private String resolve(String value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@
public @interface TopicPartition {

/**
* The topic to listen on.
* @return the topic to listen on. Property place holders
* and SpEL expressions are supported, which must resolve
* to a String.
*/
String topic();

/**
* The partitions within the topic.
* @return the partitions within the topic. Property place
* holders and SpEL expressions are supported, which must
* resolve to Integers (or Strings that can be parsed as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
/**
* Base {@link KafkaListenerContainerFactory} for Spring's base container implementation.
*
* @param <C> the {@link AbstractMessageListenerContainer} implementation type.
* @param <K> the key type.
* @param <V> the value type.
*
* @author Stephane Nicoll
*
* @see AbstractMessageListenerContainer
Expand All @@ -51,6 +55,7 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe
private Long pollTimeout;

/**
* Specify a {@link ConsumerFactory} to use.
* @param consumerFactory The consumer factory.
*/
public void setConsumerFactory(ConsumerFactory<K, V> consumerFactory) {
Expand All @@ -62,6 +67,7 @@ public ConsumerFactory<K, V> getConsumerFactory() {
}

/**
* Specify an {@link ErrorHandler} to use.
* @param errorHandler The error handler.
* @see AbstractMessageListenerContainer#setErrorHandler(ErrorHandler)
*/
Expand All @@ -70,6 +76,7 @@ public void setErrorHandler(ErrorHandler errorHandler) {
}

/**
* Specify an {@link Executor} to use.
* @param taskExecutor the {@link Executor} to use.
* @see AbstractKafkaListenerContainerFactory#setTaskExecutor
*/
Expand All @@ -78,6 +85,7 @@ public void setTaskExecutor(Executor taskExecutor) {
}

/**
* Specify an {@code autoStartup boolean} flag.
* @param autoStartup true for auto startup.
* @see AbstractMessageListenerContainer#setAutoStartup(boolean)
*/
Expand All @@ -86,6 +94,7 @@ public void setAutoStartup(Boolean autoStartup) {
}

/**
* Specify a {@code phase} to use.
* @param phase The phase.
* @see AbstractMessageListenerContainer#setPhase(int)
*/
Expand All @@ -94,6 +103,7 @@ public void setPhase(int phase) {
}

/**
* Specify an {@code ackCount} to use.
* @param ackCount the ack count.
* @see AbstractMessageListenerContainer#setAckCount(int)
*/
Expand All @@ -102,6 +112,7 @@ public void setAckCount(Integer ackCount) {
}

/**
* Specify an {@link AckMode} to use.
* @param ackMode the ack mode.
* @see AbstractMessageListenerContainer#setAckMode(AckMode)
*/
Expand All @@ -110,6 +121,7 @@ public void setAckMode(AckMode ackMode) {
}

/**
* Specify a {@code pollTimeout} to use.
* @param pollTimeout the poll timeout
* @see AbstractMessageListenerContainer#setPollTimeout(long)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@
import org.springframework.util.Assert;

/**
* Base model for a Kafka listener endpoint
* Base model for a Kafka listener endpoint.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Stephane Nicoll
* @author Gary Russell
Expand Down Expand Up @@ -95,8 +98,7 @@ public String getId() {
}

/**
* Set the topics to use. Either these or 'topicPattern'
* or 'topicPartitions'
* Set the topics to use. Either these or 'topicPattern' or 'topicPartitions'
* should be provided, but not a mixture.
* @param topics to set.
* @see #setTopicPartitions(TopicPartition...)
Expand All @@ -109,6 +111,7 @@ public void setTopics(String... topics) {
}

/**
* Return the topics for this endpoint.
* @return the topics for this endpoint.
*/
@Override
Expand All @@ -131,6 +134,7 @@ public void setTopicPartitions(TopicPartition... topicPartitions) {
}

/**
* Return the topicPartitions for this endpoint.
* @return the topicPartitions for this endpoint.
*/
@Override
Expand All @@ -150,6 +154,7 @@ public void setTopicPattern(Pattern topicPattern) {
}

/**
* Return the topicPattern for this endpoint.
* @return the topicPattern for this endpoint.
*/
@Override
Expand Down Expand Up @@ -207,6 +212,7 @@ private void setupMessageListener(MessageListenerContainer container) {
}

/**
* Return a description for this endpoint.
* @return a description for this endpoint.
* <p>Available to subclasses, for inclusion in their {@code toString()} result.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@

package org.springframework.kafka.config;

import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerContainer;

/**
* Factory of {@link MessageListenerContainer} based on a
* {@link KafkaListenerEndpoint} definition.
*
* @param <C> the {@link AbstractMessageListenerContainer} implementation type.
*
* @author Stephane Nicoll
*
* @see KafkaListenerEndpoint
*/
public interface KafkaListenerContainerFactory<C extends MessageListenerContainer> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
public interface KafkaListenerEndpoint {

/**
* Return the id of this endpoint.
* @return the id of this endpoint. The id can be further qualified
* when the endpoint is resolved against its actual listener
* container.
Expand All @@ -42,21 +43,25 @@ public interface KafkaListenerEndpoint {
String getId();

/**
* Return the group of this endpoint or null if not in a group.
* @return the group of this endpoint or null if not in a group.
*/
String getGroup();

/**
* Return the topics for this endpoint.
* @return the topics for this endpoint.
*/
Collection<String> getTopics();

/**
* Return the topicPartitions for this endpoint.
* @return the topicPartitions for this endpoint.
*/
Collection<TopicPartition> getTopicPartitions();

/**
* Return the topicPattern for this endpoint.
* @return the topicPattern for this endpoint.
*/
Pattern getTopicPattern();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public void setEndpointRegistry(KafkaListenerEndpointRegistry endpointRegistry)
}

/**
* Return the {@link KafkaListenerEndpointRegistry} instance for this
* registrar, may be {@code null}.
* @return the {@link KafkaListenerEndpointRegistry} instance for this
* registrar, may be {@code null}.
*/
Expand All @@ -84,6 +86,7 @@ public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory kafkaHand
}

/**
* Return the custom {@link MessageHandlerMethodFactory} to use, if any.
* @return the custom {@link MessageHandlerMethodFactory} to use, if any.
*/
public MessageHandlerMethodFactory getMessageHandlerMethodFactory() {
Expand Down Expand Up @@ -201,7 +204,8 @@ private static final class KafkaListenerEndpointDescriptor {

private final KafkaListenerContainerFactory<?> containerFactory;

private KafkaListenerEndpointDescriptor(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> containerFactory) {
private KafkaListenerEndpointDescriptor(KafkaListenerEndpoint endpoint,
KafkaListenerContainerFactory<?> containerFactory) {
this.endpoint = endpoint;
this.containerFactory = containerFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public Set<String> getListenerContainerIds() {
}

/**
* Return the managed {@link MessageListenerContainer} instance(s).
* @return the managed {@link MessageListenerContainer} instance(s).
*/
public Collection<MessageListenerContainer> getListenerContainers() {
Expand Down Expand Up @@ -262,6 +263,7 @@ public boolean isRunning() {
/**
* Start the specified {@link MessageListenerContainer} if it should be started
* on startup.
* @param listenerContainer the listener container to start.
* @see MessageListenerContainer#isAutoStartup()
*/
private static void startIfNecessary(MessageListenerContainer listenerContainer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
* A {@link KafkaListenerEndpoint} providing the method to invoke to process
* an incoming message for this endpoint.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Stephane Nicoll
* @author Artem Bilan
* @author Gary Russell
Expand Down Expand Up @@ -77,6 +80,7 @@ public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory messageHa
}

/**
* Return the {@link MessageHandlerMethodFactory}.
* @return the messageHandlerMethodFactory
*/
protected MessageHandlerMethodFactory getMessageHandlerMethodFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,16 @@
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;

/**
* The {@link MethodKafkaListenerEndpoint} extension for several POJO methods
* based on the {@link org.springframework.kafka.annotation.KafkaHandler}.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Gary Russell
*
* @see org.springframework.kafka.annotation.KafkaHandler
* @see DelegatingInvocableHandler
*/
public class MultiMethodKafkaListenerEndpoint<K, V> extends MethodKafkaListenerEndpoint<K, V> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@
/**
* A {@link KafkaListenerContainerFactory} implementation to build a regular
* {@link ConcurrentMessageListenerContainer}.
*
* <p>This should be the default for most users and a good transition paths
* <p>
* This should be the default for most users and a good transition paths
* for those that are used to build such container definition manually.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Stephane Nicoll
* @author Gary Russell
* @author Artem Bilan
Expand All @@ -41,6 +44,7 @@ public class SimpleKafkaListenerContainerFactory<K, V>
private Long recentOffset;

/**
* Specify the container concurrency.
* @param concurrency the number of consumers to create.
* @see ConcurrentMessageListenerContainer#setConcurrency(int)
*/
Expand All @@ -49,6 +53,7 @@ public void setConcurrency(Integer concurrency) {
}

/**
* Specify the offset lag from the end of commit.
* @param recentOffset the recent offset.
* @see ConcurrentMessageListenerContainer#setRecentOffset(long)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
* A {@link KafkaListenerEndpoint} simply providing the {@link MessageListener} to
* invoke to process an incoming message for this endpoint.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Stephane Nicoll
* @author Gary Russell
*/
Expand All @@ -42,6 +45,8 @@ public void setMessageListener(MessageListener<K, V> messageListener) {
}

/**
* Return the {@link MessageListener} to invoke when a message matching
* the endpoint is received.
* @return the {@link MessageListener} to invoke when a message matching
* the endpoint is received.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@
import org.apache.kafka.clients.consumer.Consumer;

/**
* @author Gary Russell
* The strategy to produce a {@link Consumer} instance(s).
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Gary Russell
*/
public interface ConsumerFactory<K, V> {

Expand Down
Loading