forked from PanDAWMS/pilot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathArgoJob.py
152 lines (128 loc) · 6 KB
/
ArgoJob.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import sys
from pUtil import serialize, deserialize, convert_unicode_string, tolog
from BalsamJob import BalsamJob
class ArgoJob:
    """Description of an ARGO job together with the BalsamJob sub-jobs it owns.

    The object is exchanged as text: ``serialize`` turns the instance
    attributes into a string and ``deserialize`` rebuilds them from such a
    string. The log messages in this class describe that text as JSON; the
    actual encoding is whatever ``pUtil.serialize``/``pUtil.deserialize``
    implement.
    """

    # Attributes that deserialize() has always passed through
    # convert_unicode_string(), in the original conversion order.
    _STRING_FIELDS = (
        'preprocess',
        'preprocess_args',
        'postprocess',
        'postprocess_args',
        'input_url',
        'output_url',
        'username',
        'group_identifier',
        'job_status_routing_key',
    )

    def __init__(self,
                 preprocess = None,
                 preprocess_args = None,
                 postprocess = None,
                 postprocess_args = None,
                 input_url = None,
                 output_url = None,
                 username = None,
                 email_address = None,
                 group_identifier = None,
                 job_status_routing_key = None,
                ):
        """Store the job settings and start with an empty sub-job list."""
        self.preprocess = preprocess
        self.preprocess_args = preprocess_args
        self.postprocess = postprocess
        self.postprocess_args = postprocess_args
        self.input_url = input_url
        self.output_url = output_url
        self.username = username
        self.email_address = email_address
        self.group_identifier = group_identifier
        self.job_status_routing_key = job_status_routing_key
        # BalsamJob instances; populated through add_job() or deserialize()
        self.jobs = []

    def add_job(self, job):
        """Append ``job`` to the sub-job list.

        Raises:
            Exception: with message ' JobTypeError ' when ``job`` is not a
                BalsamJob instance (the rejection is also logged).
        """
        if not isinstance(job, BalsamJob):
            tolog(' Only jobs of the BalsamJob class can be added to this list. ')
            raise Exception(' JobTypeError ')
        self.jobs.append(job)

    def get_jobs_dictionary_list(self):
        """Return the ``__dict__`` of every sub-job, in list order.

        The dictionaries are the jobs' own attribute dictionaries, not copies.
        """
        return [job.__dict__ for job in self.jobs]

    def get_job_list_text(self):
        """Return the serialized form of the sub-job dictionary list."""
        # inside a method the bare name refers to pUtil.serialize, not to
        # the ArgoJob.serialize method
        return serialize(self.get_jobs_dictionary_list())

    def serialize(self):
        """Return this job, including its sub-jobs, as serialized text.

        A shallow copy of the attribute dictionary is serialized with the
        ``jobs`` entry replaced by the sub-jobs' attribute dictionaries, so
        ``self`` is left unmodified.

        Raises:
            Exception: whatever ``pUtil.serialize`` raises is logged and
                re-raised unchanged.
        """
        payload = self.__dict__.copy()
        payload['jobs'] = self.get_jobs_dictionary_list()
        try:
            return serialize(payload)
        except Exception:
            tolog(' received exception while converting ArgoJob to json string: ' + str(sys.exc_info()[1]))
            raise

    def deserialize(self, text_string):
        """Replace this object's attributes with those decoded from ``text_string``.

        Raises:
            Exception: a failure while decoding or installing the decoded
                dictionary is logged and re-raised unchanged.
            AttributeError: if the decoded dictionary lacks one of the
                fields in ``_STRING_FIELDS`` or the ``jobs`` entry.
        """
        # fill self with the decoded dictionary
        try:
            self.__dict__ = deserialize(text_string)
        except Exception:
            tolog(' received exception while converting json string to ArgoJob: ' + str(sys.exc_info()[1]))
            raise

        # convert unicode strings to strings
        for field_name in self._STRING_FIELDS:
            setattr(self, field_name, convert_unicode_string(getattr(self, field_name)))

        # email_address used to be skipped by the conversion above, unlike
        # every other string field. Convert it only when the payload carries
        # it, so messages without that key are still accepted as before.
        if 'email_address' in self.__dict__:
            self.email_address = convert_unicode_string(self.email_address)

        # the decoded 'jobs' entry holds plain dictionaries; rebuild a
        # BalsamJob around each one
        rebuilt_jobs = []
        for job_dictionary in self.jobs:
            balsam_job = BalsamJob()
            balsam_job.__dict__ = job_dictionary
            rebuilt_jobs.append(balsam_job)
        self.jobs = rebuilt_jobs
class ArgoJobStatus:
    """Status record for an ARGO job: a state name, a job id and a message.

    The class attributes below are the state names; each constant's value
    is its own name as a string.
    """

    CREATED = 'CREATED'

    STAGED_IN = 'STAGED_IN'
    STAGEIN_FAILED = 'STAGEIN_FAILED'

    PREPROCESSED = 'PREPROCESSED'
    PREPROCESSING_FAILED = 'PREPROCESSING_FAILED'

    SUBJOB_PREPARED = 'SUBJOB_PREPARED'
    SUBJOB_PREP_FAILED = 'SUBJOB_PREP_FAILED'

    SUBJOB_STAGED_OUT = 'SUBJOB_STAGED_OUT'
    SUBJOB_STAGEOUT_FAILED = 'SUBJOB_STAGEOUT_FAILED'

    SUBJOB_SUBMITTED = 'SUBJOB_SUBMITTED'
    SUBJOB_SUBMIT_FAILED = 'SUBJOB_SUBMIT_FAILED'

    SUBJOB_QUEUED = 'SUBJOB_QUEUED'

    SUBJOB_RUNNING = 'SUBJOB_RUNNING'

    SUBJOB_RUN_FINISHED = 'SUBJOB_RUN_FINISHED'
    SUBJOB_RUN_FAILED = 'SUBJOB_RUN_FAILED'

    SUBJOB_STAGED_IN = 'SUBJOB_STAGED_IN'
    SUBJOB_STAGEIN_FAILED = 'SUBJOB_STAGEIN_FAILED'

    SUBJOB_COMPLETED = 'SUBJOB_COMPLETED'
    SUBJOB_COMPLETE_FAILED = 'SUBJOB_COMPLETE_FAILED'

    POSTPROCESSED = 'POSTPROCESSED'
    POSTPROCESSING_FAILED = 'POSTPROCESSING_FAILED'

    STAGED_OUT = 'STAGED_OUT'
    STAGEOUT_FAILED = 'STAGEOUT_FAILED'

    HISTORY = 'HISTORY'  # final state
    FAILED = 'FAILED'

    def __init__(self):
        """Create a status with state, job_id and message all unset (None)."""
        self.state = None
        self.job_id = None
        self.message = None

    def get_serialized_message(self):
        """Return the instance attributes serialized with ``pUtil.serialize``."""
        return serialize(self.__dict__)

    @staticmethod
    def get_from_message(message):
        """Build an ArgoJobStatus whose attributes are decoded from ``message``.

        The decoded object replaces the new instance's ``__dict__`` wholesale,
        so the resulting attributes are exactly what the message contained.
        """
        status = ArgoJobStatus()
        status.__dict__ = deserialize(message)
        return status

    def is_failed(self):
        """Return True when ``state`` is one of the step-specific *_FAILED states.

        NOTE(review): the generic FAILED state is not part of this check, so
        a status whose state is FAILED reports False here - confirm with the
        callers whether that is intended.
        """
        return self.state in (
            self.STAGEIN_FAILED,
            self.PREPROCESSING_FAILED,
            self.SUBJOB_PREP_FAILED,
            self.SUBJOB_STAGEOUT_FAILED,
            self.SUBJOB_SUBMIT_FAILED,
            self.SUBJOB_RUN_FAILED,
            self.SUBJOB_STAGEIN_FAILED,
            self.SUBJOB_COMPLETE_FAILED,
            self.POSTPROCESSING_FAILED,
            self.STAGEOUT_FAILED,
        )