Usage
=====

SimpleProducer
--------------

.. code:: python

    from kafka import SimpleProducer, KafkaClient

    # To send messages synchronously
    kafka = KafkaClient("localhost:9092")
    producer = SimpleProducer(kafka)

    # Note that the application is responsible for encoding messages to type str
    producer.send_messages("my-topic", "some message")

    # To send messages in batches -- flush every 20 messages or every 60 seconds
    producer = SimpleProducer(kafka, batch_send=True,
                              batch_send_every_n=20,
                              batch_send_every_t=60)

Keyed messages
--------------

.. code:: python

    from kafka import KafkaClient, KeyedProducer, RoundRobinPartitioner

    kafka = KafkaClient("localhost:9092")

    # HashedPartitioner is the default
    producer = KeyedProducer(kafka)
    producer.send("my-topic", "key1", "some message")

    # Or pass an explicit partitioner
    producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)


KafkaConsumer
-------------

.. code:: python

    from kafka import KafkaConsumer

    # To consume messages
    consumer = KafkaConsumer("my-topic",
                             group_id="my_group",
                             bootstrap_servers=["localhost:9092"])
    for message in consumer:
        # message value is raw byte string -- decode if necessary!
        # e.g., for unicode: `message.value.decode('utf-8')`
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                             message.offset, message.key,
                                             message.value))


Messages (m) are namedtuples with the following attributes (a short unpacking
sketch follows the list):

* `m.topic`: topic name (str)
* `m.partition`: partition number (int)
* `m.offset`: message offset on topic-partition log (int)
* `m.key`: key (bytes - can be None)
* `m.value`: message (output of deserializer_class - default is raw bytes)

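Because each message is a namedtuple, its fields can also be unpacked
positionally. A minimal sketch continuing from the consumer above (field
order assumed to match the list):

.. code:: python

    for m in consumer:
        # unpack in field order: topic, partition, offset, key, value
        topic, partition, offset, key, value = m
        print("%s:%d:%d" % (topic, partition, offset))
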

.. code:: python

    from kafka import KafkaConsumer

    # more advanced consumer -- multiple topics w/ auto commit offset
    # management
    consumer = KafkaConsumer('topic1', 'topic2',
                             bootstrap_servers=['localhost:9092'],
                             group_id='my_consumer_group',
                             auto_commit_enable=True,
                             auto_commit_interval_ms=30 * 1000,
                             auto_offset_reset='smallest')

    # Infinite iteration
    for m in consumer:
        do_some_work(m)

        # Mark this message as fully consumed
        # so it can be included in the next commit
        #
        # **messages that are not marked w/ task_done currently do not commit!
        consumer.task_done(m)

    # If auto_commit_enable is False, remember to commit() periodically
    consumer.commit()

    # Batch process interface
    while True:
        for m in consumer.fetch_messages():
            process_message(m)
            consumer.task_done(m)


Configuration settings can be passed to the constructor;
otherwise defaults will be used:

.. code:: python

    client_id='kafka.consumer.kafka',
    group_id=None,
    fetch_message_max_bytes=1024 * 1024,
    fetch_min_bytes=1,
    fetch_wait_max_ms=100,
    refresh_leader_backoff_ms=200,
    bootstrap_servers=[],
    socket_timeout_ms=30 * 1000,
    auto_offset_reset='largest',
    deserializer_class=lambda msg: msg,
    auto_commit_enable=False,
    auto_commit_interval_ms=60 * 1000,
    consumer_timeout_ms=-1

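For example, a custom `deserializer_class` can decode message values before
they are yielded. A minimal sketch, assuming the topic carries UTF-8 encoded
JSON values (the topic name and servers here are placeholders):

.. code:: python

    import json

    from kafka import KafkaConsumer

    consumer = KafkaConsumer('my-topic',
                             bootstrap_servers=['localhost:9092'],
                             deserializer_class=lambda msg: json.loads(msg.decode('utf-8')))

    for m in consumer:
        # m.value is the deserialized object rather than raw bytes
        print(m.value)
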
Configuration parameters are described in more detail at
http://kafka.apache.org/documentation.html#highlevelconsumerapi

Multiprocess consumer
---------------------
