-
Notifications
You must be signed in to change notification settings - Fork 43
/
k8s_helper.py
127 lines (97 loc) · 3.71 KB
/
k8s_helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import time
from typing import Optional
import kubernetes
import yaml
from kubernetes.client.models import (V1Deployment, V1Namespace, V1ObjectMeta,
V1Pod, V1StatefulSet)
from .thread_logger import get_thread_logger
def is_pod_ready(pod: V1Pod) -> bool:
    '''Tell whether a pod reports the Ready condition

    Args:
        pod: Pod object in kubernetes

    Returns:
        True if any condition of the pod has type 'Ready' and status 'True';
        False otherwise, including when the pod has no status or no
        conditions
    '''
    status = pod.status
    conditions = None if status is None else status.conditions
    if conditions is None:
        return False

    # A single matching condition is enough; the rest are irrelevant here
    return any(cond.type == 'Ready' and cond.status == 'True'
               for cond in conditions)
def get_deployment_available_status(deployment: V1Deployment) -> bool:
    '''Tell whether a deployment reports the Available condition

    Args:
        deployment: Deployment object in kubernetes

    Returns:
        True if any condition of the deployment has type 'Available' and
        status 'True'; False otherwise, including when the deployment has no
        status or no conditions
    '''
    status = deployment.status
    if status is None:
        return False

    conditions = status.conditions
    if conditions is None:
        return False

    return any(entry.type == 'Available' and entry.status == 'True'
               for entry in conditions)
def get_stateful_set_available_status(stateful_set: V1StatefulSet) -> bool:
    '''Tell whether a stateful set counts as available

    Availability is derived from the replica counters in the status rather
    than from a condition entry.

    Args:
        stateful_set: stateful set object in kubernetes

    Returns:
        True if the status reports more than zero replicas and
        current_replicas equals replicas; False otherwise, including when
        the stateful set has no status
    '''
    status = stateful_set.status
    if status is None:
        return False

    desired = status.replicas
    return desired > 0 and status.current_replicas == desired
def get_yaml_existing_namespace(fn: str) -> Optional[str]:
    '''Get the namespace declared in a (possibly multi-document) yaml file

    Documents are scanned in file order and the first one that carries a
    `metadata.namespace` entry decides the result.

    Args:
        fn: Yaml file path

    Returns:
        The value stored under `metadata.namespace` in the first document
        that has that key, or None if no document declares a namespace
    '''
    with open(fn, 'r') as operator_yaml:
        # NOTE(review): FullLoader resolves more tags than SafeLoader; if the
        # yaml files can come from untrusted sources, consider switching to
        # yaml.safe_load_all.
        parsed_operator_documents = yaml.load_all(operator_yaml,
                                                  Loader=yaml.FullLoader)
        # The documents are consumed inside the `with` block so the file is
        # still open while they are being parsed.
        for document in parsed_operator_documents:
            # Skip empty documents (parsed as None) and anything that is not
            # a mapping, e.g. a bare scalar or a list.
            if not isinstance(document, dict):
                continue
            metadata = document.get('metadata')
            # A `metadata:` key with no value parses to None; guard against
            # it instead of failing on the membership test.
            if isinstance(metadata, dict) and 'namespace' in metadata:
                return metadata['namespace']
    return None
def create_namespace(apiclient, name: str) -> Optional[V1Namespace]:
    '''Create a namespace with the given name

    Failures are logged through the thread logger instead of being raised.

    Args:
        apiclient: API client handed to kubernetes.client.CoreV1Api
        name: name of the namespace to create

    Returns:
        The object returned by the create_namespace API call, or None if
        the call raised an exception
    '''
    logger = get_thread_logger(with_prefix=False)
    core_v1_api = kubernetes.client.CoreV1Api(apiclient)
    try:
        return core_v1_api.create_namespace(
            V1Namespace(metadata=V1ObjectMeta(name=name)))
    except Exception as e:
        # Deliberately best-effort: the error is reported and the caller is
        # signalled through the None return value.
        logger.error(e)
        return None
def delete_namespace(apiclient, name: str) -> bool:
    '''Delete the namespace with the given name

    Failures are logged through the thread logger instead of being raised.

    Args:
        apiclient: API client handed to kubernetes.client.CoreV1Api
        name: name of the namespace to delete

    Returns:
        True if the delete call completed without raising, False otherwise
    '''
    log = get_thread_logger(with_prefix=False)
    core_api = kubernetes.client.CoreV1Api(apiclient)
    try:
        core_api.delete_namespace(name=name)
    except Exception as err:
        log.error(err)
        succeeded = False
    else:
        succeeded = True
    return succeeded
def delete_operator_pod(apiclient, namespace: str) -> bool:
    '''Delete the operator pod in a namespace, then pause for a fixed time

    The operator pod is looked up with the label selector
    `acto/tag=operator-pod`. If several pods match, only the first one in
    the returned list is deleted.

    Args:
        apiclient: API client handed to kubernetes.client.CoreV1Api
        namespace: namespace in which to look for the operator pod

    Returns:
        False if no pod matches the selector or the delete call returns
        None; True otherwise, after sleeping for 10 seconds
    '''
    logger = get_thread_logger(with_prefix=False)
    core_v1_api = kubernetes.client.CoreV1Api(apiclient)

    operator_pods = core_v1_api.list_namespaced_pod(
        namespace=namespace, watch=False,
        label_selector="acto/tag=operator-pod").items
    if not operator_pods:
        # TODO: refine what should be done if no operator pod can be found
        logger.error('Failed to find operator pod')
        return False

    pod_name = operator_pods[0].metadata.name
    logger.debug(f'Got operator pod: pod name:{pod_name}')

    deleted = core_v1_api.delete_namespaced_pod(name=pod_name,
                                                namespace=namespace)
    if deleted is None:
        return False

    # NOTE(review): presumably gives the cluster time to replace the deleted
    # pod before the caller continues -- confirm the intent of the fixed
    # 10 second wait.
    time.sleep(10)
    return True