diff --git a/src/pages/dashboard.py b/src/pages/dashboard.py index 7ed5fc3be96c7c715e11ba14d1da5570dc3c91ed..8254b122519acd83d413a19cab565a04fc3f015e 100644 --- a/src/pages/dashboard.py +++ b/src/pages/dashboard.py @@ -616,6 +616,59 @@ def create_description_content(language_id): ) +def get_job_status(jobid, convoc_status_dict): + base_url = f"{UNICORE_BASE}JURECA/rest/core" + + # Authentification/Authorization + credentials = uc_credentials.UsernamePassword(UNICORE_USER, UNICORE_PASSWORD) + client = uc_client.Client(credentials, base_url) + + # query status of ecflow jobs + + executable = "ecflow_client" + environment = [ "PYTHONPATH=$PYTHONPATH:/p/project/cjicg21/jicg2126/ecflow/ecFlow-5.11.0-Source/lib/python3.8/site-packages/ecflow", + "PATH=$PATH:/p/project/cjicg21/jicg2126/ecflow/ecFlow-5.11.0-Source/bin/", + "ECFLOW_DIR=/p/project/cjicg21/jicg2126/ecflow/ecFlow-5.11.0-Source", + "ECF_HOST=jrlogin05.jureca", + "ECF_PORT=4960" ] + + arguments = [ f"--query dstate /{jobid}" ] + job_description = {"Executable": executable, + "Job type": "ON_LOGIN_NODE", + "Environment": environment, + "Arguments": arguments, } + job = client.new_job(job_description) + + # let's wait while the job is still running + job.poll() + + working_dir = job.working_dir + stdout_content = working_dir.stat("stdout").raw().readlines() + new_status = stdout_content[0].decode('utf-8').strip() + + # translate response from UNICORE to controlled vocabulary + if new_status == 'complete': + new_status_code = convoc_status_dict['finished'] + elif new_status == 'active': + new_status_code = convoc_status_dict['active'] + elif new_status == 'queued': + new_status_code = convoc_status_dict['waiting'] + elif new_status == 'aborted': + new_status_code = convoc_status_dict['aborted'] + else: + new_status_code = -1 + + return new_status_code + + +def change_status_of_job(jobid, new_status): + conn = sqlite3.connect(DATA_PATH.joinpath('destine_de370c_users.db')) + cur = conn.cursor() + cur.execute(f"UPDATE jobs SET status={new_status} WHERE id='{jobid}'") + conn.commit() + conn.close() + + def get_my_jobs_from_db(user_id=None, language_id=0): data_from_db = [] if user_id: @@ -623,16 +676,27 @@ def get_my_jobs_from_db(user_id=None, language_id=0): cur = conn.cursor() cur.execute(f"SELECT application, status, start_date, forecast_length, region, species, metric, emis_scen, creation_date, id FROM jobs WHERE user_id={user_id}") data_rows_from_db = cur.fetchall() + cur.execute("SELECT job_status, id FROM convoc_status") + convoc_status_from_db = cur.fetchall() conn.close() + convoc_status_dict = dict(convoc_status_from_db) + # if status != 'finished': # use /p/project/cjicg21/schroeder5/Destine_AQ/SCRIPTS/ECFLOW/query_status.bash jobnr # to determine the actual status # ==> there should be a refresh, whenever this tab is reloaded! for job in data_rows_from_db: + job_status = job[1] + if (job_status == convoc_status_dict['active']) or (job_status == convoc_status_dict['waiting']): + new_status = get_job_status(job[9].upper(), convoc_status_dict) + if job_status != new_status: + change_status_of_job(job[9], new_status) + job_status = new_status + data_from_db.append({guitr.jobs_columns[language_id][0]: guitr.application_text[language_id][job[0]], - guitr.jobs_columns[language_id][1]: guitr.status_text[language_id][job[1]], + guitr.jobs_columns[language_id][1]: guitr.status_text[language_id][job_status], guitr.jobs_columns[language_id][2]: dt.datetime.strptime(job[2],'%Y-%m-%d %H:%M').strftime(guitr.date_format2[language_id]), guitr.jobs_columns[language_id][3]: "{} {}{}".format(job[3], guitr.day_label[language_id], guitr.day_plural_label[language_id] if job[3] > 1 else ""), guitr.jobs_columns[language_id][4]: guitr.region_text[language_id][job[4]], @@ -881,7 +945,6 @@ def eurad_im_job_run(run_button, region, startdate, forecast_length, user_dict): new_job_dict['metric'] = None # EURAD-IM does not have any emission scenario new_job_dict['emis_scen'] = None - create_db_job_entry(user_id, new_job_dict) # submit job base_url = f"{UNICORE_BASE}JURECA/rest/core" @@ -890,6 +953,13 @@ def eurad_im_job_run(run_button, region, startdate, forecast_length, user_dict): job_description = {'Executable': "/p/project/cjicg21/schroeder5/Destine_AQ/start_destine_demonstrator.sh", "Job type": "ON_LOGIN_NODE", 'Arguments':[jobnr, "0"], } job = client.new_job(job_description) + # let's wait while the job is still running + # otherwise, the job status will not be able to be determined + job.poll() + + # now create job in db (otherwise the status cannot be determined!) + create_db_job_entry(user_id, new_job_dict) + # return updated values retval = ctx.triggered_id == "eurad_im_open" return retval,\