Since Kafka 0.10.0.0, it has been possible to include a timestamp in Kafka records.
It would be great if the kafka output plugin could set this timestamp as well, for example through a message_timestamp setting. The value of that setting could be either:
- the name of a field (e.g. "@timestamp") whose value can be resolved to a long value
- a sprintf-style field (e.g. "%{second_timestamp}000") whose interpolated value can be resolved to a long value
A sample configuration would look like this:
input {
  stdin {}
}
output {
  kafka {
    bootstrap_servers => "localhost:9092"
    topic_id => "test"
    message_key => "%{key_field}"
    message_timestamp => "@timestamp"
    # or like this
    # message_timestamp => "%{second_timestamp}000"
  }
}
The record would then need to be created using the full-fledged ProducerRecord constructor, which takes the partition and the timestamp in addition to the topic, key and value:
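# msg_ts stays nil when message_timestamp is not set, so the producer assigns the timestamp itself
msg_ts = get_timestamp_value_and_coerce_to_long(@message_timestamp) unless @message_timestamp.nil?
# a nil partition leaves the choice of partition to the producer's partitioner
record = org.apache.kafka.clients.producer.ProducerRecord.new(event.sprintf(@topic_id), nil, msg_ts, event.sprintf(@message_key), data)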
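The helper named above does not exist yet. Here is a minimal sketch of how it might resolve the two proposed forms of message_timestamp. It is plain Ruby so that it runs without Logstash. The function names timestamp_to_long and interpolate are hypothetical, a Hash stands in for the event, and a Ruby Time stands in for the "@timestamp" value. In the plugin itself, interpolation would presumably go through event.sprintf, as in the snippet above.

```ruby
# Hypothetical sketch: resolve a message_timestamp setting to epoch milliseconds.
# `event` is a plain Hash standing in for a Logstash event (an assumption made
# so that the example runs on its own).

# Minimal sprintf-style interpolation: replaces each %{field} with the field's
# value. Raises KeyError if a referenced field is missing.
def interpolate(event, template)
  template.gsub(/%\{([^}]+)\}/) { event.fetch($1).to_s }
end

def timestamp_to_long(event, setting)
  return nil if setting.nil?

  # A setting containing %{...} is treated as a sprintf-style template,
  # anything else as a field name.
  raw = setting.include?("%{") ? interpolate(event, setting) : event[setting]

  case raw
  when nil     then nil                      # field absent: let Kafka assign the timestamp
  when Integer then raw                      # already a long
  when Time    then (raw.to_r * 1000).to_i   # seconds since epoch -> milliseconds
  when String  then Integer(raw, 10)         # raises ArgumentError if not numeric
  else raise ArgumentError, "cannot coerce #{raw.class} to a long"
  end
end

event = {
  "@timestamp"       => Time.at(1_500_000_000),
  "second_timestamp" => 1_500_000_000
}

timestamp_to_long(event, "@timestamp")              # => 1500000000000
timestamp_to_long(event, "%{second_timestamp}000")  # => 1500000000000
timestamp_to_long(event, nil)                       # => nil
```

Returning nil when the setting or the field is absent keeps the current behaviour, since a nil timestamp passed to ProducerRecord leaves the timestamp to be assigned by Kafka. Whether a non-numeric value should raise, as it does here, or be logged and skipped is a design choice left open.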