@@ -138,7 +138,8 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
138138 Arguments:
139139
140140 payloads: list of object-like entities with a topic (str) and
141- partition (int) attribute
141+ partition (int) attribute; payloads with duplicate topic-partitions
142+ are not supported.
142143
143144 encode_fn: a method to encode the list of payloads to a request body,
144145 must accept client_id, correlation_id, and payloads as
@@ -152,6 +153,10 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
152153
153154 List of response objects in the same order as the supplied payloads
154155 """
156+ # encoders / decoders do not maintain ordering currently
157+ # so we need to keep this so we can rebuild order before returning
158+ original_ordering = [(p .topic , p .partition ) for p in payloads ]
159+
155160 # Group the requests by topic+partition
156161 brokers_for_payloads = []
157162 payloads_by_broker = collections .defaultdict (list )
@@ -165,7 +170,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
165170
166171 # For each broker, send the list of request payloads
167172 # and collect the responses and errors
168- responses_by_broker = collections . defaultdict ( list )
173+ responses = {}
169174 broker_failures = []
170175 for broker , payloads in payloads_by_broker .items ():
171176 requestId = self ._next_id ()
@@ -184,7 +189,8 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
184189 'to server %s: %s' , requestId , broker , e )
185190
186191 for payload in payloads :
187- responses_by_broker [broker ].append (FailedPayloadsError (payload ))
192+ topic_partition = (payload .topic , payload .partition )
193+ responses [topic_partition ] = FailedPayloadsError (payload )
188194
189195 # No exception, try to get response
190196 else :
@@ -196,7 +202,8 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
196202 log .debug ('Request %s does not expect a response '
197203 '(skipping conn.recv)' , requestId )
198204 for payload in payloads :
199- responses_by_broker [broker ].append (None )
205+ topic_partition = (payload .topic , payload .partition )
206+ responses [topic_partition ] = None
200207 continue
201208
202209 try :
@@ -208,12 +215,17 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
208215 requestId , broker , e )
209216
210217 for payload in payloads :
211- responses_by_broker [broker ].append (FailedPayloadsError (payload ))
218+ topic_partition = (payload .topic , payload .partition )
219+ responses [topic_partition ] = FailedPayloadsError (payload )
212220
213221 else :
222+ _resps = []
214223 for payload_response in decoder_fn (response ):
215- responses_by_broker [broker ].append (payload_response )
216- log .debug ('Response %s: %s' , requestId , responses_by_broker [broker ])
224+ topic_partition = (payload_response .topic ,
225+ payload_response .partition )
226+ responses [topic_partition ] = payload_response
227+ _resps .append (payload_response )
228+ log .debug ('Response %s: %s' , requestId , _resps )
217229
218230 # Connection errors generally mean stale metadata
219231 # although sometimes it means incorrect api request
@@ -223,9 +235,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
223235 self .reset_all_metadata ()
224236
225237 # Return responses in the same order as provided
226- responses_by_payload = [responses_by_broker [broker ].pop (0 )
227- for broker in brokers_for_payloads ]
228- return responses_by_payload
238+ return [responses [tp ] for tp in original_ordering ]
229239
230240 def __repr__ (self ):
231241 return '<KafkaClient client_id=%s>' % (self .client_id )
0 commit comments