Skip to content
Snippets Groups Projects
Commit 62449f67 authored by Tim Kreuzer's avatar Tim Kreuzer
Browse files

use external forwardspawner

parent 1a4cba69
No related branches found
No related tags found
No related merge requests found
Pipeline #158555 failed
...@@ -21,6 +21,7 @@ dependencies = [ ...@@ -21,6 +21,7 @@ dependencies = [
"jupyterhub>=4.0.0", "jupyterhub>=4.0.0",
"traitlets", "traitlets",
"urllib3", "urllib3",
"jupyterhub-forwardbasespawner",
"pyunicore" "pyunicore"
] ]
dynamic = ["version"] dynamic = ["version"]
......
from ._version import __version__ from ._version import __version__
from .spawner import UnicoreSpawner from .spawner import UnicoreSpawner
from .spawner import UnicoreForwardSpawner
from jupyterhub.utils import maybe_future from jupyterhub.utils import maybe_future
from jupyterhub.utils import url_path_join from jupyterhub.utils import url_path_join
from jupyterhub.spawner import Spawner
import copy
import pyunicore.client as pyunicore import pyunicore.client as pyunicore
import time import time
import json import json
from traitlets import Any from traitlets import Any
from traitlets import Bool
from traitlets import Dict from traitlets import Dict
from traitlets import Integer from traitlets import Integer
from traitlets import Unicode from traitlets import Unicode
from .forward import ForwardBaseSpawner
class UnicoreSpawner(Spawner): class UnicoreSpawner(Spawner):
job_descriptions = Dict( job_descriptions = Dict(
config=True, config=True,
...@@ -20,7 +25,7 @@ class UnicoreSpawner(Spawner): ...@@ -20,7 +25,7 @@ class UnicoreSpawner(Spawner):
to get one of the defined jobs. Otherwise the job with key `default` to get one of the defined jobs. Otherwise the job with key `default`
will be used. will be used.
Replacable variables can be added with curvy brackets in Replacable variables can be added with angle brackets (chevrons) in
the job_description. the job_description.
UnicoreSpawner will replace these variables with their actual value. UnicoreSpawner will replace these variables with their actual value.
Replacable keys are: Replacable keys are:
...@@ -114,7 +119,7 @@ class UnicoreSpawner(Spawner): ...@@ -114,7 +119,7 @@ class UnicoreSpawner(Spawner):
additional_replacements = self.additional_replacements additional_replacements = self.additional_replacements
return additional_replacements return additional_replacements
unicore_job_delete = Boolean( unicore_job_delete = Bool(
config=True, config=True,
default_value = True, default_value = True,
help=""" help="""
...@@ -144,38 +149,6 @@ class UnicoreSpawner(Spawner): ...@@ -144,38 +149,6 @@ class UnicoreSpawner(Spawner):
download_path = self.download_path download_path = self.download_path
return download_path return download_path
start_server_location = Any(
config=True,
help="""
Unicore Batch jobs will be port-forwarded automatically, if
`UnicoreSpawner.automatic_port_forwarding` is true.
If this is false (default), or it's an interactive job, you have to
use `UnicoreSpawner.start_server_location` to define, where
JupyterHub should look for the single-user server.
Example::
async def start_server_location(spawner):
# do other thing
return "http://custom-svc.default.svc:8080"
c.UnicoreSpawner.start_server_location = start_server_location
"""
)
async def get_start_server_location(self):
"""Get start_server_location
Returns:
start_server_location (string): Used in Spawner.start
"""
if callable(self.start_wait_for_job):
start_wait_for_job = await maybe_future(self.start_wait_for_job(self))
else:
start_wait_for_job = self.start_wait_for_job
return start_wait_for_job
unicore_site_url = Any( unicore_site_url = Any(
config=True, config=True,
help=""" help="""
...@@ -191,14 +164,6 @@ class UnicoreSpawner(Spawner): ...@@ -191,14 +164,6 @@ class UnicoreSpawner(Spawner):
""" """
) )
public_api_url = Unicode(
config=True,
default_value="",
help="""
Public JupyterHub API Url.
"""
)
async def get_unicore_site_url(self): async def get_unicore_site_url(self):
"""Get unicore site url """Get unicore site url
...@@ -351,13 +316,12 @@ class UnicoreSpawner(Spawner): ...@@ -351,13 +316,12 @@ class UnicoreSpawner(Spawner):
return state return state
def load_state(self, state): def load_state(self, state):
super().load_state(state)
if 'resource_url' in state: if 'resource_url' in state:
self.resource_url = state['resource_url'] self.resource_url = state['resource_url']
def get_env(self): def get_env(self):
env = super().get_env() env = super().get_env()
env["PORT"] = self.port
env["JUPYTERHUB_SERVER_NAME"] = self._log_name
if self.public_api_url: if self.public_api_url:
env["JUPYTERHUB_API_URL"] = self.public_api_url env["JUPYTERHUB_API_URL"] = self.public_api_url
...@@ -367,14 +331,7 @@ class UnicoreSpawner(Spawner): ...@@ -367,14 +331,7 @@ class UnicoreSpawner(Spawner):
return env return env
def start(self): def start(self):
"""Thin wrapper around self._start return super().start()
so we can hold onto a reference for the Future
start returns, which we can use to terminate
.progress()
"""
self._start_future = asyncio.ensure_future(self._start())
return self._start_future
async def _start(self): async def _start(self):
job = self.get_string(self.user_options.get("job", ["default"])) job = self.get_string(self.user_options.get("job", ["default"]))
...@@ -386,33 +343,54 @@ class UnicoreSpawner(Spawner): ...@@ -386,33 +343,54 @@ class UnicoreSpawner(Spawner):
env = self.get_env() env = self.get_env()
job_description = json.dumps(job_description) job_description = json.dumps(job_description)
for key, value in self.user_options.items(): for key, value in self.user_options.items():
job_description = job_description.replace(f"{{{key}}}", self.get_string(value).replace('"', '\\"')) job_description = job_description.replace(f"<{key}>", self.get_string(value).replace('"', '\\"'))
for key, value in env.items(): for key, value in env.items():
if type(value) == int: if type(value) == int:
job_description = job_description.replace(f"{{{key}}}", str(value).replace('"', '\\"')) job_description = job_description.replace(f"<{key}>", str(value).replace('"', '\\"'))
else: else:
job_description = job_description.replace(f"{{{key}}}", value.replace('"', '\\"')) job_description = job_description.replace(f"<{key}>", value.replace('"', '\\"'))
additional_replacements = await self.get_additional_replacements() additional_replacements = await self.get_additional_replacements()
for ukey, _uvalue in self.user_options.items(): for ukey, _uvalue in self.user_options.items():
uvalue = self.get_string(_uvalue) uvalue = self.get_string(_uvalue)
for key, value in additional_replacements.get(ukey, {}).get(uvalue, {}).items(): for key, value in additional_replacements.get(ukey, {}).get(uvalue, {}).items():
job_description = job_description.replace(f"{{key}}", value) job_description = job_description.replace(f"<{key}>", value)
job_description = json.loads(job_description) job_description = json.loads(job_description)
jd_env = job_description.get("Environment", {}).copy()
# Remove keys that might disturb new JupyterLabs (like PATH, PYTHONPATH)
for key in set(env.keys()):
if not (key.startswith("JUPYTER_") or key.startswith("JUPYTERHUB_")):
self.log.info(f"Remove {key} from env")
del env[key]
jd_env.update(env)
job_description["Environment"] = jd_env
client = await self._get_client() client = await self._get_client()
unicore_job = self.timed_func_call(client.new_job, job_description) unicore_job = self.timed_func_call(client.new_job, job_description)
self.resource_url = unicore_job.resource_url self.resource_url = unicore_job.resource_url
if job_description.get("Job Type", "batch") in ["batch", "normal"]: # UNICORE/X supports port-forwarding for batch jobs, but not
from pyunicore.forwarder import open_tunnel # interactive jobs yet.
sock = open_tunnel(unicore_job, self.port) # Until it supports it for all jobs, we rely on the base class
return ("localhost", sock.getsockname()[1]) # to create a port-forward process and add the correct return value.
# --
# if job_description.get("Job Type", "batch") in ["batch", "normal"]:
# from pyunicore.forwarder import open_tunnel
# sock = open_tunnel(unicore_job, self.port)
# return ("localhost", sock.getsockname()[1])
return await self.get_start_server_location() return ""
async def poll(self): async def poll(self):
return await super().poll()
async def _poll(self):
if not self.resource_url:
return 0
job = await self._get_job() job = await self._get_job()
is_running = self.timed_func_call(job.is_running) is_running = self.timed_func_call(job.is_running)
if is_running: if is_running:
...@@ -431,6 +409,12 @@ class UnicoreSpawner(Spawner): ...@@ -431,6 +409,12 @@ class UnicoreSpawner(Spawner):
return s.data.decode() return s.data.decode()
async def stop(self, now, **kwargs): async def stop(self, now, **kwargs):
return await super().stop(now, **kwargs)
async def _stop(self, now, **kwargs):
if not self.resource_url:
return
job = await self._get_job() job = await self._get_job()
job.abort() job.abort()
stderr = self.download_file(job, "stderr") stderr = self.download_file(job, "stderr")
...@@ -439,3 +423,14 @@ class UnicoreSpawner(Spawner): ...@@ -439,3 +423,14 @@ class UnicoreSpawner(Spawner):
self.log.info(f"{self._log_name} stdout:\n{stdout}") self.log.info(f"{self._log_name} stdout:\n{stdout}")
if self.unicore_job_delete: if self.unicore_job_delete:
job.delete() job.delete()
class UnicoreForwardSpawner(UnicoreSpawner, ForwardBaseSpawner):
async def start(self):
return await ForwardBaseSpawner.start(self)
async def poll(self):
return await ForwardBaseSpawner.poll(self)
async def stop(self, now=False, **kwargs):
return await ForwardBaseSpawner.stop(self, now=now, **kwargs)
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment