from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.models.connection import Connection
from airflow.models.dagrun import DagRun
import requests
import urllib.request
import tempfile
from b2shareoperator import get_file_list, download_file, get_object_md, get_objects

# Default arguments handed to the DAG via @dag(default_args=...) below.
default_args = dict(owner='airflow')

# NOTE(review): Airflow discourages dynamic start dates such as days_ago(...);
# consider a fixed date if this DAG is ever scheduled.
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def taskflow_example(**kwargs):
    """Download all files of one B2SHARE object to the worker's local disk.

    Pipeline: ``extract`` (look up the object's file list) -> ``transform``
    (download each file) -> ``load`` (report how many files were fetched).

    The object is selected by the ``oid`` DAG-run parameter. If it is absent or
    empty, ``extract`` prints the objects available on the server and the
    downstream tasks run on an empty file list.

    ``**kwargs`` is accepted but not used.
    """

    @task(multiple_outputs=True)
    def extract(**kwargs):
        """Return the file list of the object named by the ``oid`` run parameter.

        The server URI comes from the ``default_b2share`` Airflow connection.
        Returns ``{}`` (after printing the objects found on the server) when
        no usable ``oid`` parameter was supplied.
        """
        connection = Connection.get_connection_from_secrets('default_b2share')
        server = connection.get_uri()
        print(f"Retrieving data from {server}")

        params = kwargs['params']
        oid = params.get('oid')
        if not oid:
            # Nothing to download: list what the server offers so the user can
            # pick an object id for the next run. A None/empty oid is treated
            # the same as a missing one rather than being sent to the server.
            print("Missing object id in pipeline parameters")
            objects = get_objects(server=server)
            available = {o['id']: [f['key'] for f in o['files']] for o in objects}
            print(f"Objects on server: {available}")
            return {}

        obj = get_object_md(server=server, oid=oid)
        print(f"Retrieved object {oid}: {obj}")
        # NOTE(review): multiple_outputs=True requires a dict return value;
        # presumably get_file_list returns {file name: download URL} — confirm.
        return get_file_list(obj)

    @task(multiple_outputs=True)
    def transform(flist: dict):
        """Download every entry of *flist* and return ``{file name: result}``.

        *flist* is iterated as ``file name -> URL`` pairs; each value in the
        returned dict is whatever ``download_file`` returns for that file
        (presumably the local path of the downloaded copy — confirm).
        """
        name_mappings = {}
        for fname, url in flist.items():
            print(f"Processing: {fname} --> {url}")
            # NOTE(review): hard-coded shared download directory; the trailing
            # slash is kept in case download_file concatenates paths.
            name_mappings[fname] = download_file(url=url, target_dir='/tmp/')
        return name_mappings

    @task()
    def load(files: dict):
        """Report how many files ``transform`` downloaded."""
        print(f"Total files downloaded: {len(files)}")

    data = extract()
    files = transform(data)
    load(files)
    
# Build the DAG object at module level so Airflow finds it when parsing this file.
# NOTE(review): this rebinds `dag`, shadowing the `dag` decorator imported from
# airflow.decorators. It is harmless here because the decorator was already
# applied above, but any later `@dag` use in this module would break; consider
# a distinct name if nothing external relies on `dag`.
dag = taskflow_example()