-
Notifications
You must be signed in to change notification settings - Fork 98
/
Copy paththreadedgenerator.py
111 lines (94 loc) · 2.71 KB
/
threadedgenerator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
"""
Code from https://gist.github.com/everilae/9697228
I added __next__
QHduan
"""
# A simple generator wrapper, not sure if it's good for anything at all.
# With basic python threading
from threading import Thread
from queue import Queue
# ... or use multiprocessing versions
# WARNING: use sentinel based on value, not identity
# from multiprocessing import Process, Queue as MpQueue
class ThreadedGenerator(object):
"""
Generator that runs on a separate thread, returning values to calling
thread. Care must be taken that the iterator does not mutate any shared
variables referenced in the calling thread.
"""
def __init__(self, iterator,
sentinel=object(),
queue_maxsize=0,
daemon=False):
self._iterator = iterator
self._sentinel = sentinel
self._queue = Queue(maxsize=queue_maxsize)
self._thread = Thread(
name=repr(iterator),
target=self._run
)
self._thread.daemon = daemon
self._started = False
def __repr__(self):
return 'ThreadedGenerator({!r})'.format(self._iterator)
def _run(self):
try:
for value in self._iterator:
if not self._started:
return
self._queue.put(value)
finally:
self._queue.put(self._sentinel)
def close(self):
self._started = False
try:
while True:
self._queue.get(timeout=0)
except KeyboardInterrupt as e:
raise e
except: # pylint: disable=bare-except
pass
# self._thread.join()
def __iter__(self):
self._started = True
self._thread.start()
for value in iter(self._queue.get, self._sentinel):
yield value
self._thread.join()
self._started = False
def __next__(self):
if not self._started:
self._started = True
self._thread.start()
value = self._queue.get(timeout=30)
if value == self._sentinel:
raise StopIteration()
return value
def test():
"""测试"""
def gene():
i = 0
while True:
yield i
i += 1
t = gene()
tt = ThreadedGenerator(t)
for _ in range(10):
print(next(tt))
tt.close()
# for i in range(10):
# print(next(tt))
# for t in ThreadedGenerator(range(10)):
# print(t)
# print('-' * 10)
#
# t = ThreadedGenerator(range(10))
# # def gene():
# # for t in range(10):
# # yield t
# # t = gene()
# for _ in range(10):
# print(next(t))
# print('-' * 10)
if __name__ == '__main__':
test()