Skip to content
Snippets Groups Projects
Commit 1bbe78e6 authored by Benedikt von St. Vieth's avatar Benedikt von St. Vieth
Browse files

swift-kafka now only writing to messagebus on response building.

parent dd0fa63d
No related branches found
No related tags found
No related merge requests found
...@@ -5,6 +5,9 @@ from swift.common.utils import config_true_value, get_logger, split_path ...@@ -5,6 +5,9 @@ from swift.common.utils import config_true_value, get_logger, split_path
from swift.proxy.controllers.base import get_object_info from swift.proxy.controllers.base import get_object_info
from webob import Request, Response from webob import Request, Response
from kafka import KafkaProducer
import json
class MessagebusMiddleware(object): class MessagebusMiddleware(object):
"""Middleware that sends information to message bus for every request.""" """Middleware that sends information to message bus for every request."""
...@@ -25,7 +28,7 @@ class MessagebusMiddleware(object): ...@@ -25,7 +28,7 @@ class MessagebusMiddleware(object):
log_route='messagebus', log_route='messagebus',
log_to_console=False) log_to_console=False)
def __call__(self, env, start_response): def __call__(self, req, start_response):
"""plug into the call. """plug into the call.
:param env: request environment :param env: request environment
...@@ -33,9 +36,20 @@ class MessagebusMiddleware(object): ...@@ -33,9 +36,20 @@ class MessagebusMiddleware(object):
:return: :return:
""" """
self.start_response = start_response self.start_response = start_response
#request = Request(env) self.method = Request(req).method
self.logger.info("Messagebus Middleware") return self.app(req, self.submit_to_messagebus)
return self.app(env, start_response)
def submit_to_messagebus(self, status, headers, exc_info=None):
headers_send = dict(headers)
# headers_send.pop('X-Auth-Token')
self.logger.info(headers_send)
producer = KafkaProducer(bootstrap_servers='kafka',
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('swift-messagebus',
headers_send,
key=bytes(self.method)
)
self.start_response(status, headers)
def filter_factory(global_config, **local_conf): def filter_factory(global_config, **local_conf):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment