Skip to content
Snippets Groups Projects
Select Git revision
  • df7d8919e1f8920972e667cfeb0e8dc6cbce2085
  • main default protected
2 results

hanoi.cc

Blame
  • consumer.py 1.35 KiB
    """consume kafka messages."""
    
    import json
    import logging
    import time

    from kafka import KafkaConsumer
    from kafka.errors import KafkaError
    
    if __name__ == '__main__':
        # Script entry point: connect to the Kafka broker (retrying until it is
        # reachable), then log every message received on the 'swift' topic
        # until the process is interrupted with Ctrl-C.
        logging.getLogger().setLevel('INFO')
        logging.info('Starting Kafka Consumer')

        def _decode_utf8(raw):
            """Decode raw bytes as UTF-8; pass None through unchanged."""
            # Records may carry no key, in which case the deserializer is
            # handed None and an unconditional .decode() would raise.
            return None if raw is None else raw.decode('utf-8')

        def _decode_json(raw):
            """Parse raw bytes as UTF-8 encoded JSON; pass None through unchanged."""
            # A null payload (e.g. a tombstone record) has nothing to parse.
            return None if raw is None else json.loads(raw.decode('utf-8'))

        # Back off between connection attempts so an unavailable broker is not
        # hammered in a tight loop; the delay doubles up to a fixed ceiling.
        retry_delay = 1.0
        max_retry_delay = 30.0

        consumer = None
        tries = 1
        while consumer is None:
            logging.info('{}. try'.format(tries))
            try:
                consumer = KafkaConsumer('swift',
                                         bootstrap_servers='kafka:9092',
                                         client_id='swift-kafka-consumer',
                                         key_deserializer=_decode_utf8,
                                         value_deserializer=_decode_json)
            except KafkaError:
                logging.info('Currently something wrong with Kafka, retrying...')
                tries += 1
                time.sleep(retry_delay)
                retry_delay = min(retry_delay * 2, max_retry_delay)
        print('Consumer done with initialization')
        try:
            for message in consumer:
                # Lazy %-style arguments: the logging module formats the
                # message only if the record is actually emitted.
                logging.info("%s:%d:%d: key=%s value=%s",
                             message.topic,
                             message.partition,
                             message.offset,
                             message.key,
                             message.value)
        except KeyboardInterrupt:
            logging.info('Exiting Kafka Consumer...')
        finally:
            # Release the broker connections on every exit path, including
            # unexpected errors raised while consuming.
            consumer.close()