Skip to content
Snippets Groups Projects
Commit 549e0c59 authored by Tim Kreuzer's avatar Tim Kreuzer
Browse files

first attempt to support spawn progress messages in global sse

parent 98b9c2d3
Branches unifiedsse
No related tags found
No related merge requests found
...@@ -11,11 +11,13 @@ from tornado import web ...@@ -11,11 +11,13 @@ from tornado import web
from tornado.iostream import StreamClosedError from tornado.iostream import StreamClosedError
from .utils import get_general_spawn_event from .utils import get_general_spawn_event
from .utils import get_user_progress_event
class UserSpawnNotificationAPIHandler(APIHandler): class UserSpawnNotificationAPIHandler(APIHandler):
"""EventStream handler for active spawns for a specific user""" """EventStream handler for active spawns for a specific user"""
collecting_progresses = []
keepalive_interval = 8 keepalive_interval = 8
def get_content_type(self): def get_content_type(self):
...@@ -52,6 +54,128 @@ class UserSpawnNotificationAPIHandler(APIHandler): ...@@ -52,6 +54,128 @@ class UserSpawnNotificationAPIHandler(APIHandler):
await asyncio.wait([self._finish_future], timeout=self.keepalive_interval) await asyncio.wait([self._finish_future], timeout=self.keepalive_interval)
async def collect_progress_events(self, user, server_name):
try:
spawner = user.spawners[server_name]
failed_event = {"progress": 100, "failed": True, "message": "Spawn failed"}
async def get_ready_event():
url = url_path_join(user.url, url_escape_path(server_name), "/")
ready_event = {
"progress": 100,
"ready": True,
"message": f"Server ready at {url}",
"html_message": 'Server ready at <a href="{0}">{0}</a>'.format(url),
"url": url,
}
original_ready_event = ready_event.copy()
if spawner.progress_ready_hook:
try:
ready_event = spawner.progress_ready_hook(spawner, ready_event)
if inspect.isawaitable(ready_event):
ready_event = await ready_event
except Exception as e:
self.log.exception(f"Error in ready_event hook: {e}")
ready_event = original_ready_event
return ready_event
if spawner.ready:
# spawner already ready. Trigger progress-completion immediately
self.log.info("Server %s is already started", spawner._log_name)
ready_event = await get_ready_event()
await self.send_event(
{"progress_update": True, server_name: ready_event}
)
return
spawn_future = spawner._spawn_future
if not spawner._spawn_pending:
# not pending, no progress to fetch
# check if spawner has just failed
f = spawn_future
if f and f.done() and f.exception():
exc = f.exception()
message = getattr(exc, "jupyterhub_message", str(exc))
failed_event["message"] = f"Spawn failed: {message}"
html_message = getattr(exc, "jupyterhub_html_message", "")
if html_message:
failed_event["html_message"] = html_message
await self.send_event(
{"progress_update": True, server_name: failed_event}
)
return
else:
raise web.HTTPError(400, "%s is not starting...", spawner._log_name)
# retrieve progress events from the Spawner
async with aclosing(
iterate_until(spawn_future, spawner._generate_progress())
) as events:
try:
async for event in events:
# don't allow events to sneakily set the 'ready' flag
if "ready" in event:
event.pop("ready", None)
await self.send_event(
{"progress_update": True, server_name: event}
)
except asyncio.CancelledError:
pass
# progress finished, wait for spawn to actually resolve,
# in case progress finished early
# (ignore errors, which will be logged elsewhere)
await asyncio.wait([spawn_future])
# progress and spawn finished, check if spawn succeeded
if spawner.ready:
# spawner is ready, signal completion and redirect
self.log.info("Server %s is ready", spawner._log_name)
ready_event = await get_ready_event()
await self.send_event(
{"progress_update": True, server_name: ready_event}
)
else:
# what happened? Maybe spawn failed?
f = spawn_future
if f and f.done() and f.exception():
exc = f.exception()
message = getattr(exc, "jupyterhub_message", str(exc))
failed_event["message"] = f"Spawn failed: {message}"
html_message = getattr(exc, "jupyterhub_html_message", "")
if html_message:
failed_event["html_message"] = html_message
else:
self.log.warning(
"Server %s didn't start for unknown reason", spawner._log_name
)
await self.send_event(
{"progress_update": True, server_name: failed_event}
)
finally:
if server_name in self.collecting_progresses:
self.collecting_progresses.remove(server_name)
async def send_progress_events(self, user):
progress_event = get_user_progress_event(user.orm_user.id)
while True:
# Collect all spawning labs at least once. If SSE is requested while something
# is already spawning, it would miss it otherwise.
spawning = [s.name for s in user.spawners.values() if s.pending == "spawn"]
for server_name in s.name:
if server_name not in self.collecting_progresses:
self.collecting_progress.append(server_name)
asyncio.ensure_future(
self.collect_progress_events(user, server_name)
)
if progress_event.is_set():
progress_event.clear()
# Wait until a Lab is spawning. If that's the case send the progress events
# to the frontend
await progress_event.wait()
async def get_event_data(self, user): async def get_event_data(self, user):
if user is None: if user is None:
return {} return {}
...@@ -59,6 +183,7 @@ class UserSpawnNotificationAPIHandler(APIHandler): ...@@ -59,6 +183,7 @@ class UserSpawnNotificationAPIHandler(APIHandler):
spawners = user.spawners.values() spawners = user.spawners.values()
event_data = { event_data = {
# Set active spawners as event data # Set active spawners as event data
"progress_update": False,
"spawning": [s.name for s in spawners if s.pending == "spawn"], "spawning": [s.name for s in spawners if s.pending == "spawn"],
"stopping": [s.name for s in spawners if s.pending == "stop"], "stopping": [s.name for s in spawners if s.pending == "stop"],
"active": [s.name for s in spawners if s.active], "active": [s.name for s in spawners if s.active],
...@@ -100,6 +225,8 @@ class UserSpawnNotificationAPIHandler(APIHandler): ...@@ -100,6 +225,8 @@ class UserSpawnNotificationAPIHandler(APIHandler):
# start sending keepalive to avoid proxies closing the connection # start sending keepalive to avoid proxies closing the connection
asyncio.ensure_future(self.stop_after_n_seconds()) asyncio.ensure_future(self.stop_after_n_seconds())
asyncio.ensure_future(self.keepalive()) asyncio.ensure_future(self.keepalive())
if user:
asyncio.ensure_future(self.send_progress_events(user))
async with aclosing( async with aclosing(
iterate_until(self._generator_future, self.event_generator_wrap(user)) iterate_until(self._generator_future, self.event_generator_wrap(user))
......
...@@ -17,6 +17,7 @@ from ..apihandler.user_count import get_user_count ...@@ -17,6 +17,7 @@ from ..apihandler.user_count import get_user_count
from ..misc import get_custom_config from ..misc import get_custom_config
from ..misc import get_incidents from ..misc import get_incidents
from .utils import get_general_spawn_event from .utils import get_general_spawn_event
from .utils import get_user_progress_event
class CustomJSCSpawner(OutpostSpawner, UnicoreSpawner): class CustomJSCSpawner(OutpostSpawner, UnicoreSpawner):
...@@ -443,6 +444,15 @@ def custom_port(spawner, user_options): ...@@ -443,6 +444,15 @@ def custom_port(spawner, user_options):
async def pre_spawn_hook(spawner): async def pre_spawn_hook(spawner):
user_progress_event = get_user_progress_event(spawner.user.orm_user.id)
while user_progress_event.is_set():
spawner.log.debug(f"{spawner.name} - Wait before SSE event is clear")
await asyncio.sleep(1)
# Inform API Notifications SSE, that we're starting a new service
# Waiting until it's clear, otherwise a race condition might lead to
# missing progress_events
user_progress_event.set()
custom_config = get_custom_config() custom_config = get_custom_config()
service, version = spawner.user_options.get("profile", "").split("/") service, version = spawner.user_options.get("profile", "").split("/")
system = spawner.user_options.get("system", "") system = spawner.user_options.get("system", "")
...@@ -473,7 +483,7 @@ async def pre_spawn_hook(spawner): ...@@ -473,7 +483,7 @@ async def pre_spawn_hook(spawner):
pass pass
def post_spawn_request_hook(spawner, resp_json): async def post_spawn_request_hook(spawner, resp_json):
db = inspect(spawner.user.orm_user).session db = inspect(spawner.user.orm_user).session
get_user_count(db, force=True) get_user_count(db, force=True)
spawn_event = get_general_spawn_event() spawn_event = get_general_spawn_event()
......
...@@ -3,6 +3,14 @@ import asyncio ...@@ -3,6 +3,14 @@ import asyncio
from ..misc import get_custom_config from ..misc import get_custom_config
_general_spawn_event = asyncio.Event() _general_spawn_event = asyncio.Event()
_user_progress_events = {}
def get_user_progress_event(user_id):
global _user_progress_events
if not user_id in _user_progress_events.keys():
_user_progress_events[user_id] = asyncio.Event()
return _user_progress_events[user_id]
def get_general_spawn_event(): def get_general_spawn_event():
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment