7 changes: 4 additions & 3 deletions src/reference/asciidoc/kafka.adoc
@@ -146,10 +146,11 @@ The following constructors are available.
[source, java]
----
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties,
                    TopicPartitionInitialOffset... topicPartitions)

----

@@ -183,7 +184,7 @@ The single constructor is similar to the first `KafkaListenerContainer` construc
[source, java]
----
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)

----

13 changes: 10 additions & 3 deletions src/reference/asciidoc/quick-tour.adoc
@@ -70,13 +70,17 @@ public void testAutoCommit() throws Exception {
logger.info("Stop auto");
}
----

[source, java]
----
private KafkaMessageListenerContainer<Integer, String> createContainer(
                        ContainerProperties containerProps) {
    Map<String, Object> props = consumerProps();
    DefaultKafkaConsumerFactory<Integer, String> cf =
            new DefaultKafkaConsumerFactory<Integer, String>(props);
    KafkaMessageListenerContainer<Integer, String> container =
            new KafkaMessageListenerContainer<>(cf, containerProps);
    return container;
}
@@ -182,7 +186,10 @@ public class Config {
}
}
----

[source, java]
----
public class Listener {
private final CountDownLatch latch1 = new CountDownLatch(1);
169 changes: 169 additions & 0 deletions src/reference/asciidoc/si-kafka.adoc
@@ -1,2 +1,171 @@
[[si-kafka]]
=== Spring Integration Kafka

==== Introduction

This documentation pertains to versions 2.0.0 and above; for documentation for earlier releases, see the https://github.com/spring-projects/spring-integration-kafka/blob/1.3.x/README.md[1.3.x README].

Spring Integration Kafka is now based on the http://projects.spring.io/spring-kafka/[Spring for Apache Kafka project].
It provides the following components:

- Outbound Channel Adapter
- Message-Driven Channel Adapter

These are discussed in the following sections.

[[si-outbound]]
==== Outbound Channel Adapter

The Outbound channel adapter is used to publish messages from a Spring Integration channel to Kafka topics.
The channel is defined in the application context and then wired into the application that sends messages to Kafka.
Sender applications can publish to Kafka via Spring Integration messages, which are internally converted
to Kafka messages by the outbound channel adapter, as follows: the payload of the Spring Integration message is
used to populate the payload of the Kafka message, and (by default) the `kafka_messageKey` header of the Spring
Integration message is used to populate the key of the Kafka message.

The target topic and partition for publishing the message can be customized through the `kafka_topic`
and `kafka_partitionId` headers, respectively.
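
For example, a sending application might set these headers programmatically.
The following is a minimal sketch (not part of the sample applications), assuming a `MessageChannel` named
`inputToKafka`, such as the one wired to the adapter in the XML example below; see the NOTE later in this
section for how these headers interact with values configured on the adapter itself:

[source, java]
----
// Sketch only: "inputToKafka" is assumed to be the channel consumed by the adapter.
@Autowired
@Qualifier("inputToKafka")
private MessageChannel inputToKafka;

public void send(String payload) {
    Message<String> message = MessageBuilder.withPayload(payload)
            .setHeader(KafkaHeaders.MESSAGE_KEY, "someKey") // becomes the Kafka message key
            .setHeader(KafkaHeaders.TOPIC, "someTopic")     // target topic
            .setHeader(KafkaHeaders.PARTITION_ID, 0)        // target partition
            .build();
    this.inputToKafka.send(message);
}
----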

In addition, the `<int-kafka:outbound-channel-adapter>` provides the ability to extract the key, target topic, and
target partition by applying SpEL expressions on the outbound message. To that end, it supports the mutually exclusive
pairs of attributes `topic`/`topic-expression`, `message-key`/`message-key-expression`, and
`partition-id`/`partition-id-expression`, allowing `topic`, `message-key`, and `partition-id`, respectively,
to be specified as static values on the adapter or evaluated dynamically at runtime against the request message.

IMPORTANT: The `KafkaHeaders` interface (provided by `spring-kafka`) contains constants used for interacting with
headers.
The `messageKey` and `topic` default headers now require a `kafka_` prefix.
When migrating from an earlier version that used the old headers, you need to specify
`message-key-expression="headers.messageKey"` and `topic-expression="headers.topic"` on the
`<int-kafka:outbound-channel-adapter>`, or simply change the headers upstream to
the new headers from `KafkaHeaders` using a `<header-enricher>` or `MessageBuilder`.
Or, of course, configure them on the adapter using `topic` and `message-key` if you are using constant values.
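
For instance, a minimal sketch (not part of the original guide) of re-mapping the old headers with
`MessageBuilder`, assuming the upstream message still carries plain `messageKey` and `topic` headers:

[source, java]
----
// Sketch: copy the pre-2.0 header values onto the new KafkaHeaders names.
Message<?> adapted = MessageBuilder.fromMessage(original)
        .setHeader(KafkaHeaders.MESSAGE_KEY, original.getHeaders().get("messageKey"))
        .setHeader(KafkaHeaders.TOPIC, original.getHeaders().get("topic"))
        .removeHeaders("messageKey", "topic")
        .build();
----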

NOTE: If the adapter is configured with a topic or message key (either with a constant or an expression), those
settings are used and the corresponding header is ignored.
If you wish the header to override the configuration, you need to configure it in an expression, such as:

`topic-expression="headers.topic != null ? headers.topic : 'myTopic'"`.

The adapter requires a `KafkaTemplate`.

Here is an example of how the Kafka outbound channel adapter is configured with XML:

[source, xml]
----
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-template="template"
                                    auto-startup="false"
                                    channel="inputToKafka"
                                    topic="foo"
                                    message-key-expression="'bar'"
                                    partition-id-expression="2">
</int-kafka:outbound-channel-adapter>

<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    ... <!-- more producer properties -->
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
</bean>
----

As you can see, the adapter requires a `KafkaTemplate` which, in turn, requires a suitably configured `ProducerFactory`.

When using Java Configuration:

[source, java]
----
@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler handler() throws Exception {
    KafkaProducerMessageHandler<String, String> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate());
    handler.setTopicExpression(new LiteralExpression("someTopic"));
    handler.setMessageKeyExpression(new LiteralExpression("someKey"));
    return handler;
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
    // set more properties
    return new DefaultKafkaProducerFactory<>(props);
}
----
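
One way to exercise this configuration (not shown in the original example) is a messaging gateway whose default
request channel is `toKafka`; for example:

[source, java]
----
// Sketch: payloads passed to this gateway flow to the "toKafka" channel
// and are published by the handler() bean above.
@MessagingGateway(defaultRequestChannel = "toKafka")
public interface KafkaGateway {

    void sendToKafka(String payload);

}
----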

[[si-inbound]]
==== Message-Driven Channel Adapter

The `KafkaMessageDrivenChannelAdapter` (`<int-kafka:message-driven-channel-adapter>`) uses a `spring-kafka`
`KafkaMessageListenerContainer` or `ConcurrentMessageListenerContainer`.

An example of the XML configuration variant is shown here:

[source, xml]
----
<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        channel="nullChannel"
        error-channel="errorChannel" />

<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    ...
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
    <constructor-arg name="topics" value="foo" />
</bean>
----

When using Java Configuration:

[source, java]
----
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
            adapter(KafkaMessageListenerContainer<String, String> container) {
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
            new KafkaMessageDrivenChannelAdapter<>(container);
    kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
    return kafkaMessageDrivenChannelAdapter;
}

@Bean
public KafkaMessageListenerContainer<String, String> container() throws Exception {
    ContainerProperties properties = new ContainerProperties(this.topic);
    // set more properties
    return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
    // set more properties
    return new DefaultKafkaConsumerFactory<>(props);
}
----
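
The `received()` channel referenced by the adapter bean is not shown in the sample; one possible definition
(an assumption, not part of the original configuration) is a simple `QueueChannel` that downstream code can poll:

[source, java]
----
// Sketch: a pollable channel to which the adapter sends the converted records.
@Bean
public PollableChannel received() {
    return new QueueChannel();
}
----

Messages can then be retrieved with `received().receive(timeout)`.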