@@ -86,30 +86,39 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
8686 client .send_produce_request (reqs ,
8787 acks = req_acks ,
8888 timeout = ack_timeout )
89+
8990 except FailedPayloadsError as ex :
90- failed_reqs = ex .args [0 ]
91- log .exception ("Failed payloads count %s" % len (failed_reqs ))
92-
93- # if no limit, retry all failed messages until success
94- if retries_limit is None :
95- reqs_to_retry = failed_reqs
96- # makes sense to check failed reqs only if we have a limit > 0
97- elif retries_limit > 0 :
98- for req in failed_reqs :
99- if retries_limit and req .retries < retries_limit :
100- updated_req = req ._replace (retries = req .retries + 1 )
101- reqs_to_retry .append (updated_req )
91+ log .warning ("Async producer send warning: failed payloads." )
92+ reqs_to_retry = filter_by_retries (ex .args [0 ], retries_limit )
93+
10294 except Exception as ex :
103- log .exception ("Unable to send message: %s" % type (ex ))
104- finally :
105- reqs = []
95+ log .error ("Async producer send exception: %s" % type (ex ))
96+ reqs_to_retry = filter_by_retries (reqs , retries_limit )
10697
98+ reqs = []
10799 if reqs_to_retry and retry_backoff :
108100 reqs = reqs_to_retry
109101 log .warning ("%s requests will be retried next call." % len (reqs ))
110102 time .sleep (float (retry_backoff ) / 1000 )
111103
112104
def filter_by_retries(failed_reqs, retries_limit):
    """Return the subset of failed requests that are eligible for retry.

    ``retries_limit`` semantics:
      * ``None``          -- retry every failed request indefinitely
                             (retry counters are left untouched)
      * ``0`` or negative -- retries are disabled; nothing is returned
      * positive          -- keep only requests whose ``retries`` counter
                             is still below the limit, bumping each kept
                             request's counter by one
    """
    # No limit configured: hand the failures back unchanged so they are
    # retried until they eventually succeed.
    if retries_limit is None:
        return failed_reqs

    # A non-positive limit means "never retry".
    if retries_limit <= 0:
        return []

    # Keep requests still under the limit, incrementing their counters.
    return [req._replace(retries=req.retries + 1)
            for req in failed_reqs
            if req.retries < retries_limit]
121+
113122class Producer (object ):
114123 """
115124 Base class to be used by producers
0 commit comments