diff --git a/pyproject.toml b/pyproject.toml index b4fe51096406330e060b00cdbfb4805a38147bab..e9ac4be785a510ce75110bcd736ed98a4fa7b081 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,7 @@ dependencies = [ "jupyterhub>=4.0.0", "traitlets", "urllib3", + "jupyterhub-forwardbasespawner", "pyunicore" ] dynamic = ["version"] diff --git a/unicorespawner/__init__.py b/unicorespawner/__init__.py index 7a75c9ece1d1429b770932f5c8cb57c29e8e0ba0..6fbb13685a528af49c83e0cb9cd3fc0b642fce8d 100644 --- a/unicorespawner/__init__.py +++ b/unicorespawner/__init__.py @@ -1,2 +1,3 @@ from ._version import __version__ from .spawner import UnicoreSpawner +from .spawner import UnicoreForwardSpawner diff --git a/unicorespawner/spawner.py b/unicorespawner/spawner.py index 64b225c2467b5231108df130ac84c206f82e4e24..4d69b06b3a9a1cb4d355059aa6036a611b27a34d 100644 --- a/unicorespawner/spawner.py +++ b/unicorespawner/spawner.py @@ -1,15 +1,20 @@ from jupyterhub.utils import maybe_future from jupyterhub.utils import url_path_join +from jupyterhub.spawner import Spawner +import copy import pyunicore.client as pyunicore import time import json from traitlets import Any +from traitlets import Bool from traitlets import Dict from traitlets import Integer from traitlets import Unicode +from .forward import ForwardBaseSpawner + class UnicoreSpawner(Spawner): job_descriptions = Dict( config=True, @@ -20,7 +25,7 @@ class UnicoreSpawner(Spawner): to get one of the defined jobs. Otherwise the job with key `default` will be used. - Replacable variables can be added with curvy brackets in + Replacable variables can be added with angle brackets (chevrons) in the job_description. UnicoreSpawner will replace these variables with their actual value. Replacable keys are: @@ -114,7 +119,7 @@ class UnicoreSpawner(Spawner): additional_replacements = self.additional_replacements return additional_replacements - unicore_job_delete = Boolean( + unicore_job_delete = Bool( config=True, default_value = True, help=""" @@ -144,38 +149,6 @@ class UnicoreSpawner(Spawner): download_path = self.download_path return download_path - start_server_location = Any( - config=True, - help=""" - Unicore Batch jobs will be port-forwarded automatically, if - `UnicoreSpawner.automatic_port_forwarding` is true. - If this is false (default), or it's an interactive job, you have to - use `UnicoreSpawner.start_server_location` to define, where - JupyterHub should look for the single-user server. - - Example:: - - async def start_server_location(spawner): - # do other thing - return "http://custom-svc.default.svc:8080" - - c.UnicoreSpawner.start_server_location = start_server_location - """ - ) - - async def get_start_server_location(self): - """Get start_server_location - - Returns: - start_server_location (string): Used in Spawner.start - """ - - if callable(self.start_wait_for_job): - start_wait_for_job = await maybe_future(self.start_wait_for_job(self)) - else: - start_wait_for_job = self.start_wait_for_job - return start_wait_for_job - unicore_site_url = Any( config=True, help=""" @@ -191,14 +164,6 @@ class UnicoreSpawner(Spawner): """ ) - public_api_url = Unicode( - config=True, - default_value="", - help=""" - Public JupyterHub API Url. - """ - ) - async def get_unicore_site_url(self): """Get unicore site url @@ -351,13 +316,12 @@ class UnicoreSpawner(Spawner): return state def load_state(self, state): + super().load_state(state) if 'resource_url' in state: self.resource_url = state['resource_url'] def get_env(self): env = super().get_env() - env["PORT"] = self.port - env["JUPYTERHUB_SERVER_NAME"] = self._log_name if self.public_api_url: env["JUPYTERHUB_API_URL"] = self.public_api_url @@ -365,16 +329,9 @@ class UnicoreSpawner(Spawner): "JUPYTERHUB_ACTIVITY_URL" ] = f"{env['JUPYTERHUB_API_URL'].rstrip('/')}/users/{self.user.name}/activity" return env - - def start(self): - """Thin wrapper around self._start - - so we can hold onto a reference for the Future - start returns, which we can use to terminate - .progress() - """ - self._start_future = asyncio.ensure_future(self._start()) - return self._start_future + + def start(self): + return super().start() async def _start(self): job = self.get_string(self.user_options.get("job", ["default"])) @@ -386,33 +343,54 @@ class UnicoreSpawner(Spawner): env = self.get_env() job_description = json.dumps(job_description) for key, value in self.user_options.items(): - job_description = job_description.replace(f"{{{key}}}", self.get_string(value).replace('"', '\\"')) + job_description = job_description.replace(f"<{key}>", self.get_string(value).replace('"', '\\"')) for key, value in env.items(): if type(value) == int: - job_description = job_description.replace(f"{{{key}}}", str(value).replace('"', '\\"')) + job_description = job_description.replace(f"<{key}>", str(value).replace('"', '\\"')) else: - job_description = job_description.replace(f"{{{key}}}", value.replace('"', '\\"')) + job_description = job_description.replace(f"<{key}>", value.replace('"', '\\"')) additional_replacements = await self.get_additional_replacements() for ukey, _uvalue in self.user_options.items(): uvalue = self.get_string(_uvalue) for key, value in additional_replacements.get(ukey, {}).get(uvalue, {}).items(): - job_description = job_description.replace(f"{{key}}", value) + job_description = job_description.replace(f"<{key}>", value) job_description = json.loads(job_description) + jd_env = job_description.get("Environment", {}).copy() + + # Remove keys that might disturb new JupyterLabs (like PATH, PYTHONPATH) + for key in set(env.keys()): + if not (key.startswith("JUPYTER_") or key.startswith("JUPYTERHUB_")): + self.log.info(f"Remove {key} from env") + del env[key] + jd_env.update(env) + job_description["Environment"] = jd_env + client = await self._get_client() unicore_job = self.timed_func_call(client.new_job, job_description) self.resource_url = unicore_job.resource_url - if job_description.get("Job Type", "batch") in ["batch", "normal"]: - from pyunicore.forwarder import open_tunnel - sock = open_tunnel(unicore_job, self.port) - return ("localhost", sock.getsockname()[1]) + # UNICORE/X supports port-forwarding for batch jobs, but not + # interactive jobs yet. + # Until it supports it for all jobs, we rely on the base class + # to create a port-forward process and add the correct return value. + # -- + # if job_description.get("Job Type", "batch") in ["batch", "normal"]: + # from pyunicore.forwarder import open_tunnel + # sock = open_tunnel(unicore_job, self.port) + # return ("localhost", sock.getsockname()[1]) - return await self.get_start_server_location() + return "" async def poll(self): + return await super().poll() + + async def _poll(self): + if not self.resource_url: + return 0 + job = await self._get_job() is_running = self.timed_func_call(job.is_running) if is_running: @@ -431,6 +409,12 @@ class UnicoreSpawner(Spawner): return s.data.decode() async def stop(self, now, **kwargs): + return await super().stop(now, **kwargs) + + async def _stop(self, now, **kwargs): + if not self.resource_url: + return + job = await self._get_job() job.abort() stderr = self.download_file(job, "stderr") @@ -439,3 +423,14 @@ class UnicoreSpawner(Spawner): self.log.info(f"{self._log_name} stdout:\n{stdout}") if self.unicore_job_delete: job.delete() + + +class UnicoreForwardSpawner(UnicoreSpawner, ForwardBaseSpawner): + async def start(self): + return await ForwardBaseSpawner.start(self) + + async def poll(self): + return await ForwardBaseSpawner.poll(self) + + async def stop(self, now=False, **kwargs): + return await ForwardBaseSpawner.stop(self, now=now, **kwargs) \ No newline at end of file