diff --git a/messagebus/messagebus.py b/messagebus/messagebus.py index 0d5fb0b5565befad01e591c976b30d836bd5ba8b..d9930913c3e8495d98970af716ea5e6922caa895 100644 --- a/messagebus/messagebus.py +++ b/messagebus/messagebus.py @@ -5,6 +5,9 @@ from swift.common.utils import config_true_value, get_logger, split_path from swift.proxy.controllers.base import get_object_info from webob import Request, Response +from kafka import KafkaProducer +import json + class MessagebusMiddleware(object): """Middleware that sends information to message bus for every request.""" @@ -25,7 +28,7 @@ class MessagebusMiddleware(object): log_route='messagebus', log_to_console=False) - def __call__(self, env, start_response): + def __call__(self, req, start_response): """plug into the call. :param env: request environment @@ -33,9 +36,20 @@ class MessagebusMiddleware(object): :return: """ self.start_response = start_response - #request = Request(env) - self.logger.info("Messagebus Middleware") - return self.app(env, start_response) + self.method = Request(req).method + return self.app(req, self.submit_to_messagebus) + + def submit_to_messagebus(self, status, headers, exc_info=None): + headers_send = dict(headers) + # headers_send.pop('X-Auth-Token') + self.logger.info(headers_send) + producer = KafkaProducer(bootstrap_servers='kafka', + value_serializer=lambda v: json.dumps(v).encode('utf-8')) + producer.send('swift-messagebus', + headers_send, + key=bytes(self.method) + ) + self.start_response(status, headers) def filter_factory(global_config, **local_conf):