From 65788bc9e9935da22b691899750741edaac36dd7 Mon Sep 17 00:00:00 2001
From: jrybicki-jsc <j.rybicki@fz-juelich.de>
Date: Tue, 2 Nov 2021 11:21:05 +0100
Subject: [PATCH] upload method first version

---
Review notes (this area is ignored by git-am; not part of the commit message):
 * Line structure and leading whitespace were restored from a
   whitespace-collapsed copy of this patch. Confirm the context lines against
   the target tree before applying, or use `git apply --ignore-whitespace`.
 * add_file() now closes the uploaded file handle.
 * test_upload mocks requests.put (the call add_file() makes) rather than
   requests.post, flushes the temporary file before the upload, and asserts
   that each mocked request was issued.
 * Commit and index hashes are those of the original patch.

 dags/b2shareoperator.py       | 40 ++++++++++++++++++++++++++++++++++++++++
 tests/test_b2shareoperator.py | 27 ++++++++++++++++++++++++++-
 2 files changed, 66 insertions(+), 1 deletion(-)

diff --git a/dags/b2shareoperator.py b/dags/b2shareoperator.py
index 90d07ae..a509358 100644
--- a/dags/b2shareoperator.py
+++ b/dags/b2shareoperator.py
@@ -1,3 +1,5 @@
+import json
+import os
 import tempfile
 import urllib
 from urllib.parse import urljoin
@@ -29,6 +31,44 @@ def download_file(url: str, target_dir: str):
     urllib.request.urlretrieve(url=url, filename=fname)
     return fname
 
+def get_record_template():
+    """Return the default metadata dict used for a new draft record."""
+    return {"titles":[{"title":"DLS dataset record"}],
+            "creators":[{"creator_name": "eflows4HPC"}],
+            "descriptions":
+                [{"description": "Output of eflows4HPC DLS", "description_type": "Abstract"}],
+            "community": "a9217684-945b-4436-8632-cac271f894ed",
+            'community_specific':
+                {'91ae5d2a-3848-4693-9f7d-cbd141172ef0': {'helmholtz centre': ['Forschungszentrum Jülich']}},
+            "open_access": True}
+
+def create_draft_record(server: str, token: str, record):
+    """POST `record` as JSON to `api/records/` on `server`; return the decoded JSON reply."""
+    response = requests.post(url=urljoin(server, 'api/records/'),
+                             headers={'Content-Type':'application/json'},
+                             data=json.dumps(record), params={'access_token': token})
+    return response.json()
+
+# the simplest version, target should be chunked
+def add_file(record, fname: str, token: str):
+    """PUT the content of `fname` to `record['links']['files']`; return the response."""
+    jf = os.path.split(fname)[-1]
+    # close the handle as soon as the upload request has returned
+    with open(fname, 'rb') as payload:
+        return requests.put(url=f"{record['links']['files']}/{jf}",
+                            params={'access_token': token},
+                            headers={"Content-Type":"application/octet-stream"},
+                            data=payload)
+
+def submit_draft(record, token):
+    """PATCH `record['links']['self']` to set publication_state to "submitted"; return the JSON reply."""
+    pub = [{"op": "add", "path":"/publication_state", "value": "submitted"}]
+    response = requests.patch(record['links']['self'],
+                              headers={"Content-Type":"application/json-patch+json"},
+                              data=json.dumps(pub),
+                              params={'access_token': token})
+    return response.json()
+
 class B2ShareOperator(BaseOperator):
     template_fields = ('target_dir',)
 
diff --git a/tests/test_b2shareoperator.py b/tests/test_b2shareoperator.py
index 0709d34..9ecebf3 100644
--- a/tests/test_b2shareoperator.py
+++ b/tests/test_b2shareoperator.py
@@ -1,5 +1,7 @@
 import unittest
 from unittest.mock import Mock, patch
+import tempfile
+import os
 
 from airflow import DAG
 from airflow.models.taskinstance import TaskInstance
@@ -7,7 +9,8 @@ from airflow.utils.dates import days_ago
 from airflow.utils.state import State
 
 from dags.b2shareoperator import (B2ShareOperator, download_file,
-                                  get_file_list, get_object_md, get_objects)
+                                  get_file_list, get_object_md, get_objects,
+                                  get_record_template, create_draft_record, add_file, submit_draft)
 
 DEFAULT_DATE = '2019-10-03'
 TEST_DAG_ID = 'test_my_custom_operator'
@@ -83,3 +86,25 @@ class B2ShareOperatorTest(unittest.TestCase):
             get.return_value = m
             r = get_objects(server='foo')
             self.assertListEqual(['a', 'b'], r)
+
+    def test_upload(self):
+        template = get_record_template()
+        server = 'https://b2share-testing.fz-juelich.de/'
+        token = ''
+        with patch('dags.b2shareoperator.requests.post') as post:
+            r = create_draft_record(server=server, token=token, record=template)
+            post.assert_called_once()
+
+        r = dict()
+        r['links'] = {'files': server, 'self': server}
+        # add_file() calls requests.put, so that is the call that must be mocked
+        with patch('dags.b2shareoperator.requests.put') as put:
+            with tempfile.NamedTemporaryFile() as a:
+                a.write(b"some content")
+                a.flush()
+                up = add_file(record=r, fname=a.name, token=token)
+            put.assert_called_once()
+
+        with patch('dags.b2shareoperator.requests.patch') as p:
+            submitted = submit_draft(record=r, token=token)
+            p.assert_called_once()
-- 
GitLab