-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathtest_parmap.py
189 lines (156 loc) · 6.66 KB
/
test_parmap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
import multiprocessing
import time
import unittest
import warnings
import parmap
# The fact that parallelization is happening is controlled via reasonable
# guesses of the parmap overhead and the CPU speeds
# Overhead of map_async, should be less than TIME_PER_TEST
TIME_OVERHEAD = 0.4
# The time each call takes to return in the _wait test
TIME_PER_TEST = 0.8
def _wait(x):
"""Dummy function to not do anything"""
time.sleep(TIME_PER_TEST)
return x
def _identity(*x):
return x
_DEFAULT_B = 1
def _fun_with_keywords(x, a=0, b=_DEFAULT_B):
return x + a + b
class ProgrBar:
def __init__(self, **kwargs):
self.expected = "S" + "T" * kwargs["total"] + "E"
self.content = ""
def __enter__(self):
self.content += "S"
return self
def __exit__(self, exception_type, exception_value, traceback):
self.content += "E"
if self.content != self.expected:
raise ValueError(f"Expected {self.expected}, found {self.content}")
def update(self, n=1):
self.content += "T" * n
class TestParmap(unittest.TestCase):
def test_map_without_parallel_timings(self):
NUM_TASKS = 6
items = range(NUM_TASKS)
mytime = time.time()
pfalse = parmap.map(_wait, items, pm_parallel=False)
elapsed = time.time() - mytime
self.assertTrue(elapsed >= TIME_PER_TEST * NUM_TASKS)
self.assertEqual(pfalse, list(range(NUM_TASKS)))
def test_map_with_parallel_timings(self):
NUM_TASKS = 6
items = range(NUM_TASKS)
mytime = time.time()
ptrue = parmap.map(_wait, items, pm_processes=NUM_TASKS, pm_parallel=True)
elapsed = time.time() - mytime
self.assertTrue(elapsed >= TIME_PER_TEST)
self.assertTrue(elapsed < TIME_PER_TEST * (NUM_TASKS - 1))
self.assertEqual(ptrue, list(range(NUM_TASKS)))
def test_map_kwargs(self):
items = range(2)
pfalse = parmap.map(_fun_with_keywords, items, pm_parallel=False, a=10)
ptrue = parmap.map(_fun_with_keywords, items, pm_parallel=True, a=10)
noparmap = [x + 10 + _DEFAULT_B for x in items]
self.assertEqual(pfalse, ptrue)
self.assertEqual(pfalse, noparmap)
def test_map_pbar_true(self):
items = range(4)
pfalse = parmap.map(_wait, items, pm_pbar=False)
ptrue = parmap.map(_wait, items, pm_pbar=True)
noparmap = list(map(_wait, items))
self.assertEqual(pfalse, ptrue)
self.assertEqual(pfalse, noparmap)
def test_map_pbar_dict(self):
items = range(4)
pfalse = parmap.map(_wait, items, pm_pbar=False)
ptrue = parmap.map(_wait, items, pm_pbar={"desc": "prefix"})
noparmap = list(map(_wait, items))
self.assertEqual(pfalse, ptrue)
self.assertEqual(pfalse, noparmap)
def test_map_pbar_callable(self):
items = range(4)
pfalse = parmap.map(_wait, items, pm_pbar=False)
ptrue = parmap.map(_wait, items, pm_pbar=ProgrBar)
noparmap = list(map(_wait, items))
self.assertEqual(pfalse, ptrue)
self.assertEqual(pfalse, noparmap)
def test_map_async_started_simultaneously_timings(self):
items = list(range(4))
mytime0 = time.time()
# These are started in parallel:
with parmap.map_async(_wait, items, pm_processes=4) as compute1:
elapsed1 = time.time() - mytime0
mytime = time.time()
with parmap.map_async(_wait, items, pm_processes=4) as compute2:
elapsed2 = time.time() - mytime
mytime = time.time()
result1 = compute1.get()
elapsed3 = time.time() - mytime0
mytime = time.time()
result2 = compute2.get()
elapsed4 = time.time() - mytime0
self.assertTrue(elapsed1 < TIME_OVERHEAD)
self.assertTrue(elapsed2 < TIME_OVERHEAD)
self.assertTrue(elapsed3 < 4 * TIME_PER_TEST + 2 * TIME_OVERHEAD)
self.assertTrue(elapsed4 < 4 * TIME_PER_TEST + 2 * TIME_OVERHEAD)
self.assertEqual(result1, result2)
self.assertEqual(result1, items)
def test_map_async_noparallel_started_simultaneously_timings(self):
NTASKS = 4
items = list(range(NTASKS))
mytime = time.time()
# These are started in parallel:
with parmap.map_async(_wait, items, pm_parallel=False) as compute1:
elapsed1 = time.time() - mytime
mytime = time.time()
with parmap.map_async(_wait, items, pm_parallel=False) as compute2:
elapsed2 = time.time() - mytime
mytime = time.time()
result1 = compute1.get()
result2 = compute2.get()
finished = time.time() - mytime
self.assertTrue(elapsed1 >= NTASKS * TIME_PER_TEST)
self.assertTrue(elapsed2 >= NTASKS * TIME_PER_TEST)
self.assertTrue(finished <= 2 * TIME_OVERHEAD)
self.assertEqual(result1, result2)
self.assertEqual(result1, items)
def test_map_async(self):
NUM_TASKS = 6
NCPU = 6
items = range(NUM_TASKS)
mytime = time.time()
pfalse = parmap.map_async(_wait, items, pm_parallel=False).get()
elapsed_false = time.time() - mytime
mytime = time.time()
with parmap.map_async(_wait, items, pm_processes=NCPU) as ptrue:
elap_true_async = time.time() - mytime
mytime = time.time()
ptrue_result = ptrue.get()
elap_true_get = time.time() - mytime
noparmap = list(items)
self.assertEqual(pfalse, ptrue_result)
self.assertEqual(pfalse, noparmap)
self.assertTrue(elapsed_false > TIME_PER_TEST * (NUM_TASKS - 1))
self.assertTrue(elap_true_async < TIME_OVERHEAD)
self.assertTrue(elap_true_get < TIME_PER_TEST * (NUM_TASKS - 1))
def test_starmap(self):
items = [(1, 2), (3, 4), (5, 6)]
pfalse = parmap.starmap(_identity, items, 5, 6, pm_parallel=False)
ptrue = parmap.starmap(_identity, items, 5, 6, pm_parallel=True)
self.assertEqual(pfalse, ptrue)
def test_starmap_async(self):
items = [(1, 2), (3, 4), (5, 6)]
pfalse = parmap.starmap_async(_identity, items, 5, 6, pm_parallel=False)
ptrue = parmap.starmap_async(_identity, items, 5, 6, pm_parallel=True)
self.assertEqual(pfalse.get(), ptrue.get())
def test_warn_wrong_argument_map(self):
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
parmap.map(range, [1, 2], pm_processes=-3)
self.assertTrue(len(w) > 0)
if __name__ == "__main__":
multiprocessing.freeze_support()
unittest.main()