# Methods of UserSpawnNotificationAPIHandler (api_notifications_sse.py).


async def collect_progress_events(self, user, server_name):
    """Forward the spawn progress of one named server to this SSE stream.

    Every event is sent through ``self.send_event`` wrapped as
    ``{"progress_update": True, <server_name>: <event>}``. The stream ends
    with either a "ready" event (``progress == 100, ready == True``) or a
    "failed" event (``progress == 100, failed == True``).

    Args:
        user: User owning the spawner; ``user.spawners`` and ``user.url``
            are read.
        server_name: Key of the spawner inside ``user.spawners``.

    Raises:
        web.HTTPError: 400 if the spawner is neither ready, pending nor
            failed.

    ``server_name`` is removed from ``self.collecting_progresses`` when this
    coroutine exits, whatever the outcome.
    """
    try:
        spawner = user.spawners[server_name]

        async def send_progress(event):
            """Send one progress event tagged with the server name."""
            await self.send_event({"progress_update": True, server_name: event})

        async def get_ready_event():
            """Build the final "ready" event, optionally customised by the
            spawner's ``progress_ready_hook``."""
            url = url_path_join(user.url, url_escape_path(server_name), "/")
            ready_event = {
                "progress": 100,
                "ready": True,
                "message": f"Server ready at {url}",
                "html_message": 'Server ready at <a href="{0}">{0}</a>'.format(url),
                "url": url,
            }
            original_ready_event = ready_event.copy()
            if spawner.progress_ready_hook:
                try:
                    ready_event = spawner.progress_ready_hook(spawner, ready_event)
                    if inspect.isawaitable(ready_event):
                        ready_event = await ready_event
                except Exception as e:
                    # A broken hook must not prevent the ready notification.
                    self.log.exception(f"Error in ready_event hook: {e}")
                    ready_event = original_ready_event
            return ready_event

        def get_spawn_exception(future):
            """Return the exception the spawn future finished with, if any."""
            if future and future.done():
                return future.exception()
            return None

        def get_failed_event(exc=None):
            """Build the "failed" event, enriched with details from ``exc``."""
            failed_event = {
                "progress": 100,
                "failed": True,
                "message": "Spawn failed",
            }
            if exc is not None:
                message = getattr(exc, "jupyterhub_message", str(exc))
                failed_event["message"] = f"Spawn failed: {message}"
                html_message = getattr(exc, "jupyterhub_html_message", "")
                if html_message:
                    failed_event["html_message"] = html_message
            return failed_event

        if spawner.ready:
            # spawner already ready. Trigger progress-completion immediately
            self.log.info("Server %s is already started", spawner._log_name)
            await send_progress(await get_ready_event())
            return

        spawn_future = spawner._spawn_future

        if not spawner._spawn_pending:
            # not pending, no progress to fetch
            # check if spawner has just failed
            exc = get_spawn_exception(spawn_future)
            if exc is None:
                # NOTE(review): this coroutine is started with
                # asyncio.ensure_future(), so the error only surfaces as an
                # unretrieved task exception, not as an HTTP response.
                raise web.HTTPError(400, "%s is not starting...", spawner._log_name)
            await send_progress(get_failed_event(exc))
            return

        # retrieve progress events from the Spawner
        async with aclosing(
            iterate_until(spawn_future, spawner._generate_progress())
        ) as events:
            try:
                async for event in events:
                    # don't allow events to sneakily set the 'ready' flag
                    event.pop("ready", None)
                    await send_progress(event)
            except asyncio.CancelledError:
                pass

        # progress finished, wait for spawn to actually resolve,
        # in case progress finished early
        # (ignore errors, which will be logged elsewhere)
        await asyncio.wait([spawn_future])

        # progress and spawn finished, check if spawn succeeded
        if spawner.ready:
            # spawner is ready, signal completion and redirect
            self.log.info("Server %s is ready", spawner._log_name)
            await send_progress(await get_ready_event())
        else:
            # what happened? Maybe spawn failed?
            exc = get_spawn_exception(spawn_future)
            if exc is None:
                self.log.warning(
                    "Server %s didn't start for unknown reason", spawner._log_name
                )
            await send_progress(get_failed_event(exc))
    finally:
        if server_name in self.collecting_progresses:
            self.collecting_progresses.remove(server_name)


async def send_progress_events(self, user):
    """Start a progress collector for every spawning server of ``user``.

    Loops until this request finishes (``self._finish_future`` is done).
    Each pass schedules ``collect_progress_events`` for every spawner whose
    ``pending`` state is ``"spawn"`` and that is not yet tracked, then
    sleeps until the per-user progress event is set again.

    Args:
        user: User whose spawners are watched; ``user.orm_user.id`` selects
            the per-user ``asyncio.Event``.
    """
    progress_event = get_user_progress_event(user.orm_user.id)
    # Track collectors per connection. The class-level list is shared by all
    # handler instances, so a name registered by one connection would
    # suppress progress streaming on every other connection.
    self.collecting_progresses = []

    while not self._finish_future.done():
        # Collect all spawning labs at least once. If SSE is requested while
        # something is already spawning, it would miss it otherwise.
        spawning = [s.name for s in user.spawners.values() if s.pending == "spawn"]
        for server_name in spawning:
            if server_name not in self.collecting_progresses:
                self.collecting_progresses.append(server_name)
                asyncio.ensure_future(
                    self.collect_progress_events(user, server_name)
                )

        if progress_event.is_set():
            progress_event.clear()
        # Wait until a Lab is spawning. If that's the case send the progress
        # events to the frontend. Also wake up when the request finishes, so
        # this task does not outlive the connection.
        wake_up = asyncio.ensure_future(progress_event.wait())
        await asyncio.wait(
            [wake_up, self._finish_future],
            return_when=asyncio.FIRST_COMPLETED,
        )
        # No-op when the event fired; otherwise drops the pending waiter.
        wake_up.cancel()
# One asyncio.Event per user id, created on first request and kept for the
# lifetime of the process (entries are never removed).
_user_progress_events = {}


def get_user_progress_event(user_id):
    """Return the progress ``asyncio.Event`` belonging to ``user_id``.

    The event is created on the first call for a given id; later calls
    with the same id return the same object.

    Args:
        user_id: Hashable identifier of the user.

    Returns:
        The ``asyncio.Event`` stored for ``user_id``.
    """
    event = _user_progress_events.get(user_id)
    if event is None:
        event = _user_progress_events[user_id] = asyncio.Event()
    return event