"""consume kafka messages and publish them to neo4j."""

from kafka import KafkaConsumer
from kafka.errors import KafkaError
from time import sleep
from neo4j.v1 import GraphDatabase
from neo4j.bolt.connection import ServiceUnavailable
import json
import logging
import os


def connect_kafka(retry_delay=5):
    """Create the Kafka consumer for the ``swift`` topic, retrying until it works.

    The consumer connects to ``kafka:9092``, decodes keys as UTF-8 text and
    values as UTF-8 encoded JSON, and starts from the earliest offset when
    the consumer group has no committed position.

    NOTE(review): ``enable_auto_commit`` is disabled and nothing in this file
    commits offsets explicitly, so the group position is never stored --
    confirm that re-reading the topic after a restart is intended.

    :param retry_delay: seconds to wait after a failed attempt before trying
        again, so an unreachable broker does not cause a busy loop.
    :return: the connected ``KafkaConsumer``.
    """
    consumer = None
    tries = 1
    while consumer is None:
        logging.info('%d. try', tries)
        try:
            consumer = KafkaConsumer(
                'swift',
                bootstrap_servers='kafka:9092',
                client_id='swift-kafka-consumer',
                group_id='kafka-python-neo4j-consumer',
                key_deserializer=lambda k: k.decode('utf-8'),
                value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                auto_offset_reset="earliest",
                enable_auto_commit=False)
        except KafkaError as error:
            logging.info(
                'Currently something wrong with Kafka (%s), retrying...',
                error)
            tries += 1
            # Back off before the next attempt instead of spinning.
            sleep(retry_delay)
    logging.info('Kafka Consumer done with initialization')
    return consumer


def connect_neo():
    """Open a Bolt driver to Neo4j, waiting up to about a minute for the DB.

    Host and port are read from ``NEO_PORT_7687_TCP_ADDR`` and
    ``NEO_PORT_7687_TCP_PORT`` (defaults ``neo`` / ``7687``). Up to 12
    attempts are made with a 5 second pause after each failure. A throwaway
    session is opened and closed to check the connection.

    :return: the connected driver.
    :raises SystemExit: with code 1 when no attempt succeeded, or code 0 when
        interrupted with Ctrl-C while waiting.
    """
    host = os.getenv("NEO_PORT_7687_TCP_ADDR", "neo")
    port = os.getenv("NEO_PORT_7687_TCP_PORT", "7687")
    db_url = "bolt://%s:%s" % (host, port)

    attempts = 12
    retry_delay = 5

    # Start from None so the "never connected" check below cannot hit an
    # unbound local when every attempt fails.
    driver = None
    try:
        for _ in range(attempts):
            candidate = None
            try:
                candidate = GraphDatabase.driver(db_url,
                                                 auth=None,
                                                 encrypted=False)
                test_session = candidate.session()
                test_session.close()
            except ServiceUnavailable:
                # Do not keep a driver whose connection test failed.
                if candidate is not None:
                    candidate.close()
                logging.info(
                    'Database not jet available at: {}'.format(db_url))
                sleep(retry_delay)
            else:
                # Stop at the first working driver instead of opening (and
                # leaking) a new one on every remaining iteration.
                driver = candidate
                break
    except KeyboardInterrupt:
        logging.info('Exiting Neo Consumer...')
        raise SystemExit(0)

    if driver is None:
        logging.error('Database not available within a minute, exiting...')
        raise SystemExit(1)
    return driver


def create_neo_constrains(db_driver):
    """Create the uniqueness constraint on ``Account`` nodes.

    :param db_driver: object providing ``session()``; the session must offer
        ``run(statement)`` and ``close()``.
    :raises: whatever ``session.run`` raises; the session is closed first.
    """
    session = db_driver.session()
    try:
        # unique account and perhaps object id as well?
        # NOTE(review): the constraint targets ``account.container``, while
        # the queries in this file identify accounts by ``name`` -- confirm
        # this is the intended property.
        session.run(
            "CREATE constraint on (account:Account) "
            "ASSERT account.container is unique;")
    finally:
        # Release the session even when the statement fails.
        session.close()


def execute_query(db_driver, query, params):
    """Run one parameterised statement in a fresh session (best effort).

    Failures are logged with their traceback and not re-raised, so a single
    bad message does not stop the consumer. ``KeyboardInterrupt`` and
    ``SystemExit`` are deliberately not caught, so Ctrl-C still reaches the
    caller.

    :param db_driver: object providing ``session()``.
    :param query: statement text.
    :param params: mapping of parameter names to values for ``query``.
    :return: ``None``.
    """
    db_session = None
    try:
        db_session = db_driver.session()
        result = db_session.run(query, params)
        logging.info('Ran %s', result.statement)
    except Exception:
        logging.exception('Failed to run query %s', query)
    finally:
        # Close on both the success and the failure path; the session is
        # None only when opening it failed.
        if db_session is not None:
            db_session.close()


def store_message(db_driver, key, value):
    """Turn one consumed message into a graph update and execute it.

    ``key`` selects the operation:

    * ``PUT`` / ``POST`` with a truthy ``value['Object']``: merge the
      account, container and object nodes and set the object's ``updated``
      and ``checksum`` properties.
    * ``PUT`` / ``POST`` otherwise: merge only the account and container.
    * ``DELETE``: detach-delete the matching object node.
    * anything else: log the key as unsupported and do nothing.

    :param db_driver: driver handed on to ``execute_query``.
    :param key: message key naming the operation.
    :param value: mapping read with the keys ``Account``, ``Container``,
        ``Object`` and, for object writes, ``Last-Modified`` and ``Etag``.
    """
    if key in ('PUT', 'POST'):
        # Fragments shared by both write statements.
        merge_account = 'MERGE (a:Account {name: {aname}})'
        merge_container = 'MERGE (c:Container {name: {cname}})<-[:OWNS]-(a)'

        if value[u'Object']:
            cypher = (merge_account
                      + merge_container
                      + 'MERGE (o:Object {name: {oname}})-[:BELONGS_TO]->(c)'
                      + 'SET o.updated={updated},o.checksum={etag};')
            bindings = {
                'aname': value[u'Account'],
                'cname': value[u'Container'],
                'oname': value[u'Object'],
                'updated': value[u'Last-Modified'],
                'etag': value[u'Etag'],
            }
        else:
            cypher = merge_account + merge_container + ';'
            bindings = {
                'aname': value[u'Account'],
                'cname': value[u'Container'],
            }
        execute_query(db_driver=db_driver, query=cypher, params=bindings)
        return

    if key in ('DELETE',):
        cypher = ('MATCH (o:Object {name: {oname}})-[:BELONGS_TO]->'
                  '(c:Container {name: {cname}})<-[:OWNS]-'
                  '(a:Account {name: {aname}})'
                  'DETACH DELETE o;')
        bindings = {
            'oname': value[u'Object'],
            'aname': value[u'Account'],
            'cname': value[u'Container'],
        }
        execute_query(db_driver=db_driver, query=cypher, params=bindings)
        return

    logging.info('Unsupported method %s', key)


if __name__ == '__main__':
    # Entry point: connect to Kafka and Neo4j, make sure the constraint
    # exists, then write every consumed message to the graph until stopped.
    logging.getLogger().setLevel('INFO')
    logging.info('Starting Kafka Consumer')

    kafka_consumer = connect_kafka()
    db_driver = connect_neo()
    create_neo_constrains(db_driver)

    try:
        for message in kafka_consumer:
            logging.info("%s:%d:%d: key=%s value=%s",
                         message.topic,
                         message.partition,
                         message.offset,
                         message.key,
                         message.value)
            store_message(db_driver, message.key, message.value)
    except KeyboardInterrupt:
        logging.info('Exiting Kafka Consumer...')
    finally:
        # There is no module-level session to close here (sessions are
        # opened and closed per query); release the long-lived consumer and
        # driver instead.
        kafka_consumer.close()
        db_driver.close()