diff --git a/unicorespawner/spawner.py b/unicorespawner/spawner.py index dbe6a5e640e0c69ec57db461cb60fab33bde318a..a70bda52f00ce30d5fb25710095ed3443f82f58e 100644 --- a/unicorespawner/spawner.py +++ b/unicorespawner/spawner.py @@ -8,11 +8,13 @@ from datetime import datetime import pyunicore.client as pyunicore from forwardbasespawner import ForwardBaseSpawner +from jupyterhub.utils import AnyTimeoutError from jupyterhub.utils import maybe_future from jupyterhub.utils import url_escape_path from jupyterhub.utils import url_path_join from pyunicore.forwarder import Forwarder from requests.exceptions import HTTPError +from tornado import gen from traitlets import Any from traitlets import Bool from traitlets import Dict @@ -438,24 +440,48 @@ class UnicoreSpawner(ForwardBaseSpawner): logs_s = "<br>".join(log_list_short_escaped) return f"<details><summary>    {summary}(click here to expand):</summary>{logs_s}</details>" - def download_file(self, job, file): + async def download_file(self, job, file): + self.log.info(f"{self._log_name} - Download {file}") try: file_path = job.working_dir.stat(file) file_size = file_path.properties["size"] if file_size == 0: + self.log.info(f"{self._log_name} - Download {file} is empty") return f"{file} is empty" offset = max(0, file_size - self.download_max_bytes) s = file_path.raw(offset=offset) + self.log.info(f"{self._log_name} - Download {file} successful") return s.data.decode() except: - self.log.exception(f"Could not load file {file}") + self.log.exception(f"{self._log_name} - Could not load file {file}") return f"{file} does not exist" async def unicore_stop_event(self, spawner): job = await self._get_job() - unicore_stderr = self.download_file(job, "stderr") - unicore_stdout = self.download_file(job, "stdout") + timeout = 3 + download_future = self.download_file(job, "stdout") + try: + unicore_stdout = await gen.with_timeout( + timedelta(seconds=timeout), download_future + ) + except AnyTimeoutError: + self.log.exception(f"{self._log_name} - Timeout while downloading stdout") + unicore_stdout = "Could not download file" + except: + self.log.exception(f"{self._log_name} - Error while downloading stdout") + unicore_stdout = "Could not download file" + download_future = self.download_file(job, "stderr") + try: + unicore_stderr = await gen.with_timeout( + timedelta(seconds=timeout), download_future + ) + except AnyTimeoutError: + self.log.exception(f"{self._log_name} - Timeout while downloading stderr") + unicore_stderr = "Could not download file" + except: + self.log.exception(f"{self._log_name} - Error while downloading stderr") + unicore_stderr = "Could not download file" now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] summary = f"UNICORE Job stopped with exitCode: {job.properties.get('exitCode', 'unknown exitCode')}" @@ -607,7 +633,22 @@ class UnicoreSpawner(ForwardBaseSpawner): await asyncio.sleep(5) continue # Download stderr to receive port + address - unicore_stderr = self.download_file(unicore_job, "stderr") + timeout = 3 + download_future = self.download_file(unicore_job, "stderr") + try: + unicore_stderr = await gen.with_timeout( + timedelta(seconds=timeout), download_future + ) + except AnyTimeoutError: + self.log.exception( + f"{self._log_name} - Timeout while downloading stderr" + ) + unicore_stderr = "Could not download file" + except: + self.log.exception( + f"{self._log_name} - Error while downloading stderr" + ) + unicore_stderr = "Could not download file" if type(unicore_stderr) == str: unicore_stderr = unicore_stderr.split("\n") log_line = [ @@ -702,13 +743,35 @@ class UnicoreSpawner(ForwardBaseSpawner): async def _stop(self, now, **kwargs): if not getattr(self, "resource_url", False): - f"{self._log_name} - Resource_url not set. Do not stop job." + self.log.error(f"{self._log_name} - Resource_url not set. Do not stop job.") return job = await self._get_job() - unicore_stdout = self.download_file(job, "stdout") - unicore_stderr = self.download_file(job, "stderr") + timeout = 3 + download_future = self.download_file(job, "stdout") + try: + unicore_stdout = await gen.with_timeout( + timedelta(seconds=timeout), download_future + ) + except AnyTimeoutError: + self.log.exception(f"{self._log_name} - Timeout while downloading stdout") + unicore_stdout = "Could not download file" + except: + self.log.exception(f"{self._log_name} - Error while downloading stdout") + unicore_stdout = "Could not download file" + download_future = self.download_file(job, "stderr") + try: + unicore_stderr = await gen.with_timeout( + timedelta(seconds=timeout), download_future + ) + except AnyTimeoutError: + self.log.exception(f"{self._log_name} - Timeout while downloading stderr") + unicore_stderr = "Could not download file" + except: + self.log.exception(f"{self._log_name} - Error while downloading stderr") + unicore_stderr = "Could not download file" + self.log.debug(f"{self._log_name} - File download complete") unicore_logs = job.properties.get("log", []) self.log.info( f"{self._log_name} - Stop job. unicore log:\n{self.short_logs(unicore_logs, 20)}"