-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathapplication.py
99 lines (84 loc) · 3.49 KB
/
application.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import logging
from abc import ABC, abstractmethod
import model_repo
import orchestrator_repo
from metrics import ApplicationMetrics, ApplicationSLO
from simulator import clock, schedule_event, cancel_event, reschedule_event
class Application:
    """
    An Application is the endpoint that a Request targets.
    Applications can have any number of Instances which all serve the same model.
    Requests are scheduled to Instances by the Scheduler.
    Application Instances can be autoscaled by the Allocator.
    """

    def __init__(self,
                 application_id,
                 model_architecture,
                 model_size,
                 cluster,
                 router,
                 arbiter,
                 overheads,
                 scheduler=None,
                 allocator=None,
                 instances=None):
        """
        Store the application's model, orchestration handles and metrics.

        Args:
            application_id: identifier stored on the application.
            model_architecture: model architecture object served by the app.
            model_size: model size object served by the app.
            cluster: cluster handle stored as-is.
            router: router handle stored as-is.
            arbiter: arbiter handle stored as-is.
            overheads: overheads configuration stored as-is.
            scheduler: optional scheduler; may be attached later
                (``from_config`` assigns it after construction).
            allocator: optional allocator; may be attached later.
            instances: optional iterable of initial instances. It is copied,
                so later ``add_instance`` calls do not mutate the caller's
                collection. These initial instances are NOT registered with
                the scheduler by this constructor.
        """
        self.application_id = application_id

        # hardware
        self.processors = []

        # model
        self.model_architecture = model_architecture
        self.model_size = model_size

        # orchestration
        # Always define `self.instances`; previously it was only set when
        # `instances` was None, so passing a list left the attribute missing.
        self.instances = [] if instances is None else list(instances)
        self.cluster = cluster
        self.scheduler = scheduler
        self.allocator = allocator
        self.router = router
        self.arbiter = arbiter

        # overheads
        self.overheads = overheads

        # metrics
        self.metrics = ApplicationMetrics()
        self.slo = ApplicationSLO()

    def add_instance(self, instance):
        """
        Application-specific method to add an instance to the application.

        The instance is appended to ``self.instances`` and then handed to
        the scheduler via ``scheduler.add_instance``.
        """
        self.instances.append(instance)
        self.scheduler.add_instance(instance)

    def get_results(self):
        """
        Collect results from the allocator and the scheduler.

        Also asks the scheduler to save all request metrics before returning.

        Returns:
            A tuple ``(allocator_results, scheduler_results)``.
        """
        allocator_results = self.allocator.get_results()
        scheduler_results = self.scheduler.get_results()
        self.scheduler.save_all_request_metrics()
        return allocator_results, scheduler_results

    @classmethod
    def from_config(cls, *args, cluster, arbiter, router, **kwargs):
        """
        Build an Application, its allocator and its scheduler from a config.

        Args:
            *args: the first positional argument is the application config
                object; its ``application_id``, ``model_architecture``,
                ``model_size``, ``allocator``, ``scheduler``, ``overheads``
                and ``debug`` attributes are read. Further positional
                arguments are ignored.
            cluster: cluster handle passed to the constructor.
            arbiter: arbiter handle passed to the constructor and allocator.
            router: router handle passed to the constructor and scheduler.
            **kwargs: accepted but unused.

        Returns:
            The constructed application with ``scheduler`` and ``allocator``
            attached.
        """
        # parse args
        application_cfg = args[0]

        # get model
        model_architecture_name = application_cfg.model_architecture
        model_size_name = application_cfg.model_size
        model_architecture = model_repo.get_model_architecture(model_architecture_name)
        model_size = model_repo.get_model_size(model_size_name)

        # get orchestrators
        allocator_name = application_cfg.allocator
        scheduler_name = application_cfg.scheduler

        # The orchestrators need a reference to the application, so the
        # application is created first and the orchestrators attached after.
        application = cls(application_id=application_cfg.application_id,
                          model_architecture=model_architecture,
                          model_size=model_size,
                          cluster=cluster,
                          router=router,
                          arbiter=arbiter,
                          overheads=application_cfg.overheads)
        allocator = orchestrator_repo.get_allocator(allocator_name,
                                                    arbiter=arbiter,
                                                    application=application,
                                                    debug=application_cfg.debug)
        scheduler = orchestrator_repo.get_scheduler(scheduler_name,
                                                    router=router,
                                                    application=application,
                                                    debug=application_cfg.debug)
        application.scheduler = scheduler
        application.allocator = allocator
        return application