Skip to content

Commit

Permalink
Propagate the k8s exception to avoid waiting forever (#2747)
Browse files Browse the repository at this point in the history
Fixes #2746

Signed-off-by: Tao He <[email protected]>
  • Loading branch information
sighingnow authored May 25, 2023
1 parent e6af69b commit 3cf0798
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 12 deletions.
7 changes: 3 additions & 4 deletions python/graphscope/deploy/kubernetes/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,7 @@ def _waiting_for_services_ready(self):
api_client=self._api_client,
namespace=self._namespace,
name=self._coordinator_name,
pods_watcher=self._coordinator_pods_watcher,
timeout_seconds=self._saved_locals["timeout_seconds"],
):
self._coordinator_pods_watcher.stop()
Expand Down Expand Up @@ -536,13 +537,11 @@ def start(self):
"Coordinator pod start successful with address %s, connecting to service ...",
self._coordinator_endpoint,
)
except Exception as e:
except Exception:
time.sleep(1)
self._dump_coordinator_failed_status()
self.stop()
raise K8sError(
"Error when launching Coordinator on kubernetes cluster"
) from e
raise

def stop(self, wait=False):
"""Stop graphscope instance on kubernetes cluster.
Expand Down
30 changes: 28 additions & 2 deletions python/graphscope/deploy/kubernetes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import logging
import os
import re
import sys
import threading
import time
from queue import Queue
Expand Down Expand Up @@ -99,12 +100,22 @@ def try_to_read_namespace_from_context():
return None


def wait_for_deployment_complete(api_client, namespace, name, timeout_seconds=60):
def wait_for_deployment_complete(
api_client, namespace, name, pods_watcher=None, timeout_seconds=60
):
core_api = kube_client.CoreV1Api(api_client)
app_api = kube_client.AppsV1Api(api_client)
start_time = time.time()
while time.time() - start_time < timeout_seconds:
time.sleep(1)
if pods_watcher is not None:
if pods_watcher.exception is not None:
tp, value, tb = pods_watcher.exception
if value is None:
value = tp()
if value.__traceback__ is not tb:
raise value.with_traceback(tb)
raise value
response = app_api.read_namespaced_deployment_status(
namespace=namespace, name=name
)
Expand Down Expand Up @@ -154,6 +165,11 @@ def __init__(self, api_client, namespace, pod, container=None, queue=None):
self._stream_event_thread = None
self._stream_log_thread = None
self._stopped = True
self._exc_info = None

@property
def exception(self):
return self._exc_info

def _stream_event_impl(self, simple=False):
field_selector = "involvedObject.name=" + self._pod_name
Expand All @@ -170,14 +186,24 @@ def _stream_event_impl(self, simple=False):
except K8SApiException:
pass
else:
error_message = []
for event in events.items:
msg = f"{self._pod_name}: {event.message}"
if msg and msg not in event_messages:
event_messages.append(msg)
self._lines.put(msg)
logger.info(msg, extra={"simple": simple})
if event.reason == "Failed":
raise K8sError(f"Kubernetes event error: {msg}")
error_message.append(f"Kubernetes event error: {msg}")
if error_message:
try:
raise K8sError(
"Error when launching Coordinator on kubernetes cluster: \n"
+ "\n".join(error_message)
)
except: # noqa: E722,B110, pylint: disable=bare-except
self._exc_info = sys.exc_info()
return

def _stream_log_impl(self, simple=False):
log_messages = []
Expand Down
2 changes: 1 addition & 1 deletion python/graphscope/framework/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def read_and_poll(self):
self._sink.flush()
if not self._drop:
self._lines.put(line)
except: # noqa: E722
except: # noqa: E722, pylint: disable=bare-except
pass

self._polling_thread = threading.Thread(target=read_and_poll, args=(self,))
Expand Down
10 changes: 5 additions & 5 deletions python/jupyter/graphscope/setupbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def run(self):

def run(cmd, **kwargs):
"""Echo a command before running it. Defaults to repo as cwd"""
log.info("> " + list2cmdline(cmd))
log.info("> %s", list2cmdline(cmd))
kwargs.setdefault("cwd", HERE)
kwargs.setdefault("shell", os.name == "nt")
if not isinstance(cmd, (list, tuple)) and os.name != "nt":
Expand Down Expand Up @@ -344,10 +344,10 @@ def run(self):

if not which(npm_cmd[0]):
log.error(
"`{0}` unavailable. If you're running this command "
"using sudo, make sure `{0}` is available to sudo".format(
npm_cmd[0]
)
"`%s` unavailable. If you're running this command "
"using sudo, make sure `%s` is available to sudo",
npm_cmd[0],
npm_cmd[0],
)
return

Expand Down

0 comments on commit 3cf0798

Please sign in to comment.