33from . import unittest
44
55from kafka import SimpleConsumer , KafkaConsumer , MultiProcessConsumer
6- from kafka .common import KafkaConfigurationError
6+ from kafka .common import (
7+ KafkaConfigurationError , FetchResponse ,
8+ FailedPayloadsError , OffsetAndMessage ,
9+ NotLeaderForPartitionError , UnknownTopicOrPartitionError
10+ )
11+
712
813class TestKafkaConsumer (unittest .TestCase ):
914 def test_non_integer_partitions (self ):
@@ -14,6 +19,7 @@ def test_broker_list_required(self):
1419 with self .assertRaises (KafkaConfigurationError ):
1520 KafkaConsumer ()
1621
22+
1723class TestMultiProcessConsumer (unittest .TestCase ):
1824 def test_partition_list (self ):
1925 client = MagicMock ()
@@ -22,3 +28,70 @@ def test_partition_list(self):
2228 consumer = MultiProcessConsumer (client , 'testing-group' , 'testing-topic' , partitions = partitions )
2329 self .assertEqual (fetch_last_known_offsets .call_args [0 ], (partitions ,) )
2430 self .assertEqual (client .get_partition_ids_for_topic .call_count , 0 ) # pylint: disable=no-member
31+
32+ def test_simple_consumer_failed_payloads (self ):
33+ client = MagicMock ()
34+ consumer = SimpleConsumer (client , group = None ,
35+ topic = 'topic' , partitions = [0 , 1 ],
36+ auto_commit = False )
37+
38+ def failed_payloads (payload ):
39+ return FailedPayloadsError (payload )
40+
41+ client .send_fetch_request .side_effect = self .fail_requests_factory (failed_payloads )
42+
43+ # This should not raise an exception
44+ consumer .get_messages (5 )
45+
46+ def test_simple_consumer_leader_change (self ):
47+ client = MagicMock ()
48+ consumer = SimpleConsumer (client , group = None ,
49+ topic = 'topic' , partitions = [0 , 1 ],
50+ auto_commit = False )
51+
52+ # Mock so that only the first request gets a valid response
53+ def not_leader (request ):
54+ return FetchResponse (request .topic , request .partition ,
55+ NotLeaderForPartitionError .errno , - 1 , ())
56+
57+ client .send_fetch_request .side_effect = self .fail_requests_factory (not_leader )
58+
59+ # This should not raise an exception
60+ consumer .get_messages (20 )
61+
62+ # client should have updated metadata
63+ self .assertGreaterEqual (client .reset_topic_metadata .call_count , 1 )
64+ self .assertGreaterEqual (client .load_metadata_for_topics .call_count , 1 )
65+
66+ def test_simple_consumer_unknown_topic_partition (self ):
67+ client = MagicMock ()
68+ consumer = SimpleConsumer (client , group = None ,
69+ topic = 'topic' , partitions = [0 , 1 ],
70+ auto_commit = False )
71+
72+ # Mock so that only the first request gets a valid response
73+ def unknown_topic_partition (request ):
74+ return FetchResponse (request .topic , request .partition ,
75+ UnknownTopicOrPartitionError .errno , - 1 , ())
76+
77+ client .send_fetch_request .side_effect = self .fail_requests_factory (unknown_topic_partition )
78+
79+ # This should not raise an exception
80+ with self .assertRaises (UnknownTopicOrPartitionError ):
81+ consumer .get_messages (20 )
82+
83+ @staticmethod
84+ def fail_requests_factory (error_factory ):
85+ # Mock so that only the first request gets a valid response
86+ def fail_requests (payloads , ** kwargs ):
87+ responses = [
88+ FetchResponse (payloads [0 ].topic , payloads [0 ].partition , 0 , 0 ,
89+ (OffsetAndMessage (
90+ payloads [0 ].offset + i ,
91+ "msg %d" % (payloads [0 ].offset + i ))
92+ for i in range (10 ))),
93+ ]
94+ for failure in payloads [1 :]:
95+ responses .append (error_factory (failure ))
96+ return responses
97+ return fail_requests
0 commit comments