-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathinsideout.nim
131 lines (117 loc) · 3.82 KB
/
insideout.nim
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import std/genasts
import std/macros
import pkg/cps
import insideout/spec
import insideout/pools
import insideout/mailboxes
import insideout/runtimes
import insideout/valgrind
export mailboxes
export runtimes
export valgrind
export pools
proc goto*[T](continuation: var T; where: Mailbox[T]): T {.cpsMagic.} =
## move the current continuation to another compute domain
# we want to be sure that a future destroy finds nothing,
# so we move the continuation and then send /that/ ref.
where.send(move continuation)
result = nil.T
macro createWaitron*(A: typedesc; B: typedesc): untyped =
## The compiler really hates when you do this one thing;
## but they cannot stop you!
let name =
nskProc.genSym:
"waitron " & repr(A) & " To " & repr(B)
name.copyLineInfo(A)
genAstOpt({}, name, A, B):
proc name(box: Mailbox[B]) {.cps: A.} =
## continuously consume and run `B` continuations
mixin coop
debug "starting waitron"
while true:
var c: Continuation
var r = tryRecv(box, c.B)
case r
of Unreadable:
debug "shutting down due to unreadable mailbox"
break
of Interrupt:
debug "caught interrupt"
of Received:
while c.running:
debug "will bounce continuation"
c = bounce c
coop()
# reap the local in the cps environment
reset c
else:
debug r, "; waiting for poppable"
if not box.waitForPoppable():
debug "shutting down due to unavailable mailbox"
break
coop()
debug "exiting waitron"
whelp name
macro createRunner*(A: typedesc; B: typedesc): untyped =
## Create a dispatcher, itself an `A` continuation,
## which runs a single `B` continuation and terminates.
let name =
nskProc.genSym:
"runner " & repr(A) & " To " & repr(B)
name.copyLineInfo(A)
genAstOpt({}, name, A, B):
proc name(box: Mailbox[B]) {.cps: A.} =
## run a single `B` continuation
mixin coop
debug "starting ", B, " runner"
while true:
var c: Continuation
var r = box.tryRecv(B c)
case r
of Received:
while c.running:
debug "will bounce continuation"
c = bounce c
coop()
reset c
break
of Unreadable:
debug "shutting down due to unreadable mailbox"
break
of Interrupt:
debug "caught interrupt"
else:
debug r, "; waiting for poppable"
if not box.waitForPoppable():
debug "shutting down due to unavailable mailbox"
break
debug "wait complete"
coop()
debug "exiting ", B, " runner"
whelp name
const
ContinuationWaiter* = createWaitron(Continuation, Continuation)
ContinuationRunner* = createRunner(Continuation, Continuation)
type
ComeFrom = ref object of Continuation
reply: Mailbox[Continuation]
proc landing(c: sink Continuation): Continuation =
goto(c.mom, (ComeFrom c).reply)
proc comeFrom*[T](c: var T; into: Mailbox[T]): Continuation {.cpsMagic.} =
## move the continuation to the given mailbox; control
## resumes in the current thread when successful
# NOTE: the mom, which is Continuation, defines the reply mailbox type;
# thus, the return value of comeFrom()
var reply = newMailbox[Continuation]()
c.mom = ComeFrom(fn: landing, mom: move c.mom, reply: reply)
discard goto(c, into)
result = recv reply
proc novelThread*[T](c: var T): T {.cpsMagic.} =
## move to a new thread; control resumes
## in the current thread when complete
## NOTE: specifying `T` goes away if cps loses color
const Waiter = createRunner(T, T)
var mailbox = newMailbox[T](1)
var runtime = spawn(Waiter, mailbox)
result = cast[T](comeFrom(c, mailbox))
join runtime