Commit b73af0d2 authored by Tim Kreuzer

Init

tests
.vscode
jupyterhub-proxy.pid
jupyterhub.sqlite
jupyterhub_cookie_secret
# Created by https://www.toptal.com/developers/gitignore/api/python,jupyternotebooks
# Edit at https://www.toptal.com/developers/gitignore?templates=python,jupyternotebooks
### JupyterNotebooks ###
# gitignore template for Jupyter Notebooks
# website: http://jupyter.org/
.ipynb_checkpoints
*/.ipynb_checkpoints/*
# IPython
profile_default/
ipython_config.py
# Remove previous ipynb_checkpoints
# git rm -r .ipynb_checkpoints/
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
# IPython
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# End of https://www.toptal.com/developers/gitignore/api/python,jupyternotebooks
stages:
  - pypi
workflow:
  rules:
    - if: $CI_COMMIT_TAG
    - when: never
pypi:
  stage: pypi
  image:
    name: python:3.11-alpine3.18
  before_script:
    - pip install twine build
  script:
    - python -m build
    - twine upload -u __token__ -p ${PYPI_JUPYTERJSC_TOKEN} dist/*
  rules:
    - if: $CI_COMMIT_TAG
repos:
  - repo: https://github.com/asottile/reorder_python_imports
    rev: v2.6.0
    hooks:
      - id: reorder-python-imports
  - repo: https://github.com/psf/black
    rev: 22.3.0
    hooks:
      - id: black
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v2.4.0
    hooks:
      - id: end-of-file-fixer
      - id: check-json
      - id: check-case-conflict
      - id: check-executables-have-shebangs
LICENSE
BSD 3-Clause License
Copyright (c) 2023, Forschungszentrum Juelich GmbH
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# UnicoreSpawner
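`UnicoreSpawner` starts JupyterHub single-user servers as UNICORE jobs via [pyunicore](https://github.com/HumanBrainProject/pyunicore). Below is a minimal `jupyterhub_config.py` sketch; the site URL, file path, and job-description layout are placeholders, not a tested configuration:

```python
import json

from unicorespawner import UnicoreSpawner

c.JupyterHub.spawner_class = UnicoreSpawner


async def job_description(spawner):
    # Hypothetical template file; it may contain {placeholders} that the
    # spawner replaces with environment variables and user_options.
    with open("/srv/jobs/default/job_description.json") as f:
        return json.load(f)


async def transport_kwargs(spawner):
    # Pass the user's access token to UNICORE (see unicore_transport_kwargs).
    auth_state = await spawner.user.get_auth_state()
    return {"credential": auth_state["access_token"], "oidc": False}


c.UnicoreSpawner.job_descriptions = {"default": job_description}
c.UnicoreSpawner.unicore_site_url = "https://unicore.example.com:8080/DEMO-SITE/rest/core"
c.UnicoreSpawner.unicore_transport_kwargs = transport_kwargs
```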
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "jupyterhub-unicorespawner"
description = "JupyterHub Spawner for UNICORE"
readme = "README.md"
requires-python = ">=3.9"
license = {file = "LICENSE"}
keywords = ["jupyterhub", "spawner"]
authors = [
{name = "Tim Kreuzer", email = "t.kreuzer@fz-juelich.de"},
{name = "Alice Grosch", email = "a.grosch@fz-juelich.de"}
]
dependencies = [
"escapism",
"jinja2",
"jupyterhub>=4.0.0",
"traitlets",
"urllib3",
"pyunicore"
]
dynamic = ["version"]
[tool.hatch.build]
artifacts = [
"unicorespawner/templates/*"
]
[tool.hatch.version]
path = "unicorespawner/_version.py"
[tool.isort]
profile = "black"
[tool.tbump]
# Uncomment this if your project is hosted on GitHub:
github_url = "https://github.com/kreuzert/jupyterhub-unicorespawner"
[tool.tbump.version]
current = "0.0.1.dev1"
# Example of a semver regexp.
# Make sure this matches current_version before
# using tbump
regex = '''
(?P<major>\d+)
\.
(?P<minor>\d+)
\.
(?P<patch>\d+)
(?P<pre>((a|b|rc)\d+)|)
\.?
(?P<dev>(?<=\.)dev\d*|)
'''
[tool.tbump.git]
message_template = "Bump to {new_version}"
tag_template = "{new_version}"
# For each file to patch, add a [[tool.tbump.file]] config
# section containing the path of the file, relative to the
# pyproject.toml location.
[[tool.tbump.file]]
src = "pyproject.toml"
search = 'version = "{current_version}"'
[[tool.tbump.file]]
src = "unicorespawner/_version.py"
version_template = '({major}, {minor}, {patch}, "{pre}", "{dev}")'
search = "version_info = {current_version}"
[[tool.tbump.file]]
src = "docs/source/_static/rest-api.yml"
search = "version: {current_version}"
[tool.djlint]
indent = 2
profile = "jinja"
from ._version import __version__
from .spawner import UnicoreSpawner
# __version__ should be updated using tbump, based on configuration in
# pyproject.toml, according to instructions in RELEASE.md.
#
#version_info = (0, 0, 1, "dev1", "")
#__version__ = ".".join(map(str, version_info[:3])) + ".".join(version_info[3:]).rstrip(
# "."
# )
__version__ = "0.0.1.dev1"
import asyncio
import json
import time
import pyunicore.client as pyunicore
from jupyterhub.spawner import Spawner
from jupyterhub.utils import maybe_future
from jupyterhub.utils import url_path_join
from traitlets import Any
from traitlets import Boolean
from traitlets import Dict
from traitlets import Integer
from traitlets import Unicode
class UnicoreSpawner(Spawner):
job_descriptions = Dict(
config=True,
help="""
Multiple named job descriptions to start different UNICORE Jobs.
If `Spawner.user_options["job"]` is defined, it will be used
to get one of the defined jobs. Otherwise the job with key `default`
will be used.
Replaceable variables can be added with curly braces in
the job_description.
UnicoreSpawner will replace these variables with their actual value.
Replaceable keys are:
- any env variable
- any user_option key
- any key defined in Spawner.additional_replacements
Must be a dict, or a callable that returns a dict.
More information about job_description:
https://unicore-docs.readthedocs.io/en/latest/user-docs/rest-api/job-description/index.html
Example::
import os
import json
async def get_job_description(spawner):
job = spawner.user_options.get("job", ["None"])
if type(job) != list:
job = [job]
job = job[0]
with open(f"/mnt/jobs/{job}/job_description.json", "r") as f:
job_description = json.load(f)
job_description["Imports"] = {}
for subdir, dirs, files in os.walk(f"/mnt/jobs/{job}/input"):
for file in files:
with open(os.path.join(subdir, file), "r") as f:
job_description["Imports"][file] = f.read()
return job_description
c.UnicoreSpawner.job_descriptions = {
"job-1": get_job_description,
"job-2": get_job_description
}
""",
)
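# Illustration (hedged; this template is an assumption, not part of this
# commit): a job_description may contain {placeholders} inside its string
# values, for example
#     {
#         "Executable": "/bin/bash",
#         "Arguments": ["start.sh"],
#         "Environment": {
#             "JUPYTERHUB_API_TOKEN": "{JUPYTERHUB_API_TOKEN}",
#             "JUPYTERHUB_API_URL": "{JUPYTERHUB_API_URL}",
#             "PORT": "{PORT}"
#         }
#     }
# _start() serializes the dict to JSON, replaces every {key} with the matching
# user_option, environment variable or additional replacement, and parses it
# back before submitting the job.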
additional_replacements = Any(
config=True,
default_value = {},
help="""
Define variables for each defined user_option key-value pair.
These variables will be replaced in the job_description.
With these replacements the same template job_description
can be used for multiple systems and versions.
In the example below all occurrences of `{startmsg}` or `{version}`
in the job description will be replaced, depending on
the defined user_options `system` and `job`. This reduces redundancy
in the `UnicoreSpawner.job_descriptions` configuration (by using the same
function for multiple jobs) and in configuration files (by using variables
within the job description file).
Example::
{
"system": {
"local": {
"startmsg": "Starting job on local system"
},
"remote": {
"startmsg": "Starting job on remote system"
}
},
"job": {
"job-1": {
"version": "1.0.0"
},
"job-2": {
"version": "1.1.0"
}
}
}
"""
)
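# Illustration only (the template line below is an assumption): with the
# example above and user_options {"system": ["local"], "job": ["job-1"]},
# a job-description entry such as
#     "Arguments": ["--message", "{startmsg}", "--version", "{version}"]
# would be submitted as
#     "Arguments": ["--message", "Starting job on local system", "--version", "1.0.0"]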
async def get_additional_replacements(self):
"""Get additional_replacements for job_description
Returns:
additional_replacements (dict): Used in Unicore Job description
"""
if callable(self.additional_replacements):
additional_replacements = await maybe_future(self.additional_replacements(self))
else:
additional_replacements = self.additional_replacements
return additional_replacements
unicore_job_delete = Boolean(
config=True,
default_value = True,
help="""
Whether UNICORE jobs should be deleted when the single-user server is stopped
"""
)
download_path = Any(
config=True,
default_value = "",
help="""
Function to define where to store stderr/stdout after stopping
the job
"""
)
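# A possible download_path configuration (a sketch; the directory layout is
# an assumption):
#     async def download_path(spawner):
#         return f"/mnt/jupyterhub/logs/{spawner.user.name}/{spawner.name}/"
#     c.UnicoreSpawner.download_path = download_path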
async def get_download_path(self):
"""Get download_path
Returns:
download_path (string): Path used to store stderr/stdout after the job is stopped
"""
if callable(self.download_path):
download_path = await maybe_future(self.download_path(self))
else:
download_path = self.download_path
return download_path
start_server_location = Any(
config=True,
help="""
UNICORE batch jobs will be port-forwarded automatically if
`UnicoreSpawner.automatic_port_forwarding` is true.
If this is false (the default), or the job is interactive, you have to
use `UnicoreSpawner.start_server_location` to define where
JupyterHub should look for the single-user server.
Example::
async def start_server_location(spawner):
# do other thing
return "http://custom-svc.default.svc:8080"
c.UnicoreSpawner.start_server_location = start_server_location
"""
)
async def get_start_server_location(self):
"""Get start_server_location
Returns:
start_server_location (string): Used in Spawner.start
"""
if callable(self.start_server_location):
start_server_location = await maybe_future(self.start_server_location(self))
else:
start_server_location = self.start_server_location
return start_server_location
unicore_site_url = Any(
config=True,
help="""
UNICORE site url.
Example::
async def site_url(spawner):
if spawner.user_options["system"][0] == "abc":
return "https://abc.com:8080/DEMO-SITE/rest/core"
c.UnicoreSpawner.unicore_site_url = site_url
"""
)
public_api_url = Unicode(
config=True,
default_value="",
help="""
Public JupyterHub API URL. If set, it replaces `JUPYTERHUB_API_URL`
(and the derived `JUPYTERHUB_ACTIVITY_URL`) in the single-user
environment, so that remotely running servers can reach the Hub.
"""
)
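# Example (hostname is a placeholder):
#     c.UnicoreSpawner.public_api_url = "https://jupyterhub.example.com/hub/api"
# get_env() then points JUPYTERHUB_API_URL and JUPYTERHUB_ACTIVITY_URL of the
# remote job at this public address instead of the Hub-internal URL.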
async def get_unicore_site_url(self):
"""Get unicore site url
Returns:
url (string): Used in Unicore communication
"""
if callable(self.unicore_site_url):
url = await maybe_future(self.unicore_site_url(self))
else:
url = self.unicore_site_url
return url
unicore_cert_path = Any(
config=True,
default_value=False,
help="""
UNICORE site certificate path. String or False
"""
)
download_max_bytes = Integer(
config=True,
default_value=4096,
help="""
Maximum number of bytes to download from the job's stderr/stdout.
Only the trailing bytes of each file are fetched.
"""
)
unicore_transport_kwargs = Any(
config=True,
default_value = {},
help="""
kwargs used in pyunicore.Transport(**kwargs) call.
Check https://github.com/HumanBrainProject/pyunicore for more
information.
Example::
async def transport_kwargs(spawner):
auth_state = await spawner.user.get_auth_state()
return {
"credential": auth_state["access_token"],
"oidc": False,
"verify": "/mnt/unicore/cert.crt",
# "verify": False,
"timeout": 30
}
c.UnicoreSpawner.unicore_transport_kwargs = transport_kwargs
"""
)
async def get_unicore_transport_kwargs(self):
"""Get unicore transport kwargs
Returns:
kwargs (dict): Used in Unicore communication
"""
if callable(self.unicore_transport_kwargs):
kwargs = await maybe_future(self.unicore_transport_kwargs(self))
else:
kwargs = self.unicore_transport_kwargs
return kwargs
unicore_transport_preferences = Any(
config=True,
default_value=False,
help="""
Define preferences that should be set on the transport object.
Example::
async def transport_preferences(spawner):
account = spawner.user_options.get("account", None)
if type(account) != list:
account = [account]
account = account[0]
project = spawner.user_options.get("project", None)
if type(project) != list:
project = [project]
project = project[0]
return f"uid:{account},group:{project}"
"""
)
async def get_unicore_transport_preferences(self):
"""Get unicore transport preferences
Returns:
preferences (string): Used in Unicore communication
"""
if callable(self.unicore_transport_preferences):
preferences = await maybe_future(self.unicore_transport_preferences(self))
else:
preferences = self.unicore_transport_preferences
return preferences
def get_string(self, value):
if type(value) != list:
value = [value]
if len(value) == 0:
return ""
else:
return str(value[0])
def timed_func_call(self, func, *args, **kwargs):
tic = time.time()
try:
ret = func(*args, **kwargs)
finally:
toc = time.time() - tic
extra = {"tictoc": f"{func.__module__},{func.__name__}", "duration": toc}
self.log.debug(
f"UNICORE communication for {self._log_name}",
extra=extra,
)
return ret
async def _get_transport(self):
transport_kwargs = await self.get_unicore_transport_kwargs()
transport = self.timed_func_call(pyunicore.Transport, **transport_kwargs)
preferences = await self.get_unicore_transport_preferences()
if preferences:
transport.preferences = preferences
return transport
async def _get_client(self):
transport = await self._get_transport()
url = await self.get_unicore_site_url()
client = self.timed_func_call(pyunicore.Client, transport, url)
return client
async def _get_job(self):
transport = await self._get_transport()
job = self.timed_func_call(pyunicore.Job, transport, self.resource_url)
return job
def clear_state(self):
super().clear_state()
self.resource_url = ""
def get_state(self):
state = super().get_state()
state['resource_url'] = self.resource_url
return state
def load_state(self, state):
if 'resource_url' in state:
self.resource_url = state['resource_url']
def get_env(self):
env = super().get_env()
env["PORT"] = self.port
env["JUPYTERHUB_SERVER_NAME"] = self._log_name
if self.public_api_url:
env["JUPYTERHUB_API_URL"] = self.public_api_url
env[
"JUPYTERHUB_ACTIVITY_URL"
] = f"{env['JUPYTERHUB_API_URL'].rstrip('/')}/users/{self.user.name}/activity"
return env
def start(self):
"""Thin wrapper around self._start
so we can hold onto a reference for the Future
start returns, which we can use to terminate
.progress()
"""
self._start_future = asyncio.ensure_future(self._start())
return self._start_future
async def _start(self):
job = self.get_string(self.user_options.get("job", ["default"]))
job_description = self.job_descriptions[job]
if callable(job_description):
job_description = await maybe_future(job_description(self))
env = self.get_env()
job_description = json.dumps(job_description)
for key, value in self.user_options.items():
job_description = job_description.replace(f"{{{key}}}", self.get_string(value).replace('"', '\\"'))
for key, value in env.items():
if type(value) == int:
job_description = job_description.replace(f"{{{key}}}", str(value).replace('"', '\\"'))
else:
job_description = job_description.replace(f"{{{key}}}", value.replace('"', '\\"'))
additional_replacements = await self.get_additional_replacements()
for ukey, _uvalue in self.user_options.items():
uvalue = self.get_string(_uvalue)
for key, value in additional_replacements.get(ukey, {}).get(uvalue, {}).items():
job_description = job_description.replace(f"{{{key}}}", value)
job_description = json.loads(job_description)
client = await self._get_client()
unicore_job = self.timed_func_call(client.new_job, job_description)
self.resource_url = unicore_job.resource_url
if job_description.get("Job Type", "batch") in ["batch", "normal"]:
from pyunicore.forwarder import open_tunnel
sock = open_tunnel(unicore_job, self.port)
return ("localhost", sock.getsockname()[1])
return await self.get_start_server_location()
async def poll(self):
job = await self._get_job()
is_running = self.timed_func_call(job.is_running)
if is_running:
return None
else:
await self.stop(now=False, cancel=True)
return 0
def download_file(self, job, file):
file_path = job.working_dir.stat(file)
file_size = file_path.properties["size"]
if file_size == 0:
return f"{file} is empty"
offset = max(0, file_size - self.download_max_bytes)
s = file_path.raw(offset=offset)
return s.data.decode()
async def stop(self, now, **kwargs):
job = await self._get_job()
job.abort()
stderr = self.download_file(job, "stderr")
stdout = self.download_file(job, "stdout")
self.log.info(f"{self._log_name} stderr:\n{stderr}")
self.log.info(f"{self._log_name} stdout:\n{stdout}")
if self.unicore_job_delete:
job.delete()