
Conversation

@jordansissel
Contributor

jordansissel commented Aug 31, 2017

Don't lose data!

  • New default retry behavior: Retry until successful
  • Now makes sure the data is in Kafka before completion.

Previously, the default was `retries => 0`, which means never retry.

The implication of this is that any fault (a network failure, a Kafka
restart, etc.) could cause data loss.

This commit makes the following changes:

  • `retries` now has no default value (aka: nil)
  • Any >=0 value for `retries` will behave the same as it did before.

A slight difference in internal behavior in this patch: we no longer
ignore the Future<RecordMetadata> returned by KafkaProducer.send(). We
send the whole batch of events and then wait for all of those operations
to complete. If any fail, we retry only the failed transmissions.
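
To make that concrete, here is a minimal sketch (not the plugin's exact code) of the send-then-wait-on-futures pattern, assuming `@producer` is an org.apache.kafka.clients.producer.KafkaProducer, `batch` is an array of already-built ProducerRecord objects, and `@logger` is the plugin's logger:

```ruby
# Illustrative sketch only: send every record, wait on each returned
# Future<RecordMetadata>, and retry just the records whose send failed.
def send_batch(batch)
  remaining = batch
  until remaining.empty?
    # KafkaProducer.send is asynchronous; each call returns a Future immediately.
    futures = remaining.map { |record| [record, @producer.send(record)] }

    # Block on every future; keep only the records whose send failed.
    remaining = futures.select do |record, future|
      begin
        future.get  # raises if the broker never acknowledged the record
        false       # success: drop this record from the retry set
      rescue java.util.concurrent.ExecutionException => e
        @logger.warn("KafkaProducer.send failed, will retry", :exception => e.to_s)
        true        # failure: keep it for the next pass
      end
    end.map(&:first)

    sleep(1) unless remaining.empty?  # crude pause between retry passes
  end
end
```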

Prior to this patch, we would call send(), which is asynchronous, and
then acknowledge in the pipeline. This would cause data loss, even if
the PQ was enabled, under the following circumstances:

  1. Logstash calls send() to Kafka and then returns, indicating that the
    data is in Kafka, which was not true. This means we would ack the
    transmission to the PQ, but Kafka may not have the data yet!
  2. Logstash crashes before the KafkaProducer client actually sends it to
    Kafka.

Fixes #149

Test Coverage:

  • Move specs to call newly-implemented multi_receive

This also required a few important changes to the specs:

  • Mocks (expect..to_receive) were not doing `.and_call_original`, so
    method expectations were returning nil [1]
  • Old `ssl` setting is now `security_protocol => "SSL"`

[1] ProducerRecord.new was returning nil due to a missing
.and_call_original, for example.

@jordansissel
Contributor Author

jordansissel commented Aug 31, 2017

TODO items:

  • Verify manually that this works
  • Add test coverage
  • Backport to plugin major branches 5.x and 6.x?

@jordansissel
Contributor Author

The tests are failing for me locally but some of the failures are not due to this PR.

@jordansissel changed the title from "[WIP] New default retry behavior: Retry until successful" to "New default retry behavior: Retry until successful" on Sep 1, 2017
@jordansissel force-pushed the issue/149 branch 2 times, most recently from fd5bae5 to d96bed3 on September 1, 2017 at 20:48
@jordansissel
Contributor Author

Ok code done, tests written, docs updated. Ready for review!

remaining -= 1
end

futures = batch.collect { |record| @producer.send(record) }
Contributor

@jordansissel one problem I see here is this:
The default max.block.ms (the timeout on a send() that can block if either the Kafka client's output buffer is full or fetching metadata is blocked) is 60s.
So for Kafka outages lasting longer than max.block.ms + (max metadata age) we'll start losing data, won't we? (Right now I think we only catch these way upstream and just move on to the next batch.)
I think we should catch these and retry the send calls with a back-off on org.apache.kafka.common.errors.TimeoutException, right?
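
As a rough illustration of that suggestion (hypothetical helper and names, not the plugin's actual code), catching the TimeoutException around the blocking send() and backing off could look like this:

```ruby
# Hypothetical back-off wrapper. send() itself can throw TimeoutException once
# max.block.ms elapses, e.g. when the client's buffer is full or metadata
# cannot be fetched.
def send_with_backoff(record, base_delay = 0.1, max_delay = 10)
  delay = base_delay
  begin
    @producer.send(record)
  rescue org.apache.kafka.common.errors.TimeoutException => e
    @logger.warn("send blocked past max.block.ms, backing off", :exception => e.to_s)
    sleep(delay)
    delay = [delay * 2, max_delay].min  # exponential back-off, capped
    retry
  end
end
```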

Contributor Author

Hmm.. Yeah, I didn't check what exceptions can be thrown here.

I'll add handling for the 3 listed here: https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

Contributor Author

OK, I added handling for the 3 exceptions thrown by KafkaProducer.send() and added test coverage for it as well.
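
For reference, a hypothetical sketch of rescuing those three documented exceptions (InterruptException, SerializationException, and TimeoutException, going by that javadoc); the plugin's actual handling and retry bookkeeping differ:

```ruby
# Illustrative only. InterruptException and TimeoutException are treated as
# retriable; SerializationException is not, since retrying a record that
# cannot be serialized will never succeed.
begin
  future = @producer.send(record)
rescue org.apache.kafka.common.errors.InterruptException => e
  @logger.warn("producer send interrupted, retrying", :exception => e.to_s)
  retry
rescue org.apache.kafka.common.errors.TimeoutException => e
  @logger.warn("producer send timed out (max.block.ms), retrying", :exception => e.to_s)
  sleep(1)
  retry
rescue org.apache.kafka.common.errors.SerializationException => e
  # Not retriable; a DLQ would be the right destination (see the TODO below).
  @logger.error("cannot serialize event, dropping it", :exception => e.to_s)
end
```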

@jordansissel
Contributor Author

I left a TODO item to handle SerializationExceptions by DLQing them since I felt DLQ was out of scope for this PR.

@original-brownbear
Contributor

@jordansissel LGTM :)

@allenmchan

Didn't realize there was this limitation in the kafka output plugin. When are you guys planning to merge this and release it to the public?

@guyboertje

@jordansissel
Are there any reasons against a back-port to 5.x and 6.x?

/cc @ppf2

@elasticsearch-bot

Jordan Sissel merged this into the following branches!

Branch: master
Commits: 1cb7b4d, 9978749

@jordansissel
Contributor Author

Didn't realize there was this limitation in the kafka output plugin

It's a bug, not a feature.

@jordansissel
Contributor Author

Are there any reasons against a back-port to 5.x and 6.x?

Agreed. There is no '6.x' branch, but I will make one.

@elasticsearch-bot

Jordan Sissel merged this into the following branches!

Branch: 5.x
Commits: aa0341a, 9d49f53

elasticsearch-bot pushed a commit that referenced this pull request Sep 28, 2017
elasticsearch-bot pushed a commit that referenced this pull request Sep 28, 2017
@elasticsearch-bot

Jordan Sissel merged this into the following branches!

Branch: 6.x
Commits: b09fd17, 3f8ff91

elasticsearch-bot pushed a commit that referenced this pull request Sep 28, 2017
elasticsearch-bot pushed a commit that referenced this pull request Sep 28, 2017
jordansissel added a commit that referenced this pull request Oct 2, 2017
jordansissel added a commit that referenced this pull request Oct 2, 2017
jordansissel added a commit that referenced this pull request Oct 2, 2017
elasticsearch-bot pushed a commit that referenced this pull request Oct 6, 2017
elasticsearch-bot pushed a commit that referenced this pull request Oct 6, 2017
elasticsearch-bot pushed a commit that referenced this pull request Oct 6, 2017
ppf2 mentioned this pull request Dec 11, 2017
@TiaraH

TiaraH commented Mar 6, 2018

This fix says that when Logstash fails to send data to Kafka it will retry until the send succeeds. But while Logstash is retrying, where is the data kept? Is there some queue path in Logstash?
