-
Notifications
You must be signed in to change notification settings - Fork 43
/
testplan.py
278 lines (210 loc) · 8.35 KB
/
testplan.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
import json
import random
from typing import List, Tuple
from acto.schema.base import TreeNode
from acto.utils import get_thread_logger
from .testcase import TestCase
class InputTreeNode(TreeNode):
    '''Node of the input tree used to organize testcases by field path.

    Every node stores its path from the root, its parent, its children keyed
    by field name or index, and the testcases attached to it. Two child keys
    act as templates for keys that have no explicit child: 'ITEM' answers for
    integer keys (and the literal key 'INDEX' on lookup), and
    'additional_properties' answers for string keys.
    '''

    def __init__(self, path: list) -> None:
        '''Create a detached node with no children and no testcases.

        Args:
            path: keys leading from the root to this node; the list is copied
        '''
        self.path = list(path)
        self.parent: TreeNode = None
        self.children = {}
        self.testcases = []

        # Selection flags, cleared again by enable_subtree()
        self.node_disabled = False
        self.subtree_disabled = False

        # for Overspecified fields analysis
        self.used = False

    def add_child(self, key: str, child: 'TreeNode'):
        '''Register child under key, linking its parent and rewriting its path.'''
        self.children[key] = child
        child.set_parent(self)
        child.path = self.path + [key]

    def set_parent(self, parent: 'TreeNode'):
        '''Record parent as the parent of this node.'''
        self.parent = parent

    def set_used(self):
        '''Mark this node and every ancestor as used.'''
        self.used = True
        ancestor = self.parent
        while ancestor is not None:
            ancestor.used = True
            ancestor = ancestor.parent

    def add_testcases(self, testcases: list):
        '''Append testcases to this node.'''
        self.testcases.extend(testcases)

    def add_testcases_by_path(self, testcases: list, path: list):
        '''Attach testcases to the descendant reached by following path.

        A key without an explicit child is materialized on demand: an int key
        receives a copy of the 'ITEM' template and a str key receives a copy
        of the 'additional_properties' template. The copy then becomes a
        regular child of this node.

        Args:
            testcases: testcases to append to the target node
            path: keys relative to this node; an empty path targets this node
                itself. The caller's list is left unmodified.

        Raises:
            KeyError: if a key has neither a child nor a matching template
        '''
        if len(path) == 0:
            self.testcases.extend(testcases)
            return

        # Split rather than pop(0) so the caller's list is not consumed.
        key = path[0]
        rest = list(path[1:])

        if key in self.children:
            self.children[key].add_testcases_by_path(testcases, rest)
            return

        if 'ITEM' in self.children and isinstance(key, int):
            template = self.children['ITEM']
        elif 'additional_properties' in self.children and isinstance(key, str):
            template = self.children['additional_properties']
        else:
            raise KeyError('key: %s, path %s' % (key, rest))

        concrete_node = template.deepcopy(self.path + [key])
        # Recurse before attaching, so a KeyError raised deeper in the path
        # leaves this node's children untouched.
        concrete_node.add_testcases_by_path(testcases, rest)
        # add_child also links the parent; without that link,
        # disable_ancestors() and set_used() called underneath the new node
        # would stop at it and never reach this node or its ancestors.
        self.add_child(key, concrete_node)

    def enable_subtree(self):
        '''Clear both disabled flags on this node and on every descendant.'''
        self.node_disabled = False
        self.subtree_disabled = False

        for child in self.children.values():
            child.enable_subtree()

    def disable_node(self):
        '''This node will not be selected at this step'''
        self.node_disabled = True

    def disable_subtree(self):
        '''This node and its children will not be selected at this step'''
        self.subtree_disabled = True

    def disable_ancestors(self):
        '''Call disable_node() on every ancestor of this node.'''
        ancestor = self.parent
        while ancestor is not None:
            ancestor.disable_node()
            ancestor = ancestor.parent

    def discard_testcase(self, discarded_testcases: dict):
        '''Discard the current testcase, store the discarded testcase into the parameter

        The current testcase is the last one in the list. When the node has
        no testcase left, an empty dict is recorded in its place.

        Args:
            discarded_testcases: dict to store the discarded testcase, keyed
                by the JSON encoding of this node's path
        '''
        encoded_path = json.dumps(self.path)
        discarded_testcase = self.testcases.pop() if self.testcases else {}
        discarded_testcases.setdefault(encoded_path, []).append(discarded_testcase)

    def get_node_by_path(self, path: list) -> 'TreeNode':
        '''Return the node reached by following path from this node.

        Lookup goes through `in` / indexing on the node, so a key without an
        explicit child can resolve to a template child.

        Args:
            path: keys relative to this node. The caller's list is left
                unmodified.

        Returns:
            the node found, or None (after logging an error) when a key
            cannot be resolved
        '''
        logger = get_thread_logger(with_prefix=True)

        if len(path) == 0:
            return self

        key = path[0]
        if key in self:
            # Hand the remainder down as a fresh list instead of pop(0)-ing
            # the caller's list.
            return self[key].get_node_by_path(list(path[1:]))

        logger.error('%s not in children', key)
        logger.error('%s', self.children)
        return None

    def get_children(self) -> dict:
        '''Return the mapping from key to child node.'''
        return self.children

    def get_testcases(self) -> list:
        '''Return the list of testcases attached to this node.'''
        return self.testcases

    def get_next_testcase(self) -> TestCase:
        '''Return the last testcase in the list without removing it.'''
        return self.testcases[-1]

    def get_path(self) -> list:
        '''Return the path of this node.'''
        return self.path

    def traverse_func(self, func: callable):
        '''Apply func to this node, descending into children only if it returns truthy.'''
        if func(self):
            for child in self.children.values():
                child.traverse_func(func)

    def _template_child(self, key):
        '''Return the template child answering for key, or None if there is none.

        Intended for keys that have no explicit entry in self.children.
        '''
        children = self.children
        if 'ITEM' in children and (key == 'INDEX' or isinstance(key, int)):
            return children['ITEM']
        if 'additional_properties' in children and isinstance(key, str):
            return children['additional_properties']
        return None

    def __getitem__(self, key):
        '''Return the child for key, falling back to a template child.

        Raises:
            KeyError: if there is neither a child nor a matching template
        '''
        if key in self.children:
            return self.children[key]

        template = self._template_child(key)
        if template is None:
            raise KeyError('key: %s' % key)
        return template

    def __contains__(self, key) -> bool:
        '''Return True if indexing this node with key would succeed.'''
        if key in self.children:
            return True
        return self._template_child(key) is not None

    def __str__(self) -> str:
        return str(self.path)

    def eligible_fields(self) -> List['TreeNode']:
        '''Returns all eligible fields of this subtree

        a field is eligible if it is not disabled and has at least one testcase

        A node whose subtree is disabled contributes nothing. The template
        children 'ITEM' and 'additional_properties' are never descended into.
        '''
        if self.subtree_disabled:
            return []

        ret = []
        if not self.node_disabled and len(self.testcases) > 0:
            ret.append(self)

        for key, child in self.children.items():
            if key != 'ITEM' and key != 'additional_properties':
                ret.extend(child.eligible_fields())

        return ret

    def deepcopy(self, path: list) -> 'InputTreeNode':
        '''Return a copy of this subtree re-rooted at path.

        Children are copied recursively. The testcase list is a new list
        holding the same testcase objects. The copy starts detached (no
        parent) with its disabled and used flags at their defaults.

        Args:
            path: path assigned to the root of the copy
        '''
        ret = InputTreeNode(path)
        for key, child in self.children.items():
            ret.add_child(key, child.deepcopy(path + [key]))

        ret.testcases = list(self.testcases)

        return ret
class TestPlan():
    '''Test plan backed by a tree of fields rooted at `root`.'''

    def __init__(self, root: TreeNode) -> None:
        '''Keep a reference to the root node of the tree.'''
        self.root = root

    def select_fields(self, num_cases: int = 1) -> List[TreeNode]:
        '''Pick up to num_cases fields at random from the eligible fields.

        After each pick, the chosen field's subtree and its ancestors are
        disabled before the next pick is drawn. All disabled flags in the
        tree are cleared again before returning.

        Args:
            num_cases: maximum number of fields to pick

        Returns:
            the picked fields in pick order; fewer than num_cases when the
            tree runs out of eligible fields
        '''
        logger = get_thread_logger(with_prefix=True)
        picked = []

        while len(picked) < num_cases:
            candidates = self.root.eligible_fields()
            if len(candidates) == 0:
                break

            chosen = random.choice(candidates)
            chosen.disable_subtree()
            chosen.disable_ancestors()
            picked.append(chosen)
            logger.info('TestPlan: selected %s', chosen.get_path())

        # Reset the per-call selection flags on the whole tree.
        self.root.enable_subtree()
        return picked

    def add_testcases_by_path(self, testcases: list, path: list):
        '''Forward testcases and path to the root node's add_testcases_by_path.'''
        self.root.add_testcases_by_path(testcases, path)

    def __len__(self):
        '''Total number of testcases held by the currently eligible fields.'''
        return sum(
            len(field.get_testcases())
            for field in self.root.eligible_fields()
        )
class TestGroup:
    '''Ordered list of (path, testcase) pairs, consumed from the front.'''

    def __init__(self, tests: List[Tuple[str, TestCase]]):
        '''Keep a reference to tests; the list itself is consumed in place.'''
        self.tests = tests

    def discard_testcase(self, discarded_testcases: dict):
        '''Discard the current testcase, store the discarded testcase into the parameter

        The current testcase is the first pair of the group; it is removed
        from the group.

        Args:
            discarded_testcases: dict to store the discarded testcase, keyed
                by the JSON encoding of the pair's path
        '''
        path, testcase = self.tests.pop(0)
        bucket = discarded_testcases.setdefault(json.dumps(path), [])
        bucket.append(testcase)

    def get_next_testcase(self) -> Tuple[str, TestCase]:
        '''Return the first (path, testcase) pair without removing it.'''
        return self.tests[0]

    def finish_testcase(self):
        '''Remove the first (path, testcase) pair from the group.'''
        self.tests.pop(0)

    def __len__(self):
        '''Number of pairs left in the group.'''
        return len(self.tests)
class DeterministicTestPlan(TestPlan):
    '''Test plan made of test groups that are handed out front to back.

    NOTE: the constructor does not call TestPlan.__init__, so `self.root` is
    never set here; the inherited methods that read it are not usable.
    '''

    def __init__(self):
        '''Start with no groups.'''
        self.groups = []

    def next_group(self):
        '''Return the group at the front of the plan, if it still has tests.

        Returns:
            the first group when it is non-empty. None when the plan has no
            groups, and also when the first group is empty: in that case the
            empty group is dropped and None is returned even if further
            groups remain.
        '''
        if not self.groups:
            return None

        head = self.groups[0]
        if len(head) == 0:
            # Drop the exhausted group; the following group is only handed
            # out on a later call.
            self.groups.pop(0)
            return None

        return head

    def add_testcase_groups(self, groups: List[TestGroup]):
        '''Append every group in groups to the end of the plan.'''
        self.groups.extend(groups)

    def add_testcase_group(self, groups: TestGroup):
        '''Append a single group to the end of the plan.'''
        self.groups.append(groups)

    def __len__(self):
        '''Total number of tests across all groups.'''
        return sum(len(group) for group in self.groups)