Skip to content
Snippets Groups Projects
Select Git revision
  • 0588184612b07bbe0e6ba989fb9fb4be9ef55f96
  • documentation default
  • master protected
  • integration
  • pre_update
5 results

Cell Magics.ipynb

Blame
  • consumer.py 1.35 KiB
    """consume kafka messages."""
    
    from kafka import KafkaConsumer
    from kafka.errors import KafkaError
    import json
    import logging
    
    if __name__ == '__main__':
        # Script entry point: connect to Kafka (retrying until the broker is
        # reachable), then log every message received on the 'swift' topic
        # until interrupted with Ctrl-C.

        # Local import: only this entry point needs it.
        import time

        # Pause between connection attempts so an unavailable broker is not
        # hammered and the log is not flooded by a tight retry loop.
        RETRY_DELAY_SECONDS = 5

        logging.getLogger().setLevel('INFO')
        logging.info('Starting Kafka Consumer')

        consumer = None
        tries = 1
        while consumer is None:
            logging.info('%d. try', tries)
            try:
                consumer = KafkaConsumer(
                    'swift',
                    bootstrap_servers='kafka:9092',
                    client_id='swift-kafka-consumer',
                    # Records may carry no key, and tombstone records carry no
                    # value; pass None through instead of raising
                    # AttributeError on None.decode(...).
                    key_deserializer=lambda k: None if k is None else k.decode('utf-8'),
                    value_deserializer=lambda m: None if m is None else json.loads(m.decode('utf-8')),
                )
            except KafkaError:
                logging.info('Currently something wrong with Kafka, retrying...')
                tries += 1
                time.sleep(RETRY_DELAY_SECONDS)
        print('Consumer done with initialization')
        try:
            # Iterating the consumer blocks and yields records as they arrive.
            for message in consumer:
                # Lazy %-style arguments: formatting is left to the logger.
                logging.info("%s:%d:%d: key=%s value=%s",
                             message.topic,
                             message.partition,
                             message.offset,
                             message.key,
                             message.value)
        except KeyboardInterrupt:
            logging.info('Exiting Kafka Consumer...')
        finally:
            # Release the consumer's network resources on any exit path.
            consumer.close()