Skip to content
Snippets Groups Projects
Commit c7c01eba authored by bne86's avatar bne86
Browse files

reduce to one kafka constructor. db session open, run, close.

parent 2c618e8a
No related branches found
No related tags found
No related merge requests found
Pipeline #
......@@ -24,8 +24,7 @@ def connect_kafka():
'utf-8'),
value_deserializer=lambda m: json.loads(
m.decode('utf-8')),
auto_offset_reset="earliest")
KafkaConsumer(auto_offset_reset='earliest',
auto_offset_reset="earliest",
enable_auto_commit=False)
except KafkaError:
logging.info('Currently something wrong with Kafka, retrying...')
......@@ -67,7 +66,8 @@ def create_neo_constrains(db_driver):
session.close()
def store_message(db_session, key, value):
def store_message(db_driver, key, value):
db_session = db_driver.session()
if key in ['PUT', 'POST']:
try:
result = db_session.run(
......@@ -96,6 +96,7 @@ def store_message(db_session, key, value):
logging.error('Catching errors currently not implemented')
else:
logging.info('Unsupported method %s', key)
db_session.close()
if __name__ == '__main__':
logging.getLogger().setLevel('INFO')
......@@ -104,7 +105,6 @@ if __name__ == '__main__':
kafka_consumer = connect_kafka()
db_driver = connect_neo()
create_neo_constrains(db_driver)
db_session = db_driver.session()
try:
for message in kafka_consumer:
......@@ -113,7 +113,7 @@ if __name__ == '__main__':
message.offset,
message.key,
message.value))
store_message(db_session, message.key, message.value)
store_message(db_driver, message.key, message.value)
except KeyboardInterrupt:
logging.info('Exiting Kafka Consumer...')
db_session.close()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment