-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathsimulator.py
185 lines (153 loc) · 5.26 KB
/
simulator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
import heapq
import logging
from collections import defaultdict
import utils
# global simulator that drives the simulation
# bad practice, but it works for now
sim = None
class Event:
"""
Events are scheduled actions in the simulator.
"""
def __init__(self, time, action):
self.time = time
self.action = action
def __str__(self):
return f"Event with time {self.time} and action {self.action}"
def __lt__(self, other):
return self.time < other.time
class Simulator:
"""
A discrete event simulator that schedules and runs Events.
"""
def __init__(self, end_time):
global sim
sim = self
self.time = 0
self.end_time = end_time
self.events = []
self.deleted_events = []
logging.info("Simulator initialized")
# logger for simulator events
self.logger = utils.file_logger("simulator")
self.logger.info("time,event")
def schedule(self, delay, action):
"""
Schedule an event by specifying delay and an action function.
"""
# run immediately if delay is 0
if delay == 0:
action()
return None
event = Event(self.time + delay, action)
heapq.heappush(self.events, event)
return event
def cancel(self, event):
"""
Cancel an event.
"""
self.deleted_events.append(event)
def reschedule(self, event, delay):
"""
Reschedule an event by cancelling and scheduling it again.
"""
self.cancel(event)
return self.schedule(delay, event.action)
def run(self):
"""
Run the simulation until the end time.
"""
while self.events and self.time < self.end_time:
event = heapq.heappop(self.events)
if event in self.deleted_events:
self.deleted_events.remove(event)
continue
self.time = event.time
event.action()
self.logger.debug(f"{event.time},{event.action}")
class TraceSimulator(Simulator):
"""
A discrete event simulator that processes Request arrivals from a Trace.
"""
def __init__(self,
trace,
cluster,
applications,
router,
arbiter,
end_time):
super().__init__(end_time)
self.trace = trace
self.cluster = cluster
self.applications = applications
self.router = router
self.arbiter = arbiter
logging.info("TraceSimulator initialized")
self.load_trace()
def load_trace(self):
"""
Load requests from the trace as arrival events.
"""
for request in self.trace.requests:
self.schedule(request.arrival_timestamp,
lambda request=request: self.router.request_arrival(request))
def run(self):
# start simulation by scheduling a cluster run
self.schedule(0, self.cluster.run)
self.schedule(0, self.router.run)
self.schedule(0, self.arbiter.run)
# run simulation
super().run()
self.logger.info(f"{self.time},end")
logging.info(f"TraceSimulator completed at {self.time}")
self.save_results()
def save_results(self, detailed=True):
"""
Save results at the end of the simulation.
"""
self.router.save_results()
sched_results = {}
alloc_results = {}
for application_id, application in self.applications.items():
allocator_results, scheduler_results = application.get_results()
alloc_results[application_id] = allocator_results
sched_results[application_id] = scheduler_results
# summary sched results
summary_results = defaultdict(list)
for application_id, results_dict in sched_results.items():
summary_results["application_id"].append(application_id)
for key, values in results_dict.items():
summary = utils.get_statistics(values)
# merge summary into summary_results
for metric, value in summary.items():
summary_results[f"{key}_{metric}"].append(value)
# save summary results
utils.save_dict_as_csv(summary_results, "summary.csv")
if detailed:
# create a dataframe of all requests, save as csv
for application_id, result in sched_results.items():
utils.save_dict_as_csv(result, f"detailed/{application_id}.csv")
for application_id, result in alloc_results.items():
utils.save_dict_as_csv(result, f"detailed/{application_id}_alloc.csv")
# Convenience functions for simulator object
def clock():
"""
Return the current time of the simulator.
"""
return sim.time
def schedule_event(*args):
"""
Schedule an event in the simulator at desired delay.
"""
return sim.schedule(*args)
def cancel_event(*args):
"""
Cancel existing event in the simulator.
"""
return sim.cancel(*args)
def reschedule_event(*args):
"""
Reschedule existing event in the simulator.
Equivalent to cancelling and scheduling a new event.
"""
return sim.reschedule(*args)