Skip to content
Snippets Groups Projects
Commit 230dcf77 authored by bne86's avatar bne86
Browse files

rename neo4j compose container to neo, also change os.getenvironment code...

rename neo4j compose container to neo, also change os.getenvironment code because links are no longer exported as environment variables (see https://docs.docker.com/compose/link-env-deprecated/)
parent 38ccec13
Branches
Tags
No related merge requests found
Pipeline #
...@@ -12,14 +12,14 @@ services: ...@@ -12,14 +12,14 @@ services:
- zookeeper - zookeeper
- kafka - kafka
neo4j-consumer: neo-consumer:
build: build:
context: .. context: ..
dockerfile: docker/kafka/neo4j-consumer/Dockerfile dockerfile: docker/kafka/neo4j-consumer/Dockerfile
links: links:
- kafka - kafka
- neo4j - neo
neo4j: neo:
image: neo4j:latest image: neo4j:latest
ports: ports:
- "7474:7474" - "7474:7474"
......
"""consume kafka messages.""" """consume kafka messages and publish them to neo4j."""
from kafka import KafkaConsumer from kafka import KafkaConsumer
from kafka.errors import KafkaError from kafka.errors import KafkaError
from neo4j.v1 import GraphDatabase
import json import json
import logging import logging
import os
if __name__ == '__main__':
logging.getLogger().setLevel('INFO')
logging.info('Starting Kafka Consumer')
def connect_kafka():
consumer = None consumer = None
tries = 1 tries = 1
while not consumer: while not consumer:
...@@ -17,14 +17,36 @@ if __name__ == '__main__': ...@@ -17,14 +17,36 @@ if __name__ == '__main__':
consumer = KafkaConsumer('swift', consumer = KafkaConsumer('swift',
bootstrap_servers='kafka:9092', bootstrap_servers='kafka:9092',
client_id='swift-kafka-consumer', client_id='swift-kafka-consumer',
key_deserializer=lambda k: k.decode('utf-8'), key_deserializer=lambda k: k.decode(
value_deserializer=lambda m: json.loads(m.decode('utf-8'))) 'utf-8'),
value_deserializer=lambda m: json.loads(
m.decode('utf-8')))
except KafkaError: except KafkaError:
logging.info('Currently something wrong with Kafka, retrying...') logging.info('Currently something wrong with Kafka, retrying...')
tries += 1 tries += 1
print('Consumer done with initialization') print('Consumer done with initialization')
return consumer
def connect_neo():
host = os.getenv("NEO_PORT_7687_TCP_ADDR", "neo")
port = os.getenv("NEO_PORT_7687_TCP_PORT", "7687")
# docker run -d -p 7687:7687 -p 7474:7474 --env=NEO4J_AUTH=none neo4j
driver = GraphDatabase.driver("bolt://%s:%s" % (host, port),
auth=None,
encrypted=False)
session = driver.session()
return session
if __name__ == '__main__':
logging.getLogger().setLevel('INFO')
logging.info('Starting Kafka Consumer')
logging.info(os.environ)
kafka_consumer = connect_kafka()
neo_session = connect_neo()
try: try:
for message in consumer: for message in kafka_consumer:
logging.info("%s:%d:%d: key=%s value=%s" % (message.topic, logging.info("%s:%d:%d: key=%s value=%s" % (message.topic,
message.partition, message.partition,
message.offset, message.offset,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment