From 1bbe78e6cbf11f6f49dfdcbc40b3198d66b24a89 Mon Sep 17 00:00:00 2001
From: "Benedikt von St. Vieth" <b.von.st.vieth@fz-juelich.de>
Date: Fri, 23 Dec 2016 14:36:24 +0100
Subject: [PATCH] swift-kafka now only writing to messagebus on response
 building.

---
 messagebus/messagebus.py | 22 ++++++++++++++++++----
 1 file changed, 18 insertions(+), 4 deletions(-)

diff --git a/messagebus/messagebus.py b/messagebus/messagebus.py
index 0d5fb0b..d993091 100644
--- a/messagebus/messagebus.py
+++ b/messagebus/messagebus.py
@@ -5,6 +5,9 @@ from swift.common.utils import config_true_value, get_logger, split_path
 from swift.proxy.controllers.base import get_object_info
 from webob import Request, Response
 
+from kafka import KafkaProducer
+import json
+
 
 class MessagebusMiddleware(object):
     """Middleware that sends information to message bus for every request."""
@@ -25,7 +28,7 @@ class MessagebusMiddleware(object):
                                      log_route='messagebus',
                                      log_to_console=False)
 
-    def __call__(self, env, start_response):
+    def __call__(self, req, start_response):
         """plug into the call.
 
         :param env: request environment
@@ -33,9 +36,20 @@ class MessagebusMiddleware(object):
         :return:
         """
         self.start_response = start_response
-        #request = Request(env)
-        self.logger.info("Messagebus Middleware")
-        return self.app(env, start_response)
+        self.method = Request(req).method
+        return self.app(req, self.submit_to_messagebus)
+
+    def submit_to_messagebus(self, status, headers, exc_info=None):
+        headers_send = dict(headers)
+        # headers_send.pop('X-Auth-Token')
+        self.logger.info(headers_send)
+        producer = KafkaProducer(bootstrap_servers='kafka',
+                                 value_serializer=lambda v: json.dumps(v).encode('utf-8'))
+        producer.send('swift-messagebus',
+                      headers_send,
+                      key=bytes(self.method)
+                      )
+        self.start_response(status, headers)
 
 
 def filter_factory(global_config, **local_conf):
-- 
GitLab