Skip to content
Snippets Groups Projects
Commit 605fc6aa authored by Tim Kreuzer's avatar Tim Kreuzer
Browse files

add unicore notification handler

parent dbe62f68
Branches
No related tags found
No related merge requests found
from ._version import __version__ from ._version import __version__
from .spawner import UnicoreSpawner from .api_notifications import SpawnEventsUnicoreAPIHandler
from .spawner import UnicoreForwardSpawner from .spawner import UnicoreForwardSpawner
from .spawner import UnicoreSpawner
import asyncio
import datetime
import json
import jwt
import requests
from cryptography.hazmat.backends import default_backend
from cryptography.x509 import load_pem_x509_certificate
from jupyterhub.apihandlers import default_handlers
from jupyterhub.apihandlers.base import APIHandler
from tornado.httpclient import HTTPRequest
from unicorespawner import utils
class SpawnEventsUnicoreAPIHandler(APIHandler):
    """Accept UNICORE job status notifications for a user's spawner.

    A request is authenticated by a bearer JWT that is verified against the
    certificate downloaded from the spawner's UNICORE certificate URL. A
    terminal job status stops the spawner; any other notification may add a
    progress event derived from the reported ``bssStatus``.
    """

    # Seconds to wait for the certificate endpoint, so that a hanging server
    # cannot occupy an executor thread forever.
    cert_request_timeout = 10

    # UNICORE job states after which the spawner is stopped.
    terminal_states = ("FAILED", "SUCCESSFUL", "DONE")

    # Strong references to fire-and-forget stop tasks. The event loop keeps
    # only weak references to tasks, so an unreferenced task may be garbage
    # collected before it has finished.
    _background_tasks = set()

    def check_xsrf_cookie(self):
        """Skip XSRF validation; ``post`` authenticates via a bearer JWT."""
        pass

    def _bearer_token(self):
        """Return the token of the ``Authorization`` header, or ``None``."""
        parts = self.request.headers.get("Authorization", "").split()
        if len(parts) < 2:
            return None
        return parts[1]

    def _download_certificate(self, cert_url, cert_path):
        """Download the PEM certificate (blocking; run it in an executor)."""
        with requests.get(
            cert_url,
            headers={"accept": "text/plain"},
            verify=cert_path,
            timeout=self.cert_request_timeout,
        ) as r:
            r.raise_for_status()
            return r.content

    @staticmethod
    def _bss_notification_config(spawner):
        """Return the spawner's bssStatus -> event configuration.

        The configuration is looked up as ``bss_notification_config`` first
        and as ``get_bss_notification_config`` second, because the spawner
        trait is declared under the latter name.
        """
        config = getattr(spawner, "bss_notification_config", None)
        if config is None:
            config = getattr(spawner, "get_bss_notification_config", None)
        return config or {}

    async def post(self, user_name, server_name=""):
        """Handle one UNICORE notification.

        Responds with 404 for an unknown user or server, 401 for a missing or
        invalid token, 400 for a body that is not a JSON object and 200
        otherwise.
        """
        user = self.find_user(user_name)
        if user is None or server_name not in user.spawners:
            self.set_status(404)
            return
        spawner = user.spawners[server_name]

        # Reject malformed requests before doing any outbound request.
        token = self._bearer_token()
        if token is None:
            self.set_status(401)
            return

        cert_path = await spawner.get_unicore_cert_path()
        cert_url = await spawner.get_unicore_cert_url()
        # requests is blocking; keep it off the event loop.
        cert = await asyncio.get_running_loop().run_in_executor(
            None, self._download_certificate, cert_url, cert_path
        )

        # Validate the token against the certificate's public key.
        cert_obj = load_pem_x509_certificate(cert, default_backend())
        try:
            jwt.decode(token, cert_obj.public_key(), algorithms=["RS256"])
        except jwt.InvalidTokenError:
            self.log.warning(
                f"{spawner._log_name} - Unicore Status Update rejected: invalid token",
                extra={
                    "uuidcode": spawner.name,
                    "username": user.name,
                    "userid": user.id,
                },
            )
            self.set_status(401)
            return

        try:
            raw_body = self.request.body.decode("utf8")
            body = json.loads(raw_body) if raw_body else {}
        except ValueError:
            # Covers both undecodable bytes and invalid JSON.
            self.set_status(400)
            return
        if not isinstance(body, dict):
            self.set_status(400)
            return

        self.log.info(
            f"{spawner._log_name} - Unicore Status Update received",
            extra={
                "uuidcode": spawner.name,
                "username": user.name,
                "userid": user.id,
                "action": "unicoreupdate",
                "body": body,
            },
        )

        if body.get("status", "") in self.terminal_states:
            # The job has ended: stop the spawner (cancelling a pending
            # spawn) and let it build its stop event from the job's output.
            self.log.debug(
                f"{spawner._log_name} - Cancel spawner",
                extra={
                    "uuidcode": spawner.name,
                    "username": user.name,
                    "userid": user.id,
                },
            )
            if spawner._spawn_pending or spawner.ready:
                task = asyncio.create_task(
                    spawner.stop(cancel=True, event=spawner.unicore_stop_event)
                )
                self._background_tasks.add(task)
                task.add_done_callback(self._background_tasks.discard)
        else:
            # The job is still active (UNICORE wise); bssStatus carries the
            # batch system state, which may be configured to show an event.
            bss_status = body.get("bssStatus", "")
            bss_details = None
            if isinstance(bss_status, str):
                bss_details = self._bss_notification_config(spawner).get(bss_status)
            if bss_details is not None:
                now = datetime.datetime.now().strftime("%Y_%m_%d %H:%M:%S.%f")[:-3]
                summary = bss_details.get("summary", f"Slurm status: {bss_status}")
                details = bss_details.get(
                    "details",
                    "You'll receive more information, when your slurm job proceeds.",
                )
                progress = int(bss_details.get("progress", 35))
                event = {
                    "failed": False,
                    "progress": progress,
                    "html_message": f"<details><summary>{now}: {summary}</summary>{details}</details>",
                }
                spawner.latest_events.append(event)

        self.set_status(200)
# Register the notification endpoint for the default server (user name only)
# and for named servers (user name and server name). The handler class is
# spelled SpawnEventsUnicoreAPIHandler; the previous all-caps "UNICORE"
# spelling did not match the class definition and raised NameError on import.
default_handlers.append(
    (r"/api/users/progress/updateunicore/([^/]+)", SpawnEventsUnicoreAPIHandler)
)
default_handlers.append(
    (
        r"/api/users/progress/updateunicore/([^/]+)/([^/]+)",
        SpawnEventsUnicoreAPIHandler,
    )
)
...@@ -2,9 +2,7 @@ import copy ...@@ -2,9 +2,7 @@ import copy
import json import json
import time import time
import pyunicore.client as pyunicore
from forwardbasespawner import ForwardBaseSpawner from forwardbasespawner import ForwardBaseSpawner
from jupyterhub.spawner import Spawner
from jupyterhub.utils import maybe_future from jupyterhub.utils import maybe_future
from jupyterhub.utils import url_path_join from jupyterhub.utils import url_path_join
from requests.exceptions import HTTPError from requests.exceptions import HTTPError
...@@ -15,7 +13,7 @@ from traitlets import Integer ...@@ -15,7 +13,7 @@ from traitlets import Integer
from traitlets import Unicode from traitlets import Unicode
class UnicoreSpawner(Spawner): class UnicoreSpawner(ForwardBaseSpawner):
job_descriptions = Dict( job_descriptions = Dict(
config=True, config=True,
help=""" help="""
...@@ -151,6 +149,82 @@ class UnicoreSpawner(Spawner): ...@@ -151,6 +149,82 @@ class UnicoreSpawner(Spawner):
download_path = self.download_path download_path = self.download_path
return download_path return download_path
# CA bundle setting used when talking to UNICORE: a string, False or a
# callable receiving the spawner.
unicore_cert_path = Any(
    config=True,
    default_value=False,
    help="""
UNICORE ca. Used in communication with Unicore.
String, False or Callable
Example::
async def unicore_cert_path(spawner):
if spawner.user_options["system"][0] == "abc":
return "/mnt/certs/geant.crt"
c.UnicoreSpawner.unicore_cert_path = unicore_cert_path
""",
)

async def get_unicore_cert_path(self):
    """Get unicore cert path

    Defined once, directly after its trait (it used to be defined twice
    with identical bodies).

    Returns:
      path (string or false): Used in Unicore communication. When
        ``unicore_cert_path`` is callable it is called with the spawner
        and its (possibly awaitable) result is returned.
    """
    if callable(self.unicore_cert_path):
        return await maybe_future(self.unicore_cert_path(self))
    return self.unicore_cert_path
# URL of the UNICORE certificate: a string, False (use the default built by
# get_unicore_cert_url) or a callable receiving the spawner.
unicore_cert_url = Any(
    config=True,
    default_value=False,
    help="""
UNICORE certificate url. Used for verification of Unicore notifications.
String or Callable
default: f"{site_url}/certificate", with site_url = await spawner.get_unicore_site_url()
Example::
async def unicore_cert_url(spawner):
site_url = await spawner.get_unicore_site_url()
return f"{site_url}/certificate"
c.UnicoreSpawner.unicore_cert_url = unicore_cert_url
""",
)
async def get_unicore_cert_url(self):
    """Resolve the URL the UNICORE certificate is downloaded from.

    Returns:
      url (string): the (possibly awaited) result of ``unicore_cert_url``
        when it is callable, its value when it is set, and otherwise the
        UNICORE site url followed by ``/certificate``.
    """
    configured = self.unicore_cert_url
    if callable(configured):
        return await maybe_future(configured(self))
    if configured:
        return configured
    # Nothing configured: derive the url from the site url.
    base_url = await self.get_unicore_site_url()
    return base_url.rstrip("/") + "/certificate"
unicore_site_url = Any( unicore_site_url = Any(
config=True, config=True,
help=""" help="""
...@@ -187,6 +261,25 @@ class UnicoreSpawner(Spawner): ...@@ -187,6 +261,25 @@ class UnicoreSpawner(Spawner):
""", """,
) )
# Configurable mapping from a batch system status name to the event shown to
# the user. Each entry in the default value provides "progress", "summary"
# and "details".
# NOTE(review): the notification handler reads `spawner.bss_notification_config`
# and matches the keys against the "bssStatus" field of a notification, while
# this trait is declared as `get_bss_notification_config` - confirm which name
# is intended.
get_bss_notification_config = Dict(
config=True,
default_value={
"PENDING": {
"progress": 33,
"summary": "Your slurm job is currently in status PENDING.",
"details": "Job is awaiting resource allocation.",
},
"CONFIGURING": {
"progress": 35,
"summary": "Your slurm job is currently in status CONFIGURING. This may take up to 7 minutes.",
"details": "Job has been allocated resources, but are waiting for them to become ready for use (e.g. booting).",
},
},
help="""
Configure the events shown, when UNICORE gives an bss status update to api_notifications handler.
""",
)
download_max_bytes = Integer( download_max_bytes = Integer(
config=True, config=True,
default_value=4096, default_value=4096,
...@@ -268,21 +361,16 @@ class UnicoreSpawner(Spawner): ...@@ -268,21 +361,16 @@ class UnicoreSpawner(Spawner):
preferences = self.unicore_transport_preferences preferences = self.unicore_transport_preferences
return preferences return preferences
def get_string(self, value):
    """Return the first element of ``value`` as a string.

    A value that is not a list is treated as a one-element list. List
    subclasses are recognised as lists, too.

    Returns:
      text (string): ``str`` of the first element, or "" for an empty list.
    """
    if not isinstance(value, list):
        value = [value]
    if not value:
        return ""
    return str(value[0])
def timed_func_call(self, func, *args, **kwargs): def timed_func_call(self, func, *args, **kwargs):
tic = time.time() tic = time.time()
try: try:
ret = func(*args, **kwargs) ret = func(*args, **kwargs)
finally: finally:
toc = time.time() - tic toc = time.time() - tic
extra = {"tictoc": f"{func.__module__},{func.__name__}", "duration": toc} extra = {
"tictoc": f"{func.__module__},{func.__name__}",
"duration": toc,
}
self.log.debug( self.log.debug(
f"{self._log_name} - UNICORE communication", f"{self._log_name} - UNICORE communication",
extra=extra, extra=extra,
...@@ -291,16 +379,17 @@ class UnicoreSpawner(Spawner): ...@@ -291,16 +379,17 @@ class UnicoreSpawner(Spawner):
async def _get_transport(self): async def _get_transport(self):
transport_kwargs = await self.get_unicore_transport_kwargs() transport_kwargs = await self.get_unicore_transport_kwargs()
transport = self.timed_func_call(pyunicore.Transport, **transport_kwargs)
preferences = await self.get_unicore_transport_preferences() preferences = await self.get_unicore_transport_preferences()
transport = self.timed_func_call(pyunicore.Transport, **transport_kwargs)
if preferences: if preferences:
transport.preferences = preferences transport.preferences = preferences
return transport return transport
async def _get_client(self): async def _get_client(self):
site_url = await self.get_unicore_site_url()
transport = await self._get_transport() transport = await self._get_transport()
url = await self.get_unicore_site_url() client = self.timed_func_call(pyunicore.Client, transport, site_url)
client = self.timed_func_call(pyunicore.Client, transport, url)
return client return client
async def _get_job(self): async def _get_job(self):
...@@ -308,6 +397,83 @@ class UnicoreSpawner(Spawner): ...@@ -308,6 +397,83 @@ class UnicoreSpawner(Spawner):
job = self.timed_func_call(pyunicore.Job, transport, self.resource_url) job = self.timed_func_call(pyunicore.Job, transport, self.resource_url)
return job return job
@staticmethod
def _prettify_error_logs(log_list, lines, summary):
    """Render log output as a collapsible, HTML-escaped ``<details>`` block.

    Args:
      log_list (string or list): log text (split at newlines) or a list of
        log entries; anything else is shown via ``str()``.
      lines (int): number of trailing entries to keep; a leading "..."
        marks dropped entries. Values <= 0 keep everything.
      summary (string): caption of the block, inserted as is.

    Returns:
      html (string): the ``<details>`` element.
    """
    import html

    if isinstance(log_list, str):
        entries = log_list.split("\n")
    elif isinstance(log_list, (list, tuple)):
        entries = [str(entry) for entry in log_list]
    else:
        entries = [str(log_list)]
    if 0 < lines < len(entries):
        entries = ["...", *entries[-lines:]]
    logs_s = "<br>".join(html.escape(entry) for entry in entries)
    return f"<details><summary>&nbsp&nbsp&nbsp&nbsp{summary}</summary>{logs_s}</details>"

def download_file(self, job, file):
    """Return the tail of a file in the job's working directory as text.

    At most ``self.download_max_bytes`` bytes are read from the end of the
    file. The read may start in the middle of a multi-byte character, so
    undecodable bytes are replaced instead of raising.

    Returns:
      text (string): the decoded tail, or "<file> is empty" for size 0.
    """
    file_path = job.working_dir.stat(file)
    file_size = file_path.properties["size"]
    if file_size == 0:
        return f"{file} is empty"
    offset = max(0, file_size - self.download_max_bytes)
    s = file_path.raw(offset=offset)
    return s.data.decode(errors="replace")

async def unicore_stop_event(self):
    """Build the final (failed) progress event for a stopped UNICORE job.

    The event combines the job's exit code and status message with the
    UNICORE log and the tails of the job's stdout and stderr.

    Returns:
      event (dict): with keys "failed", "progress" and "html_message".
    """
    import datetime
    import html

    # The job is needed by everything below, so fetch it first.
    job = await self._get_job()
    # NOTE(review): read once and reused; presumably each access to
    # job.properties queries UNICORE - confirm.
    job_properties = job.properties
    unicore_stderr = self.download_file(job, "stderr")
    unicore_stdout = self.download_file(job, "stdout")

    now = datetime.datetime.now().strftime("%Y_%m_%d %H:%M:%S.%f")[:-3]
    exit_code = html.escape(str(job_properties.get("exitCode", "unknown exitCode")))
    summary = f"UNICORE Job stopped with exitCode: {exit_code}"

    # Text reported by the server is escaped before it is put into HTML.
    unicore_status_message = html.escape(
        str(job_properties.get("statusMessage", "unknown statusMessage"))
    )
    unicore_logs_details = self._prettify_error_logs(
        job_properties.get("log", []), 20, "UNICORE logs:"
    )
    unicore_stdout_details = self._prettify_error_logs(
        unicore_stdout,
        20,
        "Job stdout:",
    )
    unicore_stderr_details = self._prettify_error_logs(
        unicore_stderr,
        20,
        "Job stderr:",
    )

    details = "".join(
        [
            unicore_status_message,
            unicore_logs_details,
            unicore_stdout_details,
            unicore_stderr_details,
        ]
    )
    event = {
        "failed": True,
        "progress": 100,
        "html_message": f"<details><summary>{now}: {summary}</summary>{details}</details>",
    }
    return event
def get_string(self, value):
    """Return the first element of ``value`` as a string.

    A value that is not a list is treated as a one-element list. List
    subclasses are recognised as lists, too.

    Returns:
      text (string): ``str`` of the first element, or "" for an empty list.
    """
    if not isinstance(value, list):
        value = [value]
    if not value:
        return ""
    return str(value[0])
def clear_state(self): def clear_state(self):
super().clear_state() super().clear_state()
self.resource_url = "" self.resource_url = ""
...@@ -332,9 +498,6 @@ class UnicoreSpawner(Spawner): ...@@ -332,9 +498,6 @@ class UnicoreSpawner(Spawner):
] = f"{env['JUPYTERHUB_API_URL'].rstrip('/')}/users/{self.user.name}/activity" ] = f"{env['JUPYTERHUB_API_URL'].rstrip('/')}/users/{self.user.name}/activity"
return env return env
def start(self):
    """Start the server by delegating to the parent class."""
    started = super().start()
    return started
async def _start(self): async def _start(self):
job = self.get_string(self.user_options.get("job", ["default"])) job = self.get_string(self.user_options.get("job", ["default"]))
job_description = self.job_descriptions[job] job_description = self.job_descriptions[job]
...@@ -393,13 +556,12 @@ class UnicoreSpawner(Spawner): ...@@ -393,13 +556,12 @@ class UnicoreSpawner(Spawner):
return "" return ""
async def poll(self):
    """Poll the server by delegating to the parent class."""
    status = await super().poll()
    return status
async def _poll(self): async def _poll(self):
if not self.resource_url: if not self.resource_url:
return 0 return 0
transport_kwargs = await self.get_unicore_transport_kwargs()
preferences = await self.get_unicore_transport_preferences()
job = await self._get_job() job = await self._get_job()
try: try:
is_running = self.timed_func_call(job.is_running) is_running = self.timed_func_call(job.is_running)
...@@ -427,38 +589,14 @@ class UnicoreSpawner(Spawner): ...@@ -427,38 +589,14 @@ class UnicoreSpawner(Spawner):
else: else:
return 0 return 0
def download_file(self, job, file):
    """Return the tail of a file in the job's working directory as text.

    At most ``self.download_max_bytes`` bytes are read from the end of the
    file. The read may start in the middle of a multi-byte character, so
    undecodable bytes are replaced instead of raising.

    Returns:
      text (string): the decoded tail, or "<file> is empty" for size 0.
    """
    file_path = job.working_dir.stat(file)
    file_size = file_path.properties["size"]
    if file_size == 0:
        return f"{file} is empty"
    offset = max(0, file_size - self.download_max_bytes)
    s = file_path.raw(offset=offset)
    return s.data.decode(errors="replace")
async def stop(self, now=False, **kwargs):
    """Stop the server by delegating to the parent class.

    ``now`` defaults to False, as in ``UnicoreForwardSpawner.stop`` in this
    file, so the method can be called without arguments.
    """
    return await super().stop(now, **kwargs)
async def _stop(self, now, **kwargs): async def _stop(self, now, **kwargs):
if not self.resource_url: if not self.resource_url:
return return
transport_kwargs = await self.get_unicore_transport_kwargs()
preferences = await self.get_unicore_transport_preferences()
job = await self._get_job() job = await self._get_job()
job.abort() job.abort()
stderr = self.download_file(job, "stderr")
stdout = self.download_file(job, "stdout")
self.log.info(f"{self._log_name} - Stop stderr:\n{stderr}")
self.log.info(f"{self._log_name} - Stop stdout:\n{stdout}")
if self.unicore_job_delete: if self.unicore_job_delete:
job.delete() job.delete()
class UnicoreForwardSpawner(UnicoreSpawner, ForwardBaseSpawner):
    """UnicoreSpawner whose lifecycle entry points come from ForwardBaseSpawner.

    ``start``, ``poll`` and ``stop`` name ForwardBaseSpawner explicitly
    instead of going through ``super()``.
    """

    async def start(self):
        """Run ForwardBaseSpawner.start for this spawner."""
        started = await ForwardBaseSpawner.start(self)
        return started

    async def poll(self):
        """Run ForwardBaseSpawner.poll for this spawner."""
        status = await ForwardBaseSpawner.poll(self)
        return status

    async def stop(self, now=False, **kwargs):
        """Run ForwardBaseSpawner.stop for this spawner."""
        stopped = await ForwardBaseSpawner.stop(self, now=now, **kwargs)
        return stopped
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment