diff --git a/docker/kafka/neo4j-consumer/consumer.py b/docker/kafka/neo4j-consumer/consumer.py index 852ae7bfc91d3b5c2fa124cc5c0e2c0956738256..3ba65f80f02ebe3e67bcfb12ced9d752dd1da5cb 100644 --- a/docker/kafka/neo4j-consumer/consumer.py +++ b/docker/kafka/neo4j-consumer/consumer.py @@ -46,7 +46,8 @@ def connect_neo(): test_session = driver.session() test_session.close() except ServiceUnavailable: - logging.info('Database not jet available at: {}'.format(db_url)) + logging.info( + 'Database not jet available at: {}'.format(db_url)) sleep(5) except KeyboardInterrupt: logging.info('Exiting Neo Consumer...') @@ -66,37 +67,60 @@ def create_neo_constrains(db_driver): session.close() +def execute_query(db_driver, query, params): + try: + db_session = db_driver.session() + result = db_session.run(query, params) + logging.info('Ran %s', result.statement) + except: + logging.error('Catching errors currently not implemented') + else: + db_session.close() + + def store_message(db_driver, key, value): - db_session = db_driver.session() if key in ['PUT', 'POST']: - try: - result = db_session.run( - "MERGE (a:Account {name: {aname}})" - "MERGE (c:Container {name: {cname}})-[:CON_OF_ACC]->(a)" - "MERGE (o:Object {name: {oname}})-[:OBJ_OF_CON]->(c)" - "SET o.updated={updated},o.checksum={etag};", - { - "aname": value[u'Account'], - "cname": value[u'Container'], - "oname": value[u'Object'], - "updated": value[u'Last-Modified'], - "etag": value[u'Etag'], - } - ) - logging.info('Ran %s', result.statement) - except KeyError: - logging.error('Catching errors currently not implemented') + if value[u'Object']: + execute_query(db_driver=db_driver, + query='MERGE (a:Account {name: {aname}})' + 'MERGE (c:Container {name: {cname}})<-[:OWNS]-(a)' + 'MERGE (o:Object {name: {oname}})-[:BELONGS_TO]->(c)' + 'SET o.updated={updated},o.checksum={etag};', + params= + { + 'aname': value[u'Account'], + 'cname': value[u'Container'], + 'oname': value[u'Object'], + 'updated': value[u'Last-Modified'], + 'etag': value[u'Etag'], + } + ) + else: + execute_query(db_driver=db_driver, + query='MERGE (a:Account {name: {aname}})' + 'MERGE (c:Container {name: {cname}})<-[' + ':OWNS]-(a);', + params= + { + 'aname': value[u'Account'], + 'cname': value[u'Container'], + } + ) elif key in ['DELETE']: - try: - result = db_session.run('MATCH (o:Object {name: {oname}})' - 'DETACH DELETE o', - {'oname': value[u'Object']}) - logging.info('Ran %s', result.statement) - except KeyError: - logging.error('Catching errors currently not implemented') + execute_query(db_driver=db_driver, + query='MATCH (o:Object {name: {oname}})-[' + ':BELONGS_TO]->' + '(c:Container {name: {cname}})<-[:OWNS]-' + '(a:Account {name: {aname}})' + 'DETACH DELETE o;', + params= + {'oname': value[u'Object'], + 'aname': value[u'Account'], + 'cname': value[u'Container'], + }) else: logging.info('Unsupported method %s', key) - db_session.close() + if __name__ == '__main__': logging.getLogger().setLevel('INFO')