Skip to content

Commit 8916dbc

Browse files
committed
Check response.error for async producer
1 parent 4474a50 commit 8916dbc

File tree

3 files changed

+27
-21
lines changed

3 files changed

+27
-21
lines changed

kafka/producer/base.py

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616

1717
from kafka.common import (
1818
ProduceRequest, TopicAndPartition, RetryOptions,
19-
UnsupportedCodecError, FailedPayloadsError,
20-
RequestTimedOutError, AsyncProducerQueueFull
19+
kafka_errors, UnsupportedCodecError, FailedPayloadsError,
20+
RequestTimedOutError, AsyncProducerQueueFull, UnknownError
2121
)
2222
from kafka.common import (
2323
RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)
@@ -89,41 +89,46 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
8989
if not reqs:
9090
continue
9191

92-
reqs_to_retry, error_type = [], None
92+
reqs_to_retry, error_cls = [], None
93+
do_backoff, do_refresh = False, False
94+
95+
def _handle_error(error_cls, reqs, all_retries):
96+
if ((error_cls == RequestTimedOutError and
97+
retry_options.retry_on_timeouts) or
98+
error_cls in RETRY_ERROR_TYPES):
99+
all_retries += reqs
100+
if error_cls in RETRY_BACKOFF_ERROR_TYPES:
101+
do_backoff += reqs
102+
if error_cls in RETRY_REFRESH_ERROR_TYPES:
103+
do_refresh = True
93104

94105
try:
95106
reply = client.send_produce_request(reqs.keys(),
96107
acks=req_acks,
97108
timeout=ack_timeout,
98109
fail_on_error=False)
99-
reqs_to_retry = [req for broker_responses in reply
100-
for response in broker_responses
101-
for req in response.failed_payloads
102-
if isinstance(response, FailedPayloadsError)]
103-
if reqs_to_retry:
104-
error_type = FailedPayloadsError
105-
106-
except RequestTimedOutError:
107-
error_type = RequestTimedOutError
108-
if retry_options.retry_on_timeouts:
109-
reqs_to_retry = reqs.keys()
110+
for i, response in enumerate(reply):
111+
if isinstance(response, FailedPayloadsError):
112+
_handle_error(FailedPayloadsError, response.failed_payloads, reqs_to_retry)
113+
elif isinstance(response, ProduceResponse) and response.error:
114+
error_cls = kafka_errors.get(response.error, UnknownError)
115+
_handle_error(error_cls, [reqs.keys()[i]], reqs_to_retry)
110116

111117
except Exception as ex:
112-
error_type = type(ex)
113-
if type(ex) in RETRY_ERROR_TYPES:
114-
reqs_to_retry = reqs.keys()
118+
error_cls = kafka_errors.get(type(ex), UnknownError)
119+
_handle_error(error_cls, reqs.keys(), reqs_to_retry)
115120

116121
if not reqs_to_retry:
117122
reqs = {}
118123
continue
119124

120125
# doing backoff before next retry
121-
if error_type in RETRY_BACKOFF_ERROR_TYPES and retry_options.backoff_ms:
126+
if do_backoff and retry_options.backoff_ms:
122127
log.info("Doing backoff for %s(ms)." % retry_options.backoff_ms)
123128
time.sleep(float(retry_options.backoff_ms) / 1000)
124129

125130
# refresh topic metadata before next retry
126-
if error_type in RETRY_REFRESH_ERROR_TYPES:
131+
if do_refresh:
127132
client.load_metadata_for_topics()
128133

129134
reqs = dict((key, count + 1) for (key, count) in reqs.items()

test/test_producer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ def test_first_send_failed(self):
143143
def send_side_effect(reqs, *args, **kwargs):
144144
if self.client.is_first_time:
145145
self.client.is_first_time = False
146-
return [[FailedPayloadsError(reqs)]]
146+
return [FailedPayloadsError(reqs)]
147147
return []
148148

149149
self.client.send_produce_request.side_effect = send_side_effect
@@ -165,7 +165,7 @@ def test_with_limited_retries(self):
165165
self.queue.put((TopicAndPartition("test", i), "msg %i" % i, "key %i" % i))
166166

167167
def send_side_effect(reqs, *args, **kwargs):
168-
return [[FailedPayloadsError(reqs)]]
168+
return [FailedPayloadsError(reqs)]
169169

170170
self.client.send_produce_request.side_effect = send_side_effect
171171

tox.ini

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ commands =
1414
nosetests {posargs:-v --with-id --id-file={envdir}/.noseids --with-timer --timer-top-n 10 --with-coverage --cover-erase --cover-package kafka}
1515
setenv =
1616
PROJECT_ROOT = {toxinidir}
17+
passenv = KAFKA_VERSION
1718

1819
[testenv:py33]
1920
deps =

0 commit comments

Comments
 (0)