
Commit e132231
Don't lose data!

* New default retry behavior: retry until successful.
* Now makes sure the data is in Kafka before completion.

Prior to this commit, the default was `retries => 0`, which means never retry. The implication is that any fault (network failure, Kafka restart, etc.) could cause data loss.

This commit makes the following changes:

* `retries` now has no default value (aka: nil).
* Any value >= 0 for `retries` will behave the same as it did before.

There is a slight difference in internal behavior in this patch: we no longer ignore the Future<RecordMetadata> returned by KafkaProducer.send(). We send the whole batch of events and then wait for all of those operations to complete. If any fail, we retry only the failed transmissions.

Prior to this patch, we would call `send()`, which is asynchronous, and then acknowledge in the pipeline. This could cause data loss, even with the PQ enabled, under the following circumstances:

1) Logstash calls send() to Kafka, which returns immediately, indicating that the data is in Kafka when that was not necessarily true. We would then ack the transmission to the PQ even though Kafka might not have the data yet.
2) Logstash crashes before the KafkaProducer client actually sends the data to Kafka.

Fixes #149

Test coverage:

* Moved specs to call the newly implemented multi_receive. This required a few important changes to the specs:
  * Mocks (expect..to receive) were not using `.and_call_original`, so method expectations were returning nil [1].
  * The old `ssl` setting is now `security_protocol => "SSL"`.

[1] ProducerRecord.new was returning `nil` due to the missing .and_call_original, for example.
1 parent 7c9699d commit e132231
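The send-the-batch, wait-on-futures, retry-only-failures flow described in the commit message can be sketched in plain Ruby, independent of Kafka. `FlakyProducer` and the lambda-as-future are illustrative stand-ins for KafkaProducer and Future<RecordMetadata>, not the plugin's actual classes:

```ruby
# Hypothetical stand-in for KafkaProducer: #send returns a "future"
# (here just a lambda) that raises when resolved, for the first
# `failures_per_record` attempts on each record.
class FlakyProducer
  def initialize(failures_per_record)
    @remaining = Hash.new { |h, k| h[k] = failures_per_record }
  end

  def send(record)
    lambda do
      if @remaining[record] > 0
        @remaining[record] -= 1
        raise "simulated transient fault for #{record}"
      end
      record
    end
  end
end

# Mirrors the shape of the commit's retrying_send: send the whole batch,
# wait on every future, collect only the failed records, retry just those.
# Returns the records that were ultimately dropped.
def retrying_send(producer, batch, retries: nil)
  attempts = 0
  while batch.any?
    # Finite retry (retries >= 0): give up once attempts are exhausted.
    return batch if !retries.nil? && attempts > retries
    attempts += 1

    futures = batch.map { |record| producer.send(record) }

    failures = []
    futures.each_with_index do |future, i|
      begin
        future.call # stands in for Future#get: block until the send resolves
      rescue
        failures << batch[i]
      end
    end
    batch = failures
  end
  [] # every record was acknowledged
end

producer = FlakyProducer.new(2) # each record fails twice, then succeeds
dropped = retrying_send(producer, %w[a b c])
puts dropped.inspect # prints []
```

With the new default (`retries` unset), the loop only exits once every record is acknowledged; a finite `retries` value bounds the attempts and accepts the resulting data loss.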

File tree

4 files changed, +159 -24 lines changed

docs/index.asciidoc

Lines changed: 10 additions & 3 deletions

@@ -291,10 +291,17 @@ retries are exhausted.
 ===== `retries`
 
 * Value type is <<number,number>>
-* Default value is `0`
+* There is no default value for this setting.
+
+The default retry behavior is to retry until successful. To prevent data loss,
+the use of this setting is discouraged.
+
+If you choose to set `retries`, a value greater than zero will cause the
+client to only retry a fixed number of times. This will result in data loss
+if a transport fault exists for longer than your retry count (network outage,
+Kafka down, etc).
 
-Setting a value greater than zero will cause the client to
-resend any record whose send fails with a potentially transient error.
+A value less than zero is a configuration error.
 
 [id="plugins-{type}s-{plugin}-retry_backoff_ms"]
 ===== `retry_backoff_ms`
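In pipeline-config terms, the documentation change above amounts to simply leaving `retries` unset. A sketch (the `bootstrap_servers` and `topic_id` values here are illustrative, not from this commit):

```
output {
  kafka {
    bootstrap_servers => "localhost:9092"
    topic_id => "logs"
    # retries => 3  # discouraged: a finite retry count drops events once exhausted
  }
}
```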

lib/logstash/outputs/kafka.rb

Lines changed: 86 additions & 11 deletions

@@ -109,9 +109,15 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base
   # elapses the client will resend the request if necessary or fail the request if
   # retries are exhausted.
   config :request_timeout_ms, :validate => :string
-  # Setting a value greater than zero will cause the client to
-  # resend any record whose send fails with a potentially transient error.
-  config :retries, :validate => :number, :default => 0
+  # The default retry behavior is to retry until successful. To prevent data loss,
+  # the use of this setting is discouraged.
+  #
+  # If you choose to set `retries`, a value greater than zero will cause the
+  # client to only retry a fixed number of times. This will result in data loss
+  # if a transient error outlasts your retry count.
+  #
+  # A value less than zero is a configuration error.
+  config :retries, :validate => :number
   # The amount of time to wait before attempting to retry a failed produce request to a given topic partition.
   config :retry_backoff_ms, :validate => :number, :default => 100
   # The size of the TCP send buffer to use when sending data.

@@ -170,6 +176,17 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base
 
   public
   def register
+    @thread_batch_map = Concurrent::Hash.new
+
+    if !@retries.nil?
+      if @retries < 0
+        raise ConfigurationError, "A negative retry count (#{@retries}) is not valid. Must be a value >= 0"
+      end
+
+      @logger.warn("Kafka output is configured with finite retry. This instructs Logstash to LOSE DATA after a set number of send attempts fails. If you do not want to lose data if Kafka is down, then you must remove the retry setting.", :retries => @retries)
+    end
+
+
     @producer = create_producer
     @codec.on_event do |event, data|
       begin

@@ -178,22 +195,80 @@ def register
       else
         record = org.apache.kafka.clients.producer.ProducerRecord.new(event.sprintf(@topic_id), event.sprintf(@message_key), data)
       end
-      @producer.send(record)
+      prepare(record)
     rescue LogStash::ShutdownSignal
       @logger.debug('Kafka producer got shutdown signal')
     rescue => e
       @logger.warn('kafka producer threw exception, restarting',
                    :exception => e)
     end
   end
-
 end # def register
 
-  def receive(event)
-    if event == LogStash::SHUTDOWN
-      return
+  def prepare(record)
+    # This output is threadsafe, so we need to keep a batch per thread.
+    @thread_batch_map[Thread.current].add(record)
+  end
+
+  def multi_receive(events)
+    t = Thread.current
+    if !@thread_batch_map.include?(t)
+      @thread_batch_map[t] = java.util.ArrayList.new(events.size)
+    end
+
+    events.each do |event|
+      break if event == LogStash::SHUTDOWN
+      @codec.encode(event)
+    end
+
+    batch = @thread_batch_map[t]
+    if batch.any?
+      retrying_send(batch)
+      batch.clear
     end
-    @codec.encode(event)
+  end
+
+  def retrying_send(batch)
+    remaining = @retries;
+
+    while batch.any?
+      if !remaining.nil?
+        if remaining < 0
+          # TODO(sissel): Offer to DLQ? Then again, if it's a transient fault,
+          # DLQing would make things worse (you dlq data that would be successful
+          # after the fault is repaired)
+          logger.info("Exhausted user-configured retry count when sending to Kafka. Dropping these events.",
+                      :max_retries => @retries, :drop_count => batch.count)
+          break
+        end
+
+        remaining -= 1
+      end
+
+      futures = batch.collect { |record| @producer.send(record) }
+
+      failures = []
+      futures.each_with_index do |future, i|
+        begin
+          result = future.get()
+        rescue => e
+          # TODO(sissel): Add metric to count failures, possibly by exception type.
+          logger.debug? && logger.debug("KafkaProducer.send() failed: #{e}", :exception => e);
+          failures << batch[i]
+        end
+      end
+
+      # No failures? Cool. Let's move on.
+      break if failures.empty?
+
+      # Otherwise, retry with any failed transmissions
+      batch = failures
+      delay = 1.0 / @retry_backoff_ms
+      logger.info("Sending batch to Kafka failed. Will retry after a delay.", :batch_size => batch.size,
+                  :failures => failures.size, :sleep => delay);
+      sleep(delay)
+    end
+
   end
 
   def close

@@ -217,8 +292,8 @@ def create_producer
     props.put(kafka::MAX_REQUEST_SIZE_CONFIG, max_request_size.to_s)
     props.put(kafka::RECONNECT_BACKOFF_MS_CONFIG, reconnect_backoff_ms) unless reconnect_backoff_ms.nil?
     props.put(kafka::REQUEST_TIMEOUT_MS_CONFIG, request_timeout_ms) unless request_timeout_ms.nil?
-    props.put(kafka::RETRIES_CONFIG, retries.to_s)
-    props.put(kafka::RETRY_BACKOFF_MS_CONFIG, retry_backoff_ms.to_s)
+    props.put(kafka::RETRIES_CONFIG, retries.to_s) unless retries.nil?
+    props.put(kafka::RETRY_BACKOFF_MS_CONFIG, retry_backoff_ms.to_s)
     props.put(kafka::SEND_BUFFER_CONFIG, send_buffer_bytes.to_s)
     props.put(kafka::VALUE_SERIALIZER_CLASS_CONFIG, value_serializer)

spec/integration/outputs/kafka_spec.rb

Lines changed: 1 addition & 1 deletion

@@ -157,7 +157,7 @@
   def load_kafka_data(config)
     kafka = LogStash::Outputs::Kafka.new(config)
     kafka.register
-    num_events.times do kafka.receive(event) end
+    kafka.multi_receive(num_events.times.collect { event })
     kafka.close
   end
spec/unit/outputs/kafka_spec.rb

Lines changed: 62 additions & 9 deletions

@@ -25,34 +25,87 @@
   context 'when outputting messages' do
     it 'should send logstash event to kafka broker' do
       expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
-        .with(an_instance_of(org.apache.kafka.clients.producer.ProducerRecord))
+        .with(an_instance_of(org.apache.kafka.clients.producer.ProducerRecord)).and_call_original
       kafka = LogStash::Outputs::Kafka.new(simple_kafka_config)
       kafka.register
-      kafka.receive(event)
+      kafka.multi_receive([event])
     end
 
     it 'should support Event#sprintf placeholders in topic_id' do
       topic_field = 'topic_name'
       expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new)
-        .with("my_topic", event.to_s)
-      expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
+        .with("my_topic", event.to_s).and_call_original
+      expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send).and_call_original
       kafka = LogStash::Outputs::Kafka.new({'topic_id' => "%{#{topic_field}}"})
       kafka.register
-      kafka.receive(event)
+      kafka.multi_receive([event])
     end
 
     it 'should support field referenced message_keys' do
       expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new)
-        .with("test", "172.0.0.1", event.to_s)
-      expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
+        .with("test", "172.0.0.1", event.to_s).and_call_original
+      expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send).and_call_original
       kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"message_key" => "%{host}"}))
       kafka.register
-      kafka.receive(event)
+      kafka.multi_receive([event])
     end
 
     it 'should raise config error when truststore location is not set and ssl is enabled' do
-      kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"ssl" => "true"}))
+      kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("security_protocol" => "SSL"))
       expect { kafka.register }.to raise_error(LogStash::ConfigurationError, /ssl_truststore_location must be set when SSL is enabled/)
     end
   end
+
+  context "when a send fails" do
+    context "and the default retries behavior is used" do
+      # Fail this many times and then finally succeed.
+      let(:failcount) { (rand * 10).to_i }
+
+      # Expect KafkaProducer.send() to get called again after every failure, plus the successful one.
+      let(:sendcount) { failcount + 1 }
+
+      it "should retry until successful" do
+        count = 0;
+
+        expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
+          .exactly(sendcount).times
+          .and_wrap_original do |m, *args|
+          if count < failcount
+            p count => failcount
+            count += 1
+            # inject some failures.
+
+            # Return a custom Future that will raise an exception to simulate a Kafka send() problem.
+            future = java.util.concurrent.FutureTask.new(java.util.concurrent.Callable.new { raise "Failed" })
+            future.run
+            future
+          else
+            m.call(*args)
+          end
+        end
+        kafka = LogStash::Outputs::Kafka.new(simple_kafka_config)
+        kafka.register
+        kafka.multi_receive([event])
+      end
+    end
+
+    context "and when retries is set by the user" do
+      let(:retries) { (rand * 10).to_i }
+      let(:max_sends) { retries + 1 }
+
+      it "should give up after retries are exhausted" do
+        expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
+          .at_most(max_sends).times
+          .and_wrap_original do |m, *args|
+          # Always fail.
+          future = java.util.concurrent.FutureTask.new(java.util.concurrent.Callable.new { raise "Failed" })
+          future.run
+          future
+        end
+        kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries))
+        kafka.register
+        kafka.multi_receive([event])
+      end
+    end
+  end
 end
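The failing-future trick in the specs above, pre-completing a java.util.concurrent.FutureTask whose callable raised so that a later future.get raises inside retrying_send, can be imitated in plain Ruby without the JVM. `FakeFuture` and `send_stub` are illustrative stand-ins:

```ruby
# A future that resolves its block eagerly and replays the result (or the
# captured error) on #get, like the pre-run FutureTask in the specs.
class FakeFuture
  def initialize(&block)
    @value = block.call
  rescue => e
    @error = e
  end

  def get
    raise @error if @error
    @value
  end
end

failcount = 3
count = 0
# Stand-in for the and_wrap_original stub around KafkaProducer#send.
send_stub = lambda do |record|
  if count < failcount
    count += 1
    FakeFuture.new { raise "Failed" } # inject a failure
  else
    FakeFuture.new { record }         # a real send resolves to RecordMetadata
  end
end

# A send that fails `failcount` times needs failcount + 1 calls to succeed.
attempts = 0
begin
  attempts += 1
  send_stub.call("msg").get
rescue
  retry
end
puts attempts # prints 4
```

Returning a completed future rather than raising from the stub itself matters: the plugin's failure handling lives in the `future.get()` rescue, so the test has to fail at get-time, not at send-time.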
