Skip to content
Snippets Groups Projects
Commit 41174940 authored by Jedrzej Rybicki's avatar Jedrzej Rybicki
Browse files

better deleting, relation names changed, differentation between container and object creation

parent 5f2f7449
No related branches found
No related tags found
No related merge requests found
Pipeline #
...@@ -46,7 +46,8 @@ def connect_neo(): ...@@ -46,7 +46,8 @@ def connect_neo():
test_session = driver.session() test_session = driver.session()
test_session.close() test_session.close()
except ServiceUnavailable: except ServiceUnavailable:
logging.info('Database not jet available at: {}'.format(db_url)) logging.info(
'Database not jet available at: {}'.format(db_url))
sleep(5) sleep(5)
except KeyboardInterrupt: except KeyboardInterrupt:
logging.info('Exiting Neo Consumer...') logging.info('Exiting Neo Consumer...')
...@@ -66,37 +67,60 @@ def create_neo_constrains(db_driver): ...@@ -66,37 +67,60 @@ def create_neo_constrains(db_driver):
session.close() session.close()
def store_message(db_driver, key, value): def execute_query(db_driver, query, params):
try:
db_session = db_driver.session() db_session = db_driver.session()
result = db_session.run(query, params)
logging.info('Ran %s', result.statement)
except:
logging.error('Catching errors currently not implemented')
else:
db_session.close()
def store_message(db_driver, key, value):
if key in ['PUT', 'POST']: if key in ['PUT', 'POST']:
try: if value[u'Object']:
result = db_session.run( execute_query(db_driver=db_driver,
"MERGE (a:Account {name: {aname}})" query='MERGE (a:Account {name: {aname}})'
"MERGE (c:Container {name: {cname}})-[:CON_OF_ACC]->(a)" 'MERGE (c:Container {name: {cname}})<-[:OWNS]-(a)'
"MERGE (o:Object {name: {oname}})-[:OBJ_OF_CON]->(c)" 'MERGE (o:Object {name: {oname}})-[:BELONGS_TO]->(c)'
"SET o.updated={updated},o.checksum={etag};", 'SET o.updated={updated},o.checksum={etag};',
params=
{ {
"aname": value[u'Account'], 'aname': value[u'Account'],
"cname": value[u'Container'], 'cname': value[u'Container'],
"oname": value[u'Object'], 'oname': value[u'Object'],
"updated": value[u'Last-Modified'], 'updated': value[u'Last-Modified'],
"etag": value[u'Etag'], 'etag': value[u'Etag'],
}
)
else:
execute_query(db_driver=db_driver,
query='MERGE (a:Account {name: {aname}})'
'MERGE (c:Container {name: {cname}})<-['
':OWNS]-(a);',
params=
{
'aname': value[u'Account'],
'cname': value[u'Container'],
} }
) )
logging.info('Ran %s', result.statement)
except KeyError:
logging.error('Catching errors currently not implemented')
elif key in ['DELETE']: elif key in ['DELETE']:
try: execute_query(db_driver=db_driver,
result = db_session.run('MATCH (o:Object {name: {oname}})' query='MATCH (o:Object {name: {oname}})-['
'DETACH DELETE o', ':BELONGS_TO]->'
{'oname': value[u'Object']}) '(c:Container {name: {cname}})<-[:OWNS]-'
logging.info('Ran %s', result.statement) '(a:Account {name: {aname}})'
except KeyError: 'DETACH DELETE o;',
logging.error('Catching errors currently not implemented') params=
{'oname': value[u'Object'],
'aname': value[u'Account'],
'cname': value[u'Container'],
})
else: else:
logging.info('Unsupported method %s', key) logging.info('Unsupported method %s', key)
db_session.close()
if __name__ == '__main__': if __name__ == '__main__':
logging.getLogger().setLevel('INFO') logging.getLogger().setLevel('INFO')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment