Commit 0de0ba02 authored by Tim Kreuzer

Use unicore internal port forwarding

parent 826f8ca9
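In essence, this change replaces the external SSH port-forward from the base class with UNICORE's built-in forwarding endpoint. The standalone sketch below, distilled from the code added further down, shows the mechanism; it is not part of the commit, and job, host, remote_port and local_port are illustrative parameters.

# Sketch: tunnel a local port to a service inside a UNICORE job via the
# job's "forwarding" link, using pyunicore's Forwarder helper.
import pyunicore.client as pyunicore
from pyunicore.forwarder import Forwarder

def open_internal_forward(job: pyunicore.Job, local_port: int, host: str, remote_port: int):
    endpoint = job.links["forwarding"]      # forwarding endpoint offered by UNICORE/X
    transport = job.transport._clone()      # dedicated transport for the tunnel
    transport.use_security_sessions = False
    forwarder = Forwarder(transport, endpoint, service_port=remote_port, service_host=host)
    forwarder.run(local_port)               # blocks; the spawner runs this in an executor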
@@ -8,7 +8,6 @@ from cryptography.hazmat.backends import default_backend
from cryptography.x509 import load_pem_x509_certificate
from jupyterhub.apihandlers import default_handlers
from jupyterhub.apihandlers.base import APIHandler
from tornado.httpclient import HTTPRequest
class SpawnEventsUnicoreAPIHandler(APIHandler):
@@ -42,7 +41,7 @@ class SpawnEventsUnicoreAPIHandler(APIHandler):
body = self.request.body.decode("utf8")
body = json.loads(body) if body else {}
self.log.info(
f"{spawner._log_name} - Unicore Status Update received",
f"{spawner._log_name} - Unicore Status Update received - {body.get('status', '')}",
extra={
"uuidcode": spawner.name,
"username": user.name,
@@ -64,13 +63,13 @@ class SpawnEventsUnicoreAPIHandler(APIHandler):
},
)
if bool(spawner._spawn_pending or spawner.ready):
asyncio.create_task(
spawner.stop(cancel=True, event=spawner.unicore_stop_event)
)
event = await spawner.unicore_stop_event()
asyncio.create_task(spawner.stop(cancel=True, event=event))
else:
bssStatus = body.get("bssStatus", "")
# It's in Running (UNICORE wise) state. We can now check for bssStatus to get more details
for key, bssDetails in spawner.bss_notification_config.items():
bss_config = await spawner.get_bss_notification_config()
for key, bssDetails in bss_config.items():
if key == bssStatus:
now = datetime.datetime.now().strftime("%Y_%m_%d %H:%M:%S.%f")[:-3]
summary = bssDetails.get("summary", f"Slurm status: {key}")
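For orientation, the handler above only looks at two keys of the POSTed body; a hedged example of such a notification payload (only "status" and "bssStatus" appear in this diff, all other fields are assumptions):

# Illustrative notification body POSTed by UNICORE to this handler
{
    "status": "RUNNING",     # UNICORE job state
    "bssStatus": "PENDING",  # batch-system detail, looked up in bss_notification_config
}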
......
import copy
import asyncio
import html
import json
import re
import time
from datetime import datetime
import pyunicore.client as pyunicore
from forwardbasespawner import ForwardBaseSpawner
from jupyterhub.utils import maybe_future
from jupyterhub.utils import url_escape_path
from jupyterhub.utils import url_path_join
from pyunicore.forwarder import Forwarder
from requests.exceptions import HTTPError
from traitlets import Any
from traitlets import Bool
from traitlets import Dict
from traitlets import Integer
from traitlets import Unicode
class UnicoreSpawner(ForwardBaseSpawner):
@@ -261,7 +266,24 @@ class UnicoreSpawner(ForwardBaseSpawner):
""",
)
bss_notification_config = Dict(
unicore_internal_forwarding = Any(
config=True,
default_value=True,
help="""
...
""",
)
async def get_unicore_internal_forwarding(self):
if callable(self.unicore_internal_forwarding):
unicore_internal_forwarding = await maybe_future(
self.unicore_internal_forwarding(self)
)
else:
unicore_internal_forwarding = self.unicore_internal_forwarding
return unicore_internal_forwarding
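As a usage note (not part of the commit): the new trait follows the usual value-or-callable pattern resolved by the getter above, so it could be configured roughly like this; the callable body and the "system" user option are purely illustrative.

# Illustrative jupyterhub_config.py snippet
c.UnicoreSpawner.unicore_internal_forwarding = True   # plain value

# or decide per spawn with a (sync or async) callable that receives the spawner:
async def unicore_internal_forwarding(spawner):
    return spawner.user_options.get("system", "") != "old-cluster"   # assumed option name

c.UnicoreSpawner.unicore_internal_forwarding = unicore_internal_forwarding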
bss_notification_config = Any(
config=True,
default_value={
"PENDING": {
@@ -280,6 +302,15 @@ class UnicoreSpawner(ForwardBaseSpawner):
""",
)
async def get_bss_notification_config(self):
if callable(self.bss_notification_config):
bss_notification_config = await maybe_future(
self.bss_notification_config(self)
)
else:
bss_notification_config = self.bss_notification_config
return bss_notification_config
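bss_notification_config gets the same treatment (Dict becomes Any plus a getter), so it can likewise be a plain dict or a callable. A hedged example; only the "summary" key and the "PENDING" state are taken from this diff, the extra state is illustrative:

# Illustrative mapping of bssStatus values to progress-event text
c.UnicoreSpawner.bss_notification_config = {
    "PENDING": {"summary": "Slurm status: PENDING"},
    "CONFIGURING": {"summary": "Slurm status: CONFIGURING"},
}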
download_max_bytes = Integer(
config=True,
default_value=4096,
@@ -397,22 +428,29 @@ class UnicoreSpawner(ForwardBaseSpawner):
job = self.timed_func_call(pyunicore.Job, transport, self.resource_url)
return job
def _prettify_error_logs(log_list, lines, summary):
def short_logs(self, log_list, lines):
if type(log_list) == str:
log_list = log_list.split("\n")
if type(log_list) == list:
log_list = [x.split("\n") for x in log_list]
log_list_clear = []
for l in log_list:
if type(l) == list:
log_list_clear.extend(l)
else:
log_list_clear.append(l)
if lines > 0:
log_list_short = log_list[-lines:]
if lines < len(log_list):
log_list_short.insert(0, "...")
log_list_clear = log_list_clear[-lines:]
if lines < len(log_list_clear):
log_list_clear.insert(0, "...")
return log_list_clear
def _prettify_error_logs(self, log_list, lines, summary):
log_list_short = self.short_logs(log_list, lines)
log_list_short_escaped = list(map(lambda x: html.escape(x), log_list_short))
logs_s = "<br>".join(log_list_short_escaped)
else:
logs_s = log_list.split()
logs_s = html.escape(logs_s)
return f"<details><summary>&nbsp&nbsp&nbsp&nbsp{summary}</summary>{logs_s}</details>"
return f"<details><summary>&nbsp&nbsp&nbsp&nbsp{summary}(click here to expand):</summary>{logs_s}</details>"
def download_file(job, file):
def download_file(self, job, file):
file_path = job.working_dir.stat(file)
file_size = file_path.properties["size"]
if file_size == 0:
@@ -422,30 +460,30 @@ class UnicoreSpawner(ForwardBaseSpawner):
return s.data.decode()
async def unicore_stop_event(self):
stderr = download_file(job, "stderr")
stdout = download_file(job, "stdout")
now = datetime.datetime.now().strftime("%Y_%m_%d %H:%M:%S.%f")[:-3]
job = await self._get_job()
unicore_stderr = self.download_file(job, "stderr")
unicore_stdout = self.download_file(job, "stdout")
now = datetime.now().strftime("%Y_%m_%d %H:%M:%S.%f")[:-3]
summary = f"UNICORE Job stopped with exitCode: {job.properties.get('exitCode', 'unknown exitCode')}"
unicore_status_message = job_properties.get(
unicore_status_message = job.properties.get(
"statusMessage", "unknown statusMessage"
)
unicore_logs_details = _prettify_error_logs(
job_properties.get("log", []), 20, "UNICORE logs:"
unicore_logs_details = self._prettify_error_logs(
job.properties.get("log", []), 20, "UNICORE logs"
)
unicore_stdout_details = _prettify_error_logs(
unicore_stdout_details = self._prettify_error_logs(
unicore_stdout,
20,
"Job stdout:",
"Job stdout",
)
unicore_stderr_details = _prettify_error_logs(
unicore_stderr_details = self._prettify_error_logs(
unicore_stderr,
20,
"Job stderr:",
"Job stderr",
)
details = "".join(
@@ -494,6 +532,14 @@ class UnicoreSpawner(ForwardBaseSpawner):
env[
"JUPYTERHUB_ACTIVITY_URL"
] = f"{env['JUPYTERHUB_API_URL'].rstrip('/')}/users/{self.user.name}/activity"
# Add URL to receive UNICORE status updates
url_parts = ["users", "progress", "updateunicore", self.user.escaped_name]
if self.name:
url_parts.append(self.name)
env[
"JUPYTERHUB_UNICORE_NOTIFICATION_URL"
] = f"{env['JUPYTERHUB_API_URL']}/{url_path_join(*url_parts)}"
return env
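How JUPYTERHUB_UNICORE_NOTIFICATION_URL actually reaches UNICORE is not shown in this diff. A hypothetical snippet, assuming the job description passes it via UNICORE's notification mechanism (the field name and the other keys are assumptions):

# Hypothetical job-description fragment, not part of this commit
job_description = {
    "Executable": "start_jupyterlab.sh",                      # illustrative
    "Job Type": "batch",                                       # illustrative
    "Notification": env["JUPYTERHUB_UNICORE_NOTIFICATION_URL"],
}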
async def _start(self):
@@ -542,28 +588,93 @@ class UnicoreSpawner(ForwardBaseSpawner):
unicore_job = self.timed_func_call(client.new_job, job_description)
self.resource_url = unicore_job.resource_url
# UNICORE/X supports port-forwarding for batch jobs, but not
# interactive jobs yet.
# Until it supports it for all jobs, we rely on the base class
# to create a port-forward process and add the correct return value.
# --
# if job_description.get("Job Type", "batch") in ["batch", "normal"]:
# from pyunicore.forwarder import open_tunnel
# sock = open_tunnel(unicore_job, self.port)
# return ("localhost", sock.getsockname()[1])
unicore_forwarding = await self.get_unicore_internal_forwarding()
if unicore_forwarding:
await self.run_ssh_forward()
return f"http://{self.svc_name}:{self.port}"
else:
return ""
async def ssh_default_forward(self):
unicore_forwarding = await self.get_unicore_internal_forwarding()
if unicore_forwarding:
unicore_job = await self._get_job()
while unicore_job.is_running():
if unicore_job.properties.get("status", "") != "RUNNING":
self.log.debug(f"{self._log_name} - Wait for JupyterLab ...")
await asyncio.sleep(5)
continue
# Download stderr to receive port + address
unicore_stderr = self.download_file(unicore_job, "stderr")
if type(unicore_stderr) == str:
unicore_stderr = unicore_stderr.split("\n")
log_line = [
x
for x in unicore_stderr
if f"/user/{self.user.escaped_name}/{url_escape_path(self.name)}/"
in x
]
if log_line:
log_line = log_line[0]
result = re.search("(http|https)://([^:]+):([^/]+)", log_line)
address = result.group(2)
port = result.group(3)
loop = asyncio.get_running_loop()
def run_forward():
while unicore_job.is_running():
try:
endpoint = unicore_job.links["forwarding"]
tr = unicore_job.transport._clone()
tr.use_security_sessions = False
self.forwarder = Forwarder(
tr,
endpoint,
service_port=int(port),
service_host=address,
debug=True,
)
self.forwarder.run(self.port)
except:
self.log.exception(
f"{self._log_name} - Could not start unicore forward"
)
time.sleep(2)
self.unicore_forwarder = loop.run_in_executor(None, run_forward)
self.log.info(
f"{self._log_name} - Unicore Forwarding created - {self.port}:{address}:{port}"
)
break
await asyncio.sleep(2)
else:
return await super().ssh_default_forward()
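For reference, a small worked example of what the re.search call in ssh_default_forward extracts; the stderr line format is assumed from the URL-path filter above, and only the host and port groups are used:

import re

line = "... http://compute-node-17:56789/user/demo/myserver/ ..."   # assumed format
m = re.search("(http|https)://([^:]+):([^/]+)", line)
address, port = m.group(2), m.group(3)   # "compute-node-17", "56789"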
async def ssh_default_forward_remove(self):
unicore_forwarding = await self.get_unicore_internal_forwarding()
if unicore_forwarding:
unicore_forwarder = getattr(self, "unicore_forwarder", None)
if unicore_forwarder:
try:
unicore_forwarder.cancel()
await maybe_future(unicore_forwarder)
except asyncio.CancelledError:
pass
forwarder = getattr(self, "forwarder", None)
if forwarder:
forwarder.stop_forwarding()
else:
return await super().ssh_default_forward_remove()
async def _poll(self):
if not self.resource_url:
return 0
transport_kwargs = await self.get_unicore_transport_kwargs()
preferences = await self.get_unicore_transport_preferences()
job = await self._get_job()
try:
is_running = self.timed_func_call(job.is_running)
self.log.info(
self.log.debug(
f"{self._log_name} - Poll is running: {is_running} for {self.resource_url}"
)
except HTTPError as e:
@@ -591,9 +702,20 @@ class UnicoreSpawner(ForwardBaseSpawner):
if not self.resource_url:
return
transport_kwargs = await self.get_unicore_transport_kwargs()
preferences = await self.get_unicore_transport_preferences()
job = await self._get_job()
unicore_stdout = self.download_file(job, "stdout")
unicore_stderr = self.download_file(job, "stderr")
unicore_logs = job.properties.get("log", [])
self.log.info(
f"{self._log_name} - Stop job. unicore log:\n{self.short_logs(unicore_logs, 20)}"
)
self.log.info(
f"{self._log_name} - Stop job. stdout:\n{self.short_logs(unicore_stdout, 20)}"
)
self.log.info(
f"{self._log_name} - Stop job. stderr:\n{self.short_logs(unicore_stderr, 20)}"
)
job.abort()
if self.unicore_job_delete:
......