-
Notifications
You must be signed in to change notification settings - Fork 0
/
SubmitOptions.py
318 lines (252 loc) · 11.8 KB
/
SubmitOptions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
import copy
import re
import heapq
from collections import OrderedDict
from datetime import timedelta
import SubmitCommon as sc
from SubmitArgpacks import SubmitArgpacks
from HpcArgpacks import HpcArgpacks
# Matches one PBS resource item of the form "<res>=<amount>".
#   start  : optional leading "-l" flag (with surrounding spaces) that introduces a resource list
#   res    : the resource name (word characters only)
#   amount : everything after "=" up to, but not including, the next ":" separator,
#            the next " -l " flag, or the end of the string (lazy match + lookahead)
PBS_RESOURCE_REGEX_STR = r"(?P<start>[ ]*-l[ ]+)?(?P<res>\w+)=(?P<amount>.*?)(?=:|[ ]*-l[ ]|$)"
PBS_RESOURCE_REGEX = re.compile( PBS_RESOURCE_REGEX_STR )
# Matches a whole-string time limit written as "hh:mm:ss"; each field is one or more digits
PBS_TIMELIMIT_REGEX_STR = r"^(?P<hh>\d+):(?P<mm>\d+):(?P<ss>\d+)$"
PBS_TIMELIMIT_REGEX = re.compile( PBS_TIMELIMIT_REGEX_STR )
# str.format template with three positional fields, each zero-padded to a minimum width of 2
PBS_TIMELIMIT_FORMAT_STR = "{:02}:{:02}:{:02}"
class SubmitOptions :
    """Container for the options used to build a job-submission command.

    Options are read from a dictionary (see parse()), can be layered on top of
    one another (see update()), specialised per host (see
    selectHostSpecificSubmitOptions()) and finally rendered into a command
    line (see format()).

    Any dictionary key that parse() does not recognise is treated as the name
    of a host and its value as a nested options dictionary for that host.
    """

    def __init__( self, optDict=None, isHostSpecific=False, lockSubmitType=False, origin=None, print=print ) :
        """Store defaults and immediately parse optDict.

        optDict        -- options dictionary; it is stored by reference, not copied.
                          When omitted a fresh empty dict is created per instance
                          (update() mutates this dict in place, so a shared default
                          would leak state between instances).
        isHostSpecific -- True when this instance is a per-host entry of a parent
        lockSubmitType -- when True the submission type is never taken from optDict
                          nor overridden by update()
        origin         -- passed through to the argpack constructors
        print          -- output function used for messages
        """
        self.submit_ = {} if optDict is None else optDict
        self.workingDirectory_ = None
        self.queue_ = None
        self.timelimit_ = None
        self.wait_ = None

        # Should be set at test level
        self.debug_ = None
        self.account_ = None

        # Can be set on a per-action basis, but can also be overridden if cmdline opt
        self.submitType_ = None
        self.lockSubmitType_ = lockSubmitType

        # Should be set via the step
        self.name_ = None
        self.dependencies_ = None
        self.logfile_ = None

        self.hpcArguments_ = HpcArgpacks( OrderedDict() )
        self.arguments_ = SubmitArgpacks( OrderedDict() )

        # Allow host-specific submit options
        self.isHostSpecific_ = isHostSpecific
        self.hostSpecificOptions_ = {}

        self.parse( origin=origin, print=print )

    def parse( self, print=print, origin=None ) :
        """Populate this instance from self.submit_.

        Recognised keys are "working_directory", "queue", "timelimit", "wait",
        "hpc_arguments", "arguments" and "submission". Every other key is parsed
        as a nested, host-specific SubmitOptions unless this instance is itself
        host-specific, in which case a warning is printed and the key is ignored.

        On any failure an error line is passed to print. If print returns a
        message, sc.SubmitParseException( message ) is raised, chained to the
        original error; if it returns None the original exception is re-raised.
        """
        try :
            # Keys copied verbatim onto an attribute of the same meaning
            simpleFields = (
                ( "working_directory", "workingDirectory_" ),
                ( "queue",             "queue_"            ),
                ( "timelimit",         "timelimit_"        ),
                ( "wait",              "wait_"             ),
            )
            submitKeys = [ key for key, _ in simpleFields ]
            for key, attr in simpleFields :
                if key in self.submit_ :
                    setattr( self, attr, self.submit_[ key ] )

            key = "hpc_arguments"
            submitKeys.append( key )
            if key in self.submit_ :
                self.hpcArguments_.update( HpcArgpacks( self.submit_[ key ], origin ), print )

            key = "arguments"
            submitKeys.append( key )
            if key in self.submit_ :
                self.arguments_.update( SubmitArgpacks( self.submit_[ key ], origin ), print )

            # Allow parsing of per-action submission
            key = "submission"
            submitKeys.append( key )
            if key in self.submit_ and not self.lockSubmitType_ :
                self.submitType_ = sc.SubmissionType( self.submit_[ key ] )

            # Process all other keys as host-specific options
            for key, value in self.submit_.items() :
                if key in submitKeys :
                    continue
                if self.isHostSpecific_ :
                    print( "Warning: Host-specific options cannot have sub-host-specific options" )
                    continue
                # NOTE(review): the constructor already parses once (without origin);
                # the explicit parse() below repeats it with origin. Kept as in the
                # original since the argpack update semantics are not visible here.
                self.hostSpecificOptions_[ key ] = SubmitOptions( value, isHostSpecific=True, print=print )
                self.hostSpecificOptions_[ key ].parse( origin=origin, print=print )
        except Exception as e :
            msg = print( "ERROR! Failed parse for submit options : {{ {fields} }}".format( fields=str( self.submit_ ) ) )
            if msg is None :
                # just re-raise, we don't have enough info
                raise
            raise sc.SubmitParseException( msg ) from e

    def update( self, rhs, print=print ) :
        """Override values on this instance with every value that is set on rhs.

        Scalar fields are taken from rhs when they are not None. The submission
        type is only taken when this instance is not locked; the lock itself is
        never copied so children cannot override a parent. Argpacks are merged
        through their own update(). Host-specific entries are merged recursively
        when the host already exists here and deep-copied otherwise. Finally
        self.submit_ is updated in place from rhs.submit_.
        """
        if rhs.workingDirectory_ is not None : self.workingDirectory_ = rhs.workingDirectory_
        if rhs.queue_            is not None : self.queue_            = rhs.queue_
        if rhs.timelimit_        is not None : self.timelimit_        = rhs.timelimit_
        if rhs.wait_             is not None : self.wait_             = rhs.wait_

        # Should be set at test level
        # Never copy lockSubmitType_ so children cannot override parent
        if not self.lockSubmitType_ and rhs.submitType_ is not None :
            self.submitType_ = rhs.submitType_
        if rhs.debug_   is not None : self.debug_   = rhs.debug_
        if rhs.account_ is not None : self.account_ = rhs.account_

        # Should be set via the step
        # NOTE(review): logfile_ is not propagated here, unlike name_ and
        # dependencies_ - confirm that is intentional.
        if rhs.name_         is not None : self.name_         = rhs.name_
        if rhs.dependencies_ is not None : self.dependencies_ = rhs.dependencies_

        if rhs.hpcArguments_.arguments_ : self.hpcArguments_.update( rhs.hpcArguments_, print=print )
        if rhs.arguments_.arguments_    : self.arguments_.update( rhs.arguments_, print=print )

        for hostName, rhsHostOptions in rhs.hostSpecificOptions_.items() :
            if hostName in self.hostSpecificOptions_ :
                self.hostSpecificOptions_[ hostName ].update( rhsHostOptions, print=print )
            else :
                self.hostSpecificOptions_[ hostName ] = copy.deepcopy( rhsHostOptions )

        # This keeps things consistent but should not affect anything
        sc.recursiveUpdate( self.submit_, rhs.submit_ )

    def validate( self, print=print ) :
        """Check that all non-optional fields are set.

        A submission type and a job name are always required; any submission
        type other than LOCAL additionally requires an account and a queue.
        The first missing field is reported through print and raised as an
        Exception. When HPC argument packs are present their ancestry-specific
        selection is also run (its result is discarded).
        """
        err = None
        if self.submitType_ is None :
            err = "submission type"
        elif self.name_ is None :
            err = "submission job name"
        elif self.submitType_ is not sc.SubmissionType.LOCAL :
            if self.account_ is None :
                err = "account"
            elif self.queue_ is None :
                err = "queue"

            if err is not None :
                err += " on non-LOCAL submission"

        if err is not None :
            errMsg = "Error: Invalid submission options [Missing {opt}]\n{opts}".format( opt=err, opts=self )
            print( errMsg )
            raise Exception( errMsg )

        if self.hpcArguments_.arguments_ :
            self.hpcArguments_.selectAncestrySpecificSubmitArgpacks( print=print )

    def setName( self, name ) :
        """Set the job name on this instance, both argpacks and every host-specific entry."""
        self.name_ = name
        self.arguments_.setName( name )
        self.hpcArguments_.setName( name )
        for hostOpt in self.hostSpecificOptions_.values() :
            hostOpt.setName( name )

    def selectHostSpecificSubmitOptions( self, host=None, print=print ) :
        """Return a deep copy of these options specialised for host.

        The first host-specific key that occurs as a substring of host is
        selected and its options are applied on top of the copy. When host is
        None or no key matches, the unmodified deep copy is returned.
        """
        # Have to do string matching rather than in dict; a missing host matches nothing
        hostSpecificOptKey = None
        if host is not None :
            hostSpecificOptKey = next( ( hostOpt for hostOpt in self.hostSpecificOptions_ if hostOpt in host ), None )

        # Quickly generate a stand-in SubmitOptions in spitting image
        currentSubmitOptions = copy.deepcopy( self )
        if hostSpecificOptKey is not None :
            # Update with host-specifics
            currentSubmitOptions.update( self.hostSpecificOptions_[ hostSpecificOptKey ], print )

        return currentSubmitOptions

    def format( self, print=print ) :
        """Render the options as ( submitCommand, additionalArgs ).

        submitCommand is a list of command-line tokens; it is empty for LOCAL
        submissions and ends with "--" for PBS. additionalArgs is whatever the
        argument packs format to, or an empty list when there are none.
        """
        # Why this can't be with the enum
        # https://stackoverflow.com/a/45716067
        # Why this can't be a dict value of the enum
        # https://github.com/python/cpython/issues/88508
        submitDict = {}
        if self.submitType_ == sc.SubmissionType.PBS :
            submitDict = { "submit" : "qsub",     "arguments"  : "{0}",
                           "name"   : "-N {0}",   "dependency" : "-W depend={0}",
                           "queue"  : "-q {0}",   "account"    : "-A {0}",
                           "output" : "-j oe -o {0}",
                           "time"   : "-l walltime={0}",
                           "wait"   : "-W block=true" }
        elif self.submitType_ == sc.SubmissionType.SLURM :
            # sbatch has no "-j" option; stderr already goes to the -o file by default
            submitDict = { "submit" : "sbatch",   "arguments"  : "{0}",
                           "name"   : "-J {0}",   "dependency" : "-d {0}",
                           "queue"  : "-p {0}",   "account"    : "-A {0}",
                           "output" : "-o {0}",
                           "time"   : "-t {0}",
                           "wait"   : "-W" }
        elif self.submitType_ == sc.SubmissionType.LOCAL :
            submitDict = { "submit" : "",         "arguments"  : "",
                           "name"   : "",         "dependency" : "",
                           "queue"  : "",         "account"    : "",
                           "output" : "-o {0}",
                           "time"   : "",
                           "wait"   : "" }

        # Always bound so the returns below cannot fail when there are no argpacks
        # (presumably the argpack format() also yields a list - verify against SubmitArgpacks)
        additionalArgs = []
        if self.arguments_.arguments_ :
            print( "Gathering argument packs..." )
            additionalArgs = self.arguments_.selectAncestrySpecificSubmitArgpacks( print=print ).format( print=print )

        if self.submitType_ == sc.SubmissionType.LOCAL :
            return [], additionalArgs

        def tokens( option, value ) :
            # Fill the option template and split it into individual command-line tokens
            return submitDict[ option ].format( value ).split( " " )

        cmd = [ submitDict[ "submit" ] ]

        # Set through config
        if self.hpcArguments_.arguments_ :
            print( "Gathering HPC argument packs..." )
            hpcArgpacks = self.hpcArguments_.selectAncestrySpecificSubmitArgpacks( print=print )
            cmd.extend( tokens( "arguments", hpcArgpacks.format( self.submitType_, print=print ) ) )

        if self.queue_ is not None :
            cmd.extend( tokens( "queue", self.queue_ ) )

        if self.timelimit_ is not None :
            cmd.extend( tokens( "time", self.timelimit_ ) )

        # NOTE(review): any non-None value, including False, enables waiting - confirm intended
        if self.wait_ is not None :
            cmd.extend( tokens( "wait", self.wait_ ) )

        # Set via test runner secrets
        if self.account_ is not None :
            cmd.extend( tokens( "account", self.account_ ) )

        # Set via step
        if self.name_ is not None :
            cmd.extend( tokens( "name", self.name_ ) )

        # Only request an output file when one was provided, never a literal "None"
        if self.logfile_ is not None :
            cmd.extend( tokens( "output", self.logfile_ ) )

        if self.dependencies_ is not None :
            cmd.extend( tokens( "dependency", self.dependencies_ ) )

        if self.submitType_ == sc.SubmissionType.PBS :
            # Extra bit to delineate command + args
            cmd.append( "--" )

        return cmd, additionalArgs

    def __str__( self ) :
        """Return the string form of a dict holding the current option values."""
        output = {
            "working_directory" : self.workingDirectory_,
            "queue"             : self.queue_,
            "hpc_arguments"     : self.hpcArguments_,
            "timelimit"         : self.timelimit_,
            "wait"              : self.wait_,
            "submitType"        : self.submitType_,
            "lockSubmitType"    : self.lockSubmitType_,
            "debug"             : self.debug_,
            "account"           : self.account_,
            "name"              : self.name_,
            "dependencies"      : self.dependencies_,
            "arguments"         : self.arguments_,
            "union_parse"       : self.submit_
        }
        return str( output )

    @staticmethod
    def parseTimelimit( timelimit, submitType ) :
        """Convert a time-limit string into a timedelta.

        Only PBS "hh:mm:ss" strings are handled; for SLURM, any other
        submission type, or a string that does not match, None is returned.
        """
        timeMatch = None
        if submitType == sc.SubmissionType.PBS :
            timeMatch = PBS_TIMELIMIT_REGEX.match( timelimit )
        elif submitType == sc.SubmissionType.SLURM :
            # Not implemented for SLURM
            pass

        if timeMatch is None :
            return None

        timeGroups = timeMatch.groupdict()
        return timedelta(
            hours  =int( timeGroups["hh"] ),
            minutes=int( timeGroups["mm"] ),
            seconds=int( timeGroups["ss"] )
        )

    @staticmethod
    def formatTimelimit( timelimit, submitType ) :
        """Convert a timedelta into a time-limit string.

        For PBS the result is "hh:mm:ss", where hours are not wrapped at 24;
        for SLURM and any other submission type None is returned.
        """
        totalSeconds = timelimit.total_seconds()
        if submitType == sc.SubmissionType.PBS :
            return PBS_TIMELIMIT_FORMAT_STR.format(
                int( totalSeconds // 3600 ),
                int( totalSeconds % 3600 // 60 ),
                int( totalSeconds % 60 )
            )
        # Not implemented for SLURM
        return None