Skip to content
Snippets Groups Projects
Commit 5a387858 authored by bne86's avatar bne86
Browse files

creating namespace within neo4j done. maybe not most efficient implementation,...

creating namespace within neo4j done. maybe not most efficient implementation, but we have experts for that.
parent 230dcf77
Branches
No related tags found
No related merge requests found
Pipeline #
"""consume kafka messages and publish them to neo4j.""" """consume kafka messages and publish them to neo4j."""
from functools import partial
from kafka import KafkaConsumer from kafka import KafkaConsumer
from kafka.errors import KafkaError from kafka.errors import KafkaError
from neo4j.v1 import GraphDatabase from neo4j.v1 import GraphDatabase
...@@ -15,7 +16,7 @@ def connect_kafka(): ...@@ -15,7 +16,7 @@ def connect_kafka():
logging.info('{}. try'.format(tries)) logging.info('{}. try'.format(tries))
try: try:
consumer = KafkaConsumer('swift', consumer = KafkaConsumer('swift',
bootstrap_servers='kafka:9092', bootstrap_servers='localhost:9092',
client_id='swift-kafka-consumer', client_id='swift-kafka-consumer',
key_deserializer=lambda k: k.decode( key_deserializer=lambda k: k.decode(
'utf-8'), 'utf-8'),
...@@ -27,23 +28,50 @@ def connect_kafka(): ...@@ -27,23 +28,50 @@ def connect_kafka():
print('Consumer done with initialization') print('Consumer done with initialization')
return consumer return consumer
def connect_neo(): def connect_neo():
host = os.getenv("NEO_PORT_7687_TCP_ADDR", "neo") host = os.getenv("NEO_PORT_7687_TCP_ADDR", "127.0.0.1")
port = os.getenv("NEO_PORT_7687_TCP_PORT", "7687") port = os.getenv("NEO_PORT_7687_TCP_PORT", "7687")
# docker run -d -p 7687:7687 -p 7474:7474 --env=NEO4J_AUTH=none neo4j
driver = GraphDatabase.driver("bolt://%s:%s" % (host, port), driver = GraphDatabase.driver("bolt://%s:%s" % (host, port),
auth=None, auth=None,
encrypted=False) encrypted=False)
session = driver.session() return driver
return session
def create_neo_constrains(db_driver):
session = db_driver.session()
session.run(
"CREATE constraint on (account:Account) "
"ASSERT account.container is unique;")
def store_message(db_driver, key, value):
if key in ['PUT', 'POST']:
try:
result = db_driver.session().run(
"MERGE (a:Account {name: {aname}})"
"MERGE (c:Container {name: {cname}})-[:CON_OF_ACC]->(a)"
"MERGE (o:Object {name: {oname}})-[:OBJ_OF_CON]->(c)"
"SET o.updated={updated},o.checksum={etag};",
{
"aname": value[u'Account'],
"cname": value[u'Container'],
"oname": value[u'Object'],
"updated": value[u'Last-Modified'],
"etag": value[u'Etag'],
}
)
logging.info(result)
except KeyError:
logging.error('Catching errors currently not implemented')
if __name__ == '__main__': if __name__ == '__main__':
logging.getLogger().setLevel('INFO') logging.getLogger().setLevel('INFO')
logging.info('Starting Kafka Consumer') logging.info('Starting Kafka Consumer')
logging.info(os.environ)
kafka_consumer = connect_kafka() kafka_consumer = connect_kafka()
neo_session = connect_neo() db_driver = connect_neo()
create_neo_constrains(db_driver)
try: try:
for message in kafka_consumer: for message in kafka_consumer:
...@@ -52,5 +80,6 @@ if __name__ == '__main__': ...@@ -52,5 +80,6 @@ if __name__ == '__main__':
message.offset, message.offset,
message.key, message.key,
message.value)) message.value))
store_message(db_driver, message.key, message.value)
except KeyboardInterrupt: except KeyboardInterrupt:
logging.info('Exiting Kafka Consumer...') logging.info('Exiting Kafka Consumer...')
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment