"""consume kafka messages and publish them to neo4j.""" from kafka import KafkaConsumer from kafka.errors import KafkaError from time import sleep from neo4j.v1 import GraphDatabase from neo4j.bolt.connection import ServiceUnavailable import json import logging import os def connect_kafka(): consumer = None tries = 1 while not consumer: logging.info('{}. try'.format(tries)) try: consumer = KafkaConsumer('swift', bootstrap_servers='kafka:9092', client_id='swift-kafka-consumer', group_id='kafka-python-neo4j-consumer', key_deserializer=lambda k: k.decode( 'utf-8'), value_deserializer=lambda m: json.loads( m.decode('utf-8')), auto_offset_reset="earliest", enable_auto_commit=False) except KafkaError: logging.info('Currently something wrong with Kafka, retrying...') tries += 1 print('Kafka Consumer done with initialization') return consumer def connect_neo(): host = os.getenv("NEO_PORT_7687_TCP_ADDR", "neo") port = os.getenv("NEO_PORT_7687_TCP_PORT", "7687") db_url = "bolt://%s:%s" % (host, port) for _ in range(0, 12): try: try: driver = GraphDatabase.driver(db_url, auth=None, encrypted=False) test_session = driver.session() test_session.close() except ServiceUnavailable: logging.info( 'Database not jet available at: {}'.format(db_url)) sleep(5) except KeyboardInterrupt: logging.info('Exiting Neo Consumer...') exit(0) if not driver: logging.error('Database not available within a minute, exiting...') exit(1) return driver def create_neo_constrains(db_driver): session = db_driver.session() # unique account and perhaps object id as well? session.run( "CREATE constraint on (account:Account) " "ASSERT account.container is unique;") session.close() def execute_query(db_driver, query, params): try: db_session = db_driver.session() result = db_session.run(query, params) logging.info('Ran %s', result.statement) except: logging.error('Catching errors currently not implemented') else: db_session.close() def store_message(db_driver, key, value): if key in ['PUT', 'POST']: if value[u'Object']: execute_query(db_driver=db_driver, query='MERGE (a:Account {name: {aname}})' 'MERGE (c:Container {name: {cname}})<-[:OWNS]-(a)' 'MERGE (o:Object {name: {oname}})-[:BELONGS_TO]->(c)' 'SET o.updated={updated},o.checksum={etag};', params= { 'aname': value[u'Account'], 'cname': value[u'Container'], 'oname': value[u'Object'], 'updated': value[u'Last-Modified'], 'etag': value[u'Etag'], } ) else: execute_query(db_driver=db_driver, query='MERGE (a:Account {name: {aname}})' 'MERGE (c:Container {name: {cname}})<-[' ':OWNS]-(a);', params= { 'aname': value[u'Account'], 'cname': value[u'Container'], } ) elif key in ['DELETE']: execute_query(db_driver=db_driver, query='MATCH (o:Object {name: {oname}})-[' ':BELONGS_TO]->' '(c:Container {name: {cname}})<-[:OWNS]-' '(a:Account {name: {aname}})' 'DETACH DELETE o;', params= {'oname': value[u'Object'], 'aname': value[u'Account'], 'cname': value[u'Container'], }) else: logging.info('Unsupported method %s', key) if __name__ == '__main__': logging.getLogger().setLevel('INFO') logging.info('Starting Kafka Consumer') kafka_consumer = connect_kafka() db_driver = connect_neo() create_neo_constrains(db_driver) try: for message in kafka_consumer: logging.info("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value)) store_message(db_driver, message.key, message.value) except KeyboardInterrupt: logging.info('Exiting Kafka Consumer...') db_session.close()