-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathexecutor.py
173 lines (151 loc) · 5.45 KB
/
executor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
import logging
from enum import IntEnum
from flow import Flow
from node import NodeState
from simulator import clock, schedule_event, cancel_event, reschedule_event
from task import Task
class ExecutorType(IntEnum):
LocalExecutor = 0
CentralExecutor = 1
class Executor():
"""
Executor orchestrates Request execution on Instances and Interconnects.
Executors can themselves run anywhere, e.g., on the Scheduler, Instance, etc.,
with different amounts of overheads.
They could execute multiple Tasks/Flows of the Request in parallel.
NOTE: We don't ensure predeccessors of node are completed before submit.
Implicitly, we assume that the Request is a tree instead of a DAG.
Could be changed by waiting on Node predecessors.
"""
def __init__(self,
request,
scheduler,
overheads):
self.request = request
self.scheduler = scheduler
self.overheads = overheads
self.submitted = []
# to cancel any events
self.completion_events = {}
def successors(self, node):
"""
Returns the successors of the specified node.
"""
nodes = self.request.successors(node)
return nodes
def check_predecessors(self, node):
"""
Checks if all predecessors of the specified node are completed.
"""
for predecessor in self.request.predecessors(node):
if predecessor.state != NodeState.COMPLETED:
return False
return True
def submit(self, node=None):
"""
Submits the specified node for execution.
"""
if isinstance(node, Task):
self.submit_task(node)
elif isinstance(node, Flow):
self.submit_flow(node)
else:
raise ValueError(f"Unknown node type: {type(node)}")
def submit_chain(self, chain):
"""
Submits the specified chain of Nodes for execution.
"""
for node in chain:
self.submit(node)
def submit_task(self, task, instance=None):
"""
Submits the specified task for execution.
If instance is not specified, uses the task's instance.
"""
if instance is None:
instance = task.instance
task.executor = self
self.submitted.append(task)
schedule_event(self.overheads.submit_task,
lambda instance=instance,task=task: \
instance.task_arrival(task))
# if this is the first task in the chain, submit the chain
self.submit_chain(task.chain)
def finish_task(self, task, instance):
"""
Finishes the specified task.
"""
self.submitted.remove(task)
successor_nodes = list(self.successors(task))
# NOTE: assumes a single leaf node
if len(successor_nodes) == 0:
self.finish_request()
return
# submit nodes for whom all predecessors have completed
# and are not already submitted
for node in successor_nodes:
if node.state == NodeState.NONE and self.check_predecessors(node):
self.submit(node)
def submit_flow(self, flow, link=None):
"""
Submits the specified flow for execution.
If link is not specified, uses the flow's link.
"""
if link is None:
link = flow.link
flow.executor = self
self.submitted.append(flow)
schedule_event(self.overheads.submit_flow,
lambda link=link,flow=flow: link.flow_arrival(flow))
# if this is the first flow in the chain, submit the chain
self.submit_chain(flow.chain)
def finish_flow(self, flow, link):
"""
Finishes the specified flow.
"""
self.submitted.remove(flow)
successor_nodes = list(self.successors(flow))
# NOTE: assumes a single leaf node
if len(successor_nodes) == 0:
self.finish_request()
return
# submit nodes for whom all predecessors have completed
# and are not already submitted
for node in successor_nodes:
if node.state == NodeState.NONE and self.check_predecessors(node):
self.submit(node)
def finish_request(self):
"""
Finishes executing the entire Request.
"""
def fin_req():
self.scheduler.request_completion(self.request)
schedule_event(self.overheads.finish_request, fin_req)
def run(self):
"""
Runs the Request by submitting the root node.
"""
self.submit(self.request.root_node)
@classmethod
def create(cls, executor_type, request, scheduler, overheads):
"""
Creates an Executor instance based on the specified type.
"""
if executor_type == ExecutorType.CentralExecutor:
return CentralExecutor(request, scheduler, overheads)
if executor_type == ExecutorType.LocalExecutor:
return LocalExecutor(request, scheduler, overheads)
raise ValueError(f"Unsupported executor type: {executor_type}")
class CentralExecutor(Executor):
"""
CentralExecutor coordinates with Scheduler for each Task.
Logically, it runs within Scheduler itself.
TODO: appropriate overheads
"""
pass
class LocalExecutor(Executor):
"""
LocalExecutor logically runs on Servers, alongside Instances.
TODO: appropriate overheads
"""
pass