Skip to content
Snippets Groups Projects
Commit 3072dee9 authored by bne86's avatar bne86
Browse files

add producer to docker inventory, make consumer and producer work again.

parent f9941f0b
No related branches found
No related tags found
No related merge requests found
......@@ -15,7 +15,14 @@ services:
consumer:
build:
context: ..
dockerfile: docker/kafka/Dockerfile
dockerfile: docker/kafka/consumer/Dockerfile
links:
- kafka
producer:
build:
context: ..
dockerfile: docker/kafka/producer/Dockerfile
links:
- kafka
......
"""consume kafka messages."""
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import time
if __name__ == '__main__':
consumer = None
tries = 1
while not consumer:
print('{}. try'.format(tries))
try:
consumer = KafkaConsumer('swift-messagebus',
bootstrap_servers='kafka')
except KafkaError:
print('Currently something wrong with Kafka, retrying...')
time.sleep(5)
while True:
message = consumer.poll()
consumer.subscribe()
print(message)
time.sleep(10)
FROM python:2.7-alpine
MAINTAINER bne86 <b.von.st.vieth@fz-juelich.de>
#RUN apt-get update && \
# apt-get install -y python2.7-minimal python2.7-setuptools && \
# apt-get clean
ADD . swift-kafka
WORKDIR swift-kafka
RUN python setup.py install
ADD docker/kafka/consumer.py consumer.py
ADD docker/kafka/consumer/consumer.py consumer.py
CMD python consumer.py
"""consume kafka messages."""
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json
import logging
if __name__ == '__main__':
logging.getLogger().setLevel('INFO')
logging.info('Starting Kafka Consumer')
consumer = None
tries = 1
while not consumer:
logging.info('{}. try'.format(tries))
try:
consumer = KafkaConsumer('swift',
bootstrap_servers='kafka:9092',
client_id='swift-kafka-consumer',
key_deserializer=lambda k: k.decode('utf-8'),
value_deserializer=lambda m: json.loads(m.decode('utf-8')))
except KafkaError:
logging.info('Currently something wrong with Kafka, retrying...')
tries += 1
print('Consumer done with initialization')
try:
for message in consumer:
logging.info("%s:%d:%d: key=%s value=%s" % (message.topic,
message.partition,
message.offset,
message.key,
message.value))
except KeyboardInterrupt:
logging.info('Exiting Kafka Consumer...')
FROM python:2.7-alpine
MAINTAINER bne86 <b.von.st.vieth@fz-juelich.de>
ADD . swift-kafka
WORKDIR swift-kafka
RUN python setup.py install
ADD docker/kafka/producer/producer.py producer.py
CMD python producer.py
......@@ -8,8 +8,8 @@ import time
if __name__ == '__main__':
logging.getLogger().setLevel('INFO')
logging.info('Starting Kafka Producers')
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092',
logging.info('Starting Kafka Producer')
producer = KafkaProducer(bootstrap_servers='kafka:9092',
key_serializer=lambda k: k.encode('utf-8'),
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
......@@ -19,7 +19,7 @@ if __name__ == '__main__':
payload = {'time': current}
logging.info("Send to Kafka: {}".format(payload))
producer.send(topic='swift', key='producer-test', value=payload)
time.sleep(1)
time.sleep(10)
except KeyboardInterrupt:
logging.info("Exiting...")
logging.info("Exiting Kafka Producer...")
break
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment