diff --git a/python/graphscope/deploy/kubernetes/cluster.py b/python/graphscope/deploy/kubernetes/cluster.py index 12ec537c6f18..fe5210a37af7 100644 --- a/python/graphscope/deploy/kubernetes/cluster.py +++ b/python/graphscope/deploy/kubernetes/cluster.py @@ -465,6 +465,7 @@ def _waiting_for_services_ready(self): api_client=self._api_client, namespace=self._namespace, name=self._coordinator_name, + pods_watcher=self._coordinator_pods_watcher, timeout_seconds=self._saved_locals["timeout_seconds"], ): self._coordinator_pods_watcher.stop() @@ -536,13 +537,11 @@ def start(self): "Coordinator pod start successful with address %s, connecting to service ...", self._coordinator_endpoint, ) - except Exception as e: + except Exception: time.sleep(1) self._dump_coordinator_failed_status() self.stop() - raise K8sError( - "Error when launching Coordinator on kubernetes cluster" - ) from e + raise def stop(self, wait=False): """Stop graphscope instance on kubernetes cluster. diff --git a/python/graphscope/deploy/kubernetes/utils.py b/python/graphscope/deploy/kubernetes/utils.py index 0b1da3639e80..d7c297acc15f 100644 --- a/python/graphscope/deploy/kubernetes/utils.py +++ b/python/graphscope/deploy/kubernetes/utils.py @@ -20,6 +20,7 @@ import logging import os import re +import sys import threading import time from queue import Queue @@ -99,12 +100,22 @@ def try_to_read_namespace_from_context(): return None -def wait_for_deployment_complete(api_client, namespace, name, timeout_seconds=60): +def wait_for_deployment_complete( + api_client, namespace, name, pods_watcher=None, timeout_seconds=60 +): core_api = kube_client.CoreV1Api(api_client) app_api = kube_client.AppsV1Api(api_client) start_time = time.time() while time.time() - start_time < timeout_seconds: time.sleep(1) + if pods_watcher is not None: + if pods_watcher.exception is not None: + tp, value, tb = pods_watcher.exception + if value is None: + value = tp() + if value.__traceback__ is not tb: + raise value.with_traceback(tb) + raise value response = app_api.read_namespaced_deployment_status( namespace=namespace, name=name ) @@ -154,6 +165,11 @@ def __init__(self, api_client, namespace, pod, container=None, queue=None): self._stream_event_thread = None self._stream_log_thread = None self._stopped = True + self._exc_info = None + + @property + def exception(self): + return self._exc_info def _stream_event_impl(self, simple=False): field_selector = "involvedObject.name=" + self._pod_name @@ -170,6 +186,7 @@ def _stream_event_impl(self, simple=False): except K8SApiException: pass else: + error_message = [] for event in events.items: msg = f"{self._pod_name}: {event.message}" if msg and msg not in event_messages: @@ -177,7 +194,16 @@ def _stream_event_impl(self, simple=False): self._lines.put(msg) logger.info(msg, extra={"simple": simple}) if event.reason == "Failed": - raise K8sError(f"Kubernetes event error: {msg}") + error_message.append(f"Kubernetes event error: {msg}") + if error_message: + try: + raise K8sError( + "Error when launching Coordinator on kubernetes cluster: \n" + + "\n".join(error_message) + ) + except: # noqa: E722,B110, pylint: disable=bare-except + self._exc_info = sys.exc_info() + return def _stream_log_impl(self, simple=False): log_messages = [] diff --git a/python/graphscope/framework/utils.py b/python/graphscope/framework/utils.py index c91471b6962a..93f097a63f4d 100644 --- a/python/graphscope/framework/utils.py +++ b/python/graphscope/framework/utils.py @@ -66,7 +66,7 @@ def read_and_poll(self): self._sink.flush() if not self._drop: self._lines.put(line) - except: # noqa: E722 + except: # noqa: E722, pylint: disable=bare-except pass self._polling_thread = threading.Thread(target=read_and_poll, args=(self,)) diff --git a/python/jupyter/graphscope/setupbase.py b/python/jupyter/graphscope/setupbase.py index 56160e00eebe..ef68a4ca12a1 100644 --- a/python/jupyter/graphscope/setupbase.py +++ b/python/jupyter/graphscope/setupbase.py @@ -181,7 +181,7 @@ def run(self): def run(cmd, **kwargs): """Echo a command before running it. Defaults to repo as cwd""" - log.info("> " + list2cmdline(cmd)) + log.info("> %s", list2cmdline(cmd)) kwargs.setdefault("cwd", HERE) kwargs.setdefault("shell", os.name == "nt") if not isinstance(cmd, (list, tuple)) and os.name != "nt": @@ -344,10 +344,10 @@ def run(self): if not which(npm_cmd[0]): log.error( - "`{0}` unavailable. If you're running this command " - "using sudo, make sure `{0}` is available to sudo".format( - npm_cmd[0] - ) + "`%s` unavailable. If you're running this command " + "using sudo, make sure `%s` is available to sudo", + npm_cmd[0], + npm_cmd[0], ) return