Skip to content

Commit

Permalink
integration: fix all linter warnings in test harness. (#898)
Browse files Browse the repository at this point in the history
Signed-off-by: Nashwan Azhari <[email protected]>
  • Loading branch information
Nashwan Azhari authored Dec 20, 2024
1 parent 324cebb commit 71a4869
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 70 deletions.
7 changes: 5 additions & 2 deletions tests/integration/tests/test_util/harness/lxd.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import shlex
import subprocess
from pathlib import Path
from typing import List

from test_util import config
from test_util.harness import Harness, HarnessError, Instance
Expand Down Expand Up @@ -92,6 +91,10 @@ def new_instance(self, network_type: str = "IPv4") -> Instance:
)

if network_type.lower() == "dualstack":
if self.dualstack_profile is None:
raise ValueError(
"LXD `dualstack_profile` must be set for network_type=dualstack"
)
launch_lxd_command.extend(["-p", self.dualstack_profile])

if network_type.lower() == "ipv6":
Expand Down Expand Up @@ -154,7 +157,7 @@ def _configure_profile(self, profile_name: str, profile_config: str):
except subprocess.CalledProcessError as e:
raise HarnessError(f"Failed to configure LXD profile {profile_name}") from e

def _configure_network(self, network_name: str, *network_args: List[str]):
def _configure_network(self, network_name: str, *network_args: str):
LOG.debug("Checking for LXD network %s", network_name)
try:
run(["lxc", "network", "show", network_name])
Expand Down
160 changes: 92 additions & 68 deletions tests/integration/tests/test_util/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,65 @@ def run(command: list, **kwargs) -> subprocess.CompletedProcess:
return subprocess.run(command, **kwargs)


class Retriable:
def __init__(self, retry_kwargs: Optional[Mapping[str, Any]]) -> None:
self._condition = None
self._run = partial(run, capture_output=True)
if retry_kwargs is None:
retry_kwargs = {}
self._retry_kwargs = retry_kwargs

def exec(
self,
command_args: List[str],
**command_kwds,
):
return retry(**self._retry_kwargs)(self._exec)(command_args, **command_kwds)

def _exec(
self,
command_args: List[str],
**command_kwds,
):
"""
Execute a command against a harness or locally with subprocess to be retried.
:param List[str] command_args: The command to be executed, as a str or list of str
:param Mapping[str,str] command_kwds: Additional keyword arguments to be passed to exec
"""

try:
resp = self._run(command_args, **command_kwds)
except subprocess.CalledProcessError as e:
LOG.warning(f" rc={e.returncode}")
LOG.warning(f" stdout={e.stdout.decode()}")
LOG.warning(f" stderr={e.stderr.decode()}")
raise
if self._condition:
assert self._condition(resp), "Failed to meet condition"
return resp

def on(self, instance: harness.Instance) -> "Retriable":
"""
Target the command at some instance.
:param instance Instance: Instance on a test harness.
"""
self._run = partial(instance.exec, capture_output=True)
return self

def until(
self, condition: Callable[[subprocess.CompletedProcess], bool] | None = None
) -> "Retriable":
"""
Test the output of the executed command against an expected response
:param Callable condition: a callable which returns a truth about the command output
"""
self._condition = condition
return self


def stubbornly(
retries: Optional[int] = None,
delay_s: Optional[Union[float, int]] = None,
Expand All @@ -71,78 +130,32 @@ def stubbornly(
def _before_sleep(retry_state: RetryCallState):
attempt = retry_state.attempt_number
tries = f"/{retries}" if retries is not None else ""
LOG.info(
f"Attempt {attempt}{tries} failed. Error: {retry_state.outcome.exception()}"
)
errstr = ""
if retry_state.outcome:
errstr = f" Error: {retry_state.outcome.exception()}"
LOG.info(f"Attempt {attempt}{tries} failed.{errstr}")
LOG.info(f"Retrying in {delay_s} seconds...")

_waits = wait_fixed(delay_s) if delay_s is not None else wait_fixed(0)
_stops = stop_after_attempt(retries) if retries is not None else stop_never
_exceptions = exceptions or (Exception,) # default to retry on all exceptions
waits = wait_fixed(delay_s) if delay_s is not None else wait_fixed(0)
stops = stop_after_attempt(retries) if retries is not None else stop_never
exceptions = exceptions or (Exception,) # default to retry on all exceptions

_retry_args = dict(
wait=_waits,
stop=_stops,
retry=retry_if_exception_type(_exceptions),
retry_args = dict(
wait=waits,
stop=stops,
retry=retry_if_exception_type(exceptions),
before_sleep=_before_sleep,
)
# Permit any tenacity retry overrides from these ^defaults
_retry_args.update(retry_kds)

class Retriable:
def __init__(self) -> None:
self._condition = None
self._run = partial(run, capture_output=True)

@retry(**_retry_args)
def exec(
self,
command_args: List[str],
**command_kwds,
):
"""
Execute a command against a harness or locally with subprocess to be retried.
:param List[str] command_args: The command to be executed, as a str or list of str
:param Mapping[str,str] command_kwds: Additional keyword arguments to be passed to exec
"""

try:
resp = self._run(command_args, **command_kwds)
except subprocess.CalledProcessError as e:
LOG.warning(f" rc={e.returncode}")
LOG.warning(f" stdout={e.stdout.decode()}")
LOG.warning(f" stderr={e.stderr.decode()}")
raise
if self._condition:
assert self._condition(resp), "Failed to meet condition"
return resp

def on(self, instance: harness.Instance) -> "Retriable":
"""
Target the command at some instance.
:param instance Instance: Instance on a test harness.
"""
self._run = partial(instance.exec, capture_output=True)
return self

def until(
self, condition: Callable[[subprocess.CompletedProcess], bool] = None
) -> "Retriable":
"""
Test the output of the executed command against an expected response
:param Callable condition: a callable which returns a truth about the command output
"""
self._condition = condition
return self

return Retriable()
retry_args.update(retry_kds)

return Retriable(retry_args)


def _as_int(value: Optional[str]) -> Optional[int]:
"""Convert a string to an integer."""
if value is None:
return value
try:
return int(value)
except (TypeError, ValueError):
Expand Down Expand Up @@ -249,10 +262,10 @@ def wait_until_k8s_ready(
If the instance name is different from the hostname, the instance name should be passed to the
node_names dictionary, e.g. {"instance_id": "node_name"}.
"""
instance_id_node_name_map = {}
for instance in instances:
if instance.id in node_names:
node_name = node_names[instance.id]
else:
node_name = node_names.get(instance.id)
if node_name is None:
node_name = hostname(instance)

result = (
Expand All @@ -261,8 +274,13 @@ def wait_until_k8s_ready(
.until(lambda p: " Ready" in p.stdout.decode())
.exec(["k8s", "kubectl", "get", "node", node_name, "--no-headers"])
)
LOG.info("Kubelet registered successfully!")
LOG.info("%s", result.stdout.decode())
LOG.info(f"Kubelet registered successfully on instance '{instance.id}'")
LOG.info("%s", result.stdout.decode())
instance_id_node_name_map[instance.id] = node_name
LOG.info(
"Successfully checked Kubelet registered on all harness instances: "
f"{instance_id_node_name_map}"
)


def wait_for_dns(instance: harness.Instance):
Expand Down Expand Up @@ -362,7 +380,7 @@ def get_default_ip(instance: harness.Instance):
return p.stdout.decode().split(" ")[8]


def get_global_unicast_ipv6(instance: harness.Instance, interface="eth0") -> str:
def get_global_unicast_ipv6(instance: harness.Instance, interface="eth0") -> str | None:
# ---
# 2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc fq_codel state UP group default qlen 1000
# link/ether 00:16:3e:0f:4d:1e brd ff:ff:ff:ff:ff:ff
Expand Down Expand Up @@ -488,6 +506,12 @@ def previous_track(snap_version: str) -> str:
stable = r.read().decode().strip()
maj_min = major_minor(stable)

if not maj_min:
raise ValueError(
"Failed to determine previous snap version track for "
f"current version: {snap_version}"
)

flavor_track = {"": "classic", "strict": ""}.get(config.FLAVOR, config.FLAVOR)
track = f"{maj_min[0]}.{maj_min[1]}" + (flavor_track and f"-{flavor_track}")
LOG.info("Previous track for %s is from track: %s", snap_version, track)
Expand Down

0 comments on commit 71a4869

Please sign in to comment.