Select Git revision
Cell Magics.ipynb
consumer.py 1.35 KiB
"""consume kafka messages."""
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json
import logging
if __name__ == '__main__':
logging.getLogger().setLevel('INFO')
logging.info('Starting Kafka Consumer')
consumer = None
tries = 1
while not consumer:
logging.info('{}. try'.format(tries))
try:
consumer = KafkaConsumer('swift',
bootstrap_servers='kafka:9092',
client_id='swift-kafka-consumer',
key_deserializer=lambda k: k.decode('utf-8'),
value_deserializer=lambda m: json.loads(m.decode('utf-8')))
except KafkaError:
logging.info('Currently something wrong with Kafka, retrying...')
tries += 1
print('Consumer done with initialization')
try:
for message in consumer:
logging.info("%s:%d:%d: key=%s value=%s" % (message.topic,
message.partition,
message.offset,
message.key,
message.value))
except KeyboardInterrupt:
logging.info('Exiting Kafka Consumer...')