Skip to content
Snippets Groups Projects
Select Git revision
  • 699228d9613d3bcfed4ca07f33cab365e65e6544
  • master default protected
  • enxhi_issue460_remove_TOAR-I_access
  • michael_issue459_preprocess_german_stations
  • sh_pollutants
  • develop protected
  • release_v2.4.0
  • michael_issue450_feat_load-ifs-data
  • lukas_issue457_feat_set-config-paths-as-parameter
  • lukas_issue454_feat_use-toar-statistics-api-v2
  • lukas_issue453_refac_advanced-retry-strategy
  • lukas_issue452_bug_update-proj-version
  • lukas_issue449_refac_load-era5-data-from-toar-db
  • lukas_issue451_feat_robust-apriori-estimate-for-short-timeseries
  • lukas_issue448_feat_load-model-from-path
  • lukas_issue447_feat_store-and-load-local-clim-apriori-data
  • lukas_issue445_feat_data-insight-plot-monthly-distribution
  • lukas_issue442_feat_bias-free-evaluation
  • lukas_issue444_feat_choose-interp-method-cams
  • 414-include-crps-analysis-and-other-ens-verif-methods-or-plots
  • lukas_issue384_feat_aqw-data-handler
  • v2.4.0 protected
  • v2.3.0 protected
  • v2.2.0 protected
  • v2.1.0 protected
  • Kleinert_etal_2022_initial_submission
  • v2.0.0 protected
  • v1.5.0 protected
  • v1.4.0 protected
  • v1.3.0 protected
  • v1.2.1 protected
  • v1.2.0 protected
  • v1.1.0 protected
  • IntelliO3-ts-v1.0_R1-submit
  • v1.0.0 protected
  • v0.12.2 protected
  • v0.12.1 protected
  • v0.12.0 protected
  • v0.11.0 protected
  • v0.10.0 protected
  • IntelliO3-ts-v1.0_initial-submit
41 results

data_generator.py

Blame
  • utils.py 12.46 KiB
    import copy
    import functools
    import json
    import logging
    import os
    import socket
    import subprocess
    import uuid
    
    from jupyterjsc_tunneling.settings import LOGGER_NAME
    from kubernetes import client
    from kubernetes import config
    
    
    # Module-wide logger; LOGGER_NAME comes from jupyterjsc_tunneling.settings.
    # Every helper below passes structured context to it via `extra=`.
    log = logging.getLogger(LOGGER_NAME)
    # Import-time sanity check that the logger registered under LOGGER_NAME is the
    # project's custom "ExtraLoggerClass" (presumably installed by the project's
    # logging setup before this module is imported — TODO confirm).
    # NOTE(review): `assert` is stripped under `python -O`, so this check vanishes there.
    assert log.__class__.__name__ == "ExtraLoggerClass"
    
    
    class RemoteExceptionError(Exception):
        """Raised by the remote status/stop helpers when their ssh command fails."""
    
    
    class TunnelExceptionError(Exception):
        """Raised when an ssh tunnel operation or a related K8s Service call fails."""
    
    
    # Shared user-facing error text; not referenced in the visible part of this
    # module, presumably imported by callers elsewhere.
    SYSTEM_NOT_AVAILABLE_ERROR_MESSAGE: str = "System is not available"
    
    
    def get_random_open_local_port():
        """Return a TCP port number that the OS reports as free right now.

        Binds a throw-away socket to port 0 on all interfaces, reads back the port
        the OS assigned, and closes the socket again. The port is not reserved,
        so another process may grab it before the caller uses it.
        """
        sock = socket.socket()
        with sock:
            sock.bind(("", 0))
            _host, port = sock.getsockname()
        return port
    
    
    def is_port_in_use(port):
        """Return True if a TCP connect to ``localhost:port`` succeeds."""
        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        with probe:
            error_code = probe.connect_ex(("localhost", port))
        return error_code == 0
    
    
    def get_base_cmd(verbose=False):
        """Return the common ``timeout <s> ssh -F <config>`` argv prefix.

        ``SSHTIMEOUT`` (default "3") and ``SSHCONFIGFILE`` (default
        "/home/tunnel/.ssh/config") are read from the environment on every call.
        With ``verbose`` the ssh ``-v`` flag is appended. A fresh list is returned.
        """
        timeout_seconds = os.environ.get("SSHTIMEOUT", "3")
        ssh_config_path = os.environ.get("SSHCONFIGFILE", "/home/tunnel/.ssh/config")
        cmd = ["timeout", timeout_seconds, "ssh", "-F", ssh_config_path]
        return cmd + ["-v"] if verbose else cmd
    
    
    def get_remote_cmd(action, verbose=False, **kwargs):
        """Build the argv that runs ``action`` on the ``remote_<hostname>`` ssh alias.

        ``kwargs`` must contain ``hostname``; any other keys are ignored.
        """
        prefix_cmd = get_base_cmd(verbose=verbose)
        return [*prefix_cmd, f"remote_{kwargs['hostname']}", action]
    
    
    def get_tunnel_cmd(action, verbose=False, **kwargs):
        """Build the ssh argv for a tunnel ``action`` on the ``tunnel_<hostname>`` alias.

        Actions:
            "create":  ``ssh ... tunnel_<hostname>``
            "check":   ``ssh ... -O check tunnel_<hostname>``
            "forward" / "cancel":
                ``ssh ... -O <action> tunnel_<hostname>
                -L 0.0.0.0:<local_port>:<target_node>:<target_port>``

        Args:
            action: one of the actions above.
            verbose: passed to ``get_base_cmd`` (adds ssh ``-v``).
            **kwargs: always needs ``hostname``; "forward"/"cancel" also need
                ``local_port``, ``target_node`` and ``target_port``.

        Raises:
            KeyError: for an unknown action or a missing required key.
        """
        base_cmd = get_base_cmd(verbose=verbose)
        host_alias = f"tunnel_{kwargs['hostname']}"
        # Only the requested command is built, so "check"/"create" no longer
        # require the port-forwarding keys that they never use.
        if action == "create":
            return base_cmd + [host_alias]
        if action == "check":
            return base_cmd + ["-O", "check", host_alias]
        if action in ("cancel", "forward"):
            forward_spec = (
                f"0.0.0.0:{kwargs['local_port']}:{kwargs['target_node']}:{kwargs['target_port']}"
            )
            return base_cmd + ["-O", action, host_alias, "-L", forward_spec]
        # Same exception type the previous dict lookup raised for unknown actions.
        raise KeyError(action)
    
    
    def get_cmd(prefix, action, verbose=False, **kwargs):
        """Dispatch to the remote or tunnel command builder by ``prefix``.

        Returns an empty list for any prefix other than "remote" or "tunnel".
        """
        if prefix not in ("remote", "tunnel"):
            return []
        builder = get_remote_cmd if prefix == "remote" else get_tunnel_cmd
        return builder(action, verbose=verbose, **kwargs)
    
    
    # Failure-log method keyed by the ``alert_admins`` flag:
    # critical when admins should be alerted, warning otherwise.
    alert_admins_log = dict(((True, log.critical), (False, log.warning)))
    # Log method used for the "<log_msg> done" record of each ssh action.
    action_log = dict(
        cancel=log.info,
        check=log.debug,
        create=log.debug,
        forward=log.info,
        start=log.debug,
        status=log.debug,
        stop=log.debug,
    )
    
    
    def _run_popen_once(prefix, action, log_msg, verbose, kwargs):
        """Run the command once, log before/after, and return ``(returncode, log_extra)``."""
        cmd = get_cmd(prefix, action, verbose=verbose, **kwargs)
        log_extra = copy.deepcopy(kwargs)
        log_extra["cmd"] = cmd
        log.debug(
            f"{log_msg} ...",
            extra=log_extra,
        )

        # gunicorn preload app feature does not use gunicorn user/group but
        # the current uid instead. Which is root. We don't want to run commands as root.
        def set_uid():
            try:
                os.setuid(1000)
            except OSError:
                # Not permitted (e.g. already unprivileged): keep the current uid.
                pass

        with subprocess.Popen(
            cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, preexec_fn=set_uid
        ) as p:
            stdout, stderr = p.communicate()
            returncode = p.returncode

        # ssh/remote output is not guaranteed to be valid UTF-8; a decode error
        # here must not abort the retry/alert handling in the caller.
        log_extra["stdout"] = stdout.decode("utf-8", errors="replace").strip()
        log_extra["stderr"] = stderr.decode("utf-8", errors="replace").strip()
        log_extra["returncode"] = returncode

        action_log[action](
            f"{log_msg} done",
            extra=log_extra,
        )
        return returncode, log_extra


    def run_popen_cmd(
        prefix,
        action,
        log_msg,
        alert_admins=False,
        max_attempts=1,
        verbose=False,
        expected_returncodes=None,
        **kwargs,
    ):
        """Run an ssh command built by ``get_cmd`` and check its return code.

        The command runs without a shell; stdout/stderr are captured and attached to
        the log records via ``extra``. If the return code is not expected, the
        command is re-run, up to ``max_attempts`` runs in total. A retry that is the
        final attempt runs ssh with ``-v`` for diagnostics.

        Args:
            prefix: "remote" or "tunnel" (see ``get_cmd``).
            action: action name; also selects the log method in ``action_log``.
            log_msg: human-readable prefix for the log messages.
            alert_admins: on final failure, log at critical (True) or warning
                (False). No failure record is written for the "check" action.
            max_attempts: total number of runs before giving up.
            verbose: pass ssh ``-v`` on the first run.
            expected_returncodes: return codes treated as success; ``[0]`` if None.
            **kwargs: forwarded to the command builder and deep-copied into ``extra``.

        Returns:
            The return code of the successful run.

        Raises:
            Exception: if no run returned an expected code.
        """
        # None sentinel instead of a shared mutable list default.
        if expected_returncodes is None:
            expected_returncodes = [0]

        attempts_left = max_attempts
        while True:
            returncode, log_extra = _run_popen_once(
                prefix, action, log_msg, verbose, kwargs
            )
            if returncode in expected_returncodes:
                return returncode
            if attempts_left <= 1:
                break
            attempts_left -= 1
            # Only the final retry is verbose.
            verbose = attempts_left == 1

        if action != "check":
            # Check is expected to fail. So no extra log required
            alert_admins_log[alert_admins](
                f"{log_msg} failed. Action may be required",
                extra=log_extra,
            )
        raise Exception(
            f"unexpected returncode: {returncode} not in {expected_returncodes}"
        )
    
    
    def check_tunnel_connection(func):
        """Decorator ensuring the ssh connection to ``tunnel_<hostname>`` exists first.

        Before calling ``func`` it runs the tunnel "check" command
        (``ssh -O check tunnel_<hostname>``). If that fails it runs the tunnel
        "create" command (up to 3 attempts). Only keyword arguments are forwarded to
        those commands; positional arguments go to ``func`` only.

        Raises:
            TunnelExceptionError: if the connection can be neither verified nor
                created. The failure is logged at critical level first.
        """

        @functools.wraps(func)
        def build_up_connection(*args, **kwargs):
            # check if ssh connection to the node is up
            try:
                run_popen_cmd(
                    "tunnel",
                    "check",
                    "SSH tunnel check connection",
                    max_attempts=1,
                    **kwargs,
                )
            except Exception:
                # That's ok. Let's try to start the tunnel.
                try:
                    run_popen_cmd(
                        "tunnel",
                        "create",
                        "SSH tunnel create connection",
                        max_attempts=3,
                        **kwargs,
                    )
                except Exception as e:
                    log.critical(
                        "Could not create ssh tunnel.", extra=kwargs, exc_info=True
                    )
                    # That's not ok. We could not connect to the system.
                    # .get() so a missing key cannot replace this error with a KeyError.
                    raise TunnelExceptionError(
                        f"System not available: Could not connect via ssh to {kwargs.get('hostname')}",
                        f"Request identification: {kwargs.get('uuidcode')}",
                    ) from e
            return func(*args, **kwargs)

        return build_up_connection
    
    
    def stop_and_delete(alert_admins=False, raise_exception=False, **kwargs):
        """Cancel the ssh port forwarding, then delete the matching K8s Service.

        With ``raise_exception=False`` (the default) this is best-effort cleanup. A
        TunnelExceptionError from ``stop_tunnel``'s connection-check decorator is
        already logged at critical level there and no longer prevents the Service
        deletion, so the Service is not leaked when the host is unreachable.

        Args:
            alert_admins: log failures at critical (True) or warning (False) level.
            raise_exception: propagate failures instead of only logging them.
            **kwargs: forwarded to ``stop_tunnel`` and ``k8s_svc``.

        Raises:
            TunnelExceptionError: when ``raise_exception`` is True and a step fails.
        """
        try:
            stop_tunnel(
                alert_admins=alert_admins, raise_exception=raise_exception, **kwargs
            )
        except TunnelExceptionError:
            if raise_exception:
                raise
            # Already logged by the connection check; continue cleaning up.
        k8s_svc(
            "delete", alert_admins=alert_admins, raise_exception=raise_exception, **kwargs
        )
    
    
    @check_tunnel_connection
    def stop_tunnel(alert_admins=True, raise_exception=True, **kwargs):
        """Cancel the ``-L`` port forwarding on the ``tunnel_<hostname>`` connection.

        A failure is logged (critical if ``alert_admins`` else warning) and, when
        ``raise_exception`` is set, re-raised as TunnelExceptionError.
        """
        try:
            run_popen_cmd(
                "tunnel",
                "cancel",
                "SSH stop tunnel",
                alert_admins=alert_admins,
                max_attempts=1,
                **kwargs,
            )
        except Exception as exc:
            failure_log = alert_admins_log[alert_admins]
            failure_log("Could not stop ssh tunnel", extra=kwargs, exc_info=True)
            if not raise_exception:
                return None
            raise TunnelExceptionError("Could not stop ssh tunnel", str(exc))
    
    
    @check_tunnel_connection
    def start_tunnel(alert_admins=True, raise_exception=True, **validated_data):
        """Forward ``0.0.0.0:local_port`` to ``target_node:target_port`` via the tunnel.

        Runs the tunnel "forward" command (up to 3 attempts). A failure is logged
        (critical if ``alert_admins`` else warning) and, when ``raise_exception`` is
        set, re-raised as TunnelExceptionError.
        """
        try:
            run_popen_cmd(
                "tunnel",
                "forward",
                "SSH start tunnel",
                alert_admins=alert_admins,
                max_attempts=3,
                **validated_data,
            )
        except Exception as exc:
            failure_log = alert_admins_log[alert_admins]
            failure_log("Could not start tunnel", extra=validated_data, exc_info=True)
            if not raise_exception:
                return None
            raise TunnelExceptionError(
                "Could not forward port to system via ssh tunnel", str(exc)
            )
    
    
    def start_remote(alert_admins=True, raise_exception=True, **validated_data):
        """Run ``start`` on the ``remote_<hostname>`` ssh alias.

        Up to 3 attempts are made; return code 217 is treated as success.

        Args:
            alert_admins: log failures at critical (True) or warning (False) level,
                both for the per-command failure and for the summary below.
            raise_exception: re-raise failures as TunnelExceptionError.
            **validated_data: must contain ``hostname``; forwarded to the command
                builder and used as logging ``extra``.

        Raises:
            TunnelExceptionError: on failure when ``raise_exception`` is True.
        """
        try:
            run_popen_cmd(
                "remote",
                "start",
                "SSH start remote",
                # Honor the caller's alert level, as start_tunnel/status_remote do
                # (this was hard-coded to True).
                alert_admins=alert_admins,
                max_attempts=3,
                expected_returncodes=[217],
                **validated_data,
            )
        except Exception as e:
            alert_admins_log[alert_admins](
                "Could not start remote ssh tunnel", extra=validated_data, exc_info=True
            )
            if raise_exception:
                raise TunnelExceptionError(
                    "Could not start remote ssh tunnel", str(e)
                ) from e
    
    
    def status_remote(alert_admins=True, raise_exception=True, **data):
        """Query ``status`` on the ``remote_<hostname>`` ssh alias.

        Returns:
            True if the command returned 217, False if it returned 218, and None
            when the command failed and ``raise_exception`` is False.

        Raises:
            RemoteExceptionError: on failure when ``raise_exception`` is True.
        """
        try:
            returncode = run_popen_cmd(
                "remote",
                "status",
                "SSH status remote",
                alert_admins=alert_admins,
                max_attempts=1,
                expected_returncodes=[217, 218],
                **data,
            )
        except Exception as exc:
            failure_log = alert_admins_log[alert_admins]
            failure_log(
                "Could not receive status from remote ssh tunnel", extra=data, exc_info=True
            )
            if not raise_exception:
                return None
            raise RemoteExceptionError(
                "Could not receive status from remote ssh tunnel", str(exc)
            )
        return returncode == 217
    
    
    def stop_remote(alert_admins=True, raise_exception=True, **data):
        """Run ``stop`` on the ``remote_<hostname>`` ssh alias.

        Up to 3 attempts are made; return code 218 is treated as success.

        Args:
            alert_admins: log failures at critical (True) or warning (False) level,
                both for the per-command failure and for the summary below.
            raise_exception: re-raise failures as RemoteExceptionError.
            **data: must contain ``hostname``; forwarded to the command builder and
                used as logging ``extra``.

        Raises:
            RemoteExceptionError: on failure when ``raise_exception`` is True.
        """
        try:
            run_popen_cmd(
                "remote",
                "stop",
                "SSH stop remote",
                # Honor the caller's alert level, as status_remote does
                # (this was hard-coded to True).
                alert_admins=alert_admins,
                max_attempts=3,
                expected_returncodes=[218],
                **data,
            )
        except Exception as e:
            alert_admins_log[alert_admins](
                "Could not stop remote ssh tunnel", extra=data, exc_info=True
            )
            if raise_exception:
                raise RemoteExceptionError(
                    "Could not stop remote ssh tunnel", str(e)
                ) from e
    
    
    def k8s_get_client():
        """Return a kubernetes ``CoreV1Api`` after ``config.load_incluster_config()``."""
        config.load_incluster_config()
        core_api = client.CoreV1Api()
        return core_api
    
    
    def k8s_get_svc_namespace():
        """Return ``$DEPLOYMENT_NAMESPACE``, or "default" when the variable is unset."""
        namespace = os.environ.get("DEPLOYMENT_NAMESPACE")
        return "default" if namespace is None else namespace
    
    
    def k8s_create_svc(**kwargs):
        """Create a K8s Service mapping ``svc_port`` to ``local_port`` on one pod.

        The Service selector matches ``app=$DEPLOYMENT_NAME`` (default "tunneling")
        and ``statefulset.kubernetes.io/pod-name=$HOSTNAME`` (default
        "drf-tunnel-0"). It is created in the namespace from
        ``k8s_get_svc_namespace()``.

        Args:
            **kwargs: must contain ``svc_name``, ``svc_port`` and ``local_port``.
                Optional ``labels`` are merged over ``{"name": svc_name}``; they may
                be a JSON-object string/bytes or an already-decoded mapping.

        Returns:
            The created Service as produced by the client's ``.to_dict()``.
        """
        v1 = k8s_get_client()
        deployment_name = os.environ.get("DEPLOYMENT_NAME", "tunneling")
        pod_name = os.environ.get("HOSTNAME", "drf-tunnel-0")
        name = kwargs["svc_name"]
        namespace = k8s_get_svc_namespace()
        labels = {"name": name}
        extra_labels = kwargs.get("labels", {})
        if extra_labels:
            # Header values arrive as JSON text; accept a mapping as-is too
            # (json.loads would raise TypeError on a dict).
            if isinstance(extra_labels, (str, bytes, bytearray)):
                extra_labels = json.loads(extra_labels)
            labels.update(extra_labels)
        service_manifest = {
            "apiVersion": "v1",
            "kind": "Service",
            "metadata": {
                "labels": labels,
                "name": name,
                # NOTE(review): not a standard ObjectMeta field (the server-managed
                # field is camelCase "resourceVersion"); likely ignored — confirm.
                "resourceversion": "v1",
            },
            "spec": {
                "ports": [
                    {
                        "name": "port",
                        "port": kwargs["svc_port"],
                        "protocol": "TCP",
                        "targetPort": kwargs["local_port"],
                    }
                ],
                "selector": {
                    "app": deployment_name,
                    "statefulset.kubernetes.io/pod-name": pod_name,
                },
            },
        }
        return v1.create_namespaced_service(
            body=service_manifest, namespace=namespace
        ).to_dict()
    
    
    # def k8s_get_svc(servername, **kwargs):
    #     v1 = k8s_get_client()
    #     name = k8s_get_svc_name(servername)
    #     namespace = k8s_get_svc_namespace()
    #     return v1.read_namespaced_service(name=name, namespace=namespace).to_dict()
    
    
    def k8s_delete_svc(**kwargs):
        """Delete the Service named ``kwargs["svc_name"]``; return the response dict."""
        api = k8s_get_client()
        response = api.delete_namespaced_service(
            name=kwargs["svc_name"], namespace=k8s_get_svc_namespace()
        )
        return response.to_dict()
    
    
    # Log method for the "Call K8s API to <action> svc done" record, per action.
    k8s_log = dict(create=log.info, delete=log.info)

    # Implementation invoked by k8s_svc() for each supported action.
    # A "get" action (k8s_get_svc) exists only as commented-out code and is not
    # registered here.
    k8s_func = dict(create=k8s_create_svc, delete=k8s_delete_svc)
    
    
    def k8s_svc(action, alert_admins=False, raise_exception=True, **kwargs):
        """Run the K8s Service ``action`` ("create" or "delete") with logging.

        Args:
            action: key into ``k8s_func``/``k8s_log``.
            alert_admins: log failures at critical (True) or warning (False) level.
            raise_exception: re-raise failures as TunnelExceptionError.
            **kwargs: forwarded to the action implementation; deep-copied for ``extra``.

        Returns:
            The implementation's response (a dict for create/delete), or None when
            the call failed and ``raise_exception`` is False.

        Raises:
            TunnelExceptionError: on failure (including an unknown action) when
                ``raise_exception`` is True.
        """
        log_extra = copy.deepcopy(kwargs)
        log.debug(f"Call K8s API to {action} svc ...", extra=log_extra)
        try:
            response = k8s_func[action](**kwargs)
        except Exception as e:
            alert_admins_log[alert_admins](
                f"Call K8s API to {action} svc failed", exc_info=True, extra=log_extra
            )
            if raise_exception:
                raise TunnelExceptionError(
                    f"Call K8s API to {action} svc failed", str(e)
                ) from e
            return None
        k8s_log[action](f"Call K8s API to {action} svc done", extra=log_extra)
        # Previously discarded; returning it is backward-compatible for callers.
        return response
    
    
    def start_remote_from_config_file(uuidcode="", hostname=""):
        """Start every ``remote_*`` host listed in the ssh config file.

        Reads ``$SSHCONFIGFILE`` (default "/home/tunnel/.ssh/config"). Every line
        starting with ``Host remote_`` yields one host name, the rest of the line
        with surrounding whitespace stripped. ``start_remote`` is called for each.
        Failures are logged, never raised.

        Args:
            uuidcode: request id used in log records; a random hex uuid if empty.
            hostname: if non-empty, only this host is started.
        """
        if not uuidcode:
            uuidcode = uuid.uuid4().hex
        kwargs = {"uuidcode": uuidcode}
        config_file_path = os.environ.get("SSHCONFIGFILE", "/home/tunnel/.ssh/config")
        try:
            with open(config_file_path, "r", encoding="utf-8") as f:
                # splitlines() also drops "\r" from CRLF line endings.
                config_lines = f.read().splitlines()
        except (OSError, UnicodeDecodeError):
            log.critical(
                "Could not load ssh config file during startup",
                exc_info=True,
                extra=kwargs,
            )
            return
        remote_prefix = "Host remote_"
        remote_hosts_lines = [
            line[len(remote_prefix) :].strip()
            for line in config_lines
            if line.startswith(remote_prefix)
        ]
        kwargs["remote_hosts"] = remote_hosts_lines
        # f-string: the placeholder used to be logged literally as "{hostname}".
        log.debug(f"Start remote tunnels (hostname={hostname})", extra=kwargs)
        for _hostname in remote_hosts_lines:
            if hostname and hostname != _hostname:
                continue
            kwargs["hostname"] = _hostname
            try:
                start_remote(**kwargs)
            except Exception:
                # Keep going with the remaining hosts.
                log.exception("Could not start ssh remote tunnel", extra=kwargs)
    
    
    def get_custom_headers(request_headers):
        """Extract this service's custom headers from a request-header mapping.

        If ``request_headers`` has a "headers" key, a deep copy of that value is
        returned. Otherwise ``HTTP_UUIDCODE``, ``HTTP_HOSTNAME`` and ``HTTP_LABELS``
        (presumably Django-style META names — confirm against callers) are renamed
        to ``uuidcode``, ``hostname`` and ``labels``. Absent keys are skipped.
        """
        present = request_headers.keys()
        if "headers" in present:
            return copy.deepcopy(request_headers["headers"])
        renames = (
            ("HTTP_UUIDCODE", "uuidcode"),
            ("HTTP_HOSTNAME", "hostname"),
            ("HTTP_LABELS", "labels"),
        )
        return {
            short_name: request_headers[meta_name]
            for meta_name, short_name in renames
            if meta_name in present
        }