diff --git a/docker/kafka/neo4j-consumer/consumer.py b/docker/kafka/neo4j-consumer/consumer.py index 05f59dfcc942b44e516dda18f85975026babd0a1..c7029ba0a8da9f953b922df6d777552f586ac5bb 100644 --- a/docker/kafka/neo4j-consumer/consumer.py +++ b/docker/kafka/neo4j-consumer/consumer.py @@ -1,5 +1,6 @@ """consume kafka messages and publish them to neo4j.""" +from functools import partial from kafka import KafkaConsumer from kafka.errors import KafkaError from neo4j.v1 import GraphDatabase @@ -15,7 +16,7 @@ def connect_kafka(): logging.info('{}. try'.format(tries)) try: consumer = KafkaConsumer('swift', - bootstrap_servers='kafka:9092', + bootstrap_servers='localhost:9092', client_id='swift-kafka-consumer', key_deserializer=lambda k: k.decode( 'utf-8'), @@ -27,23 +28,50 @@ def connect_kafka(): print('Consumer done with initialization') return consumer + def connect_neo(): - host = os.getenv("NEO_PORT_7687_TCP_ADDR", "neo") + host = os.getenv("NEO_PORT_7687_TCP_ADDR", "127.0.0.1") port = os.getenv("NEO_PORT_7687_TCP_PORT", "7687") - # docker run -d -p 7687:7687 -p 7474:7474 --env=NEO4J_AUTH=none neo4j driver = GraphDatabase.driver("bolt://%s:%s" % (host, port), auth=None, encrypted=False) - session = driver.session() - return session + return driver + + +def create_neo_constrains(db_driver): + session = db_driver.session() + session.run( + "CREATE constraint on (account:Account) " + "ASSERT account.container is unique;") + + +def store_message(db_driver, key, value): + if key in ['PUT', 'POST']: + try: + result = db_driver.session().run( + "MERGE (a:Account {name: {aname}})" + "MERGE (c:Container {name: {cname}})-[:CON_OF_ACC]->(a)" + "MERGE (o:Object {name: {oname}})-[:OBJ_OF_CON]->(c)" + "SET o.updated={updated},o.checksum={etag};", + { + "aname": value[u'Account'], + "cname": value[u'Container'], + "oname": value[u'Object'], + "updated": value[u'Last-Modified'], + "etag": value[u'Etag'], + } + ) + logging.info(result) + except KeyError: + logging.error('Catching errors currently not implemented') if __name__ == '__main__': logging.getLogger().setLevel('INFO') logging.info('Starting Kafka Consumer') - logging.info(os.environ) kafka_consumer = connect_kafka() - neo_session = connect_neo() + db_driver = connect_neo() + create_neo_constrains(db_driver) try: for message in kafka_consumer: @@ -52,5 +80,6 @@ if __name__ == '__main__': message.offset, message.key, message.value)) + store_message(db_driver, message.key, message.value) except KeyboardInterrupt: logging.info('Exiting Kafka Consumer...')