This repository has been archived by the owner on Jan 30, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathRunJobEvent.py
4570 lines (3712 loc) · 215 KB
/
RunJobEvent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Class definition:
# RunJobEvent: module for receiving and processing events from the Event Service
# Instances are generated with RunJobFactory via pUtil::getRunJob()
# Implemented as a singleton class
# http://stackoverflow.com/questions/42558/python-and-the-singleton-pattern
# Import relevant python/pilot modules
from RunJob import RunJob # Parent RunJob class
from pUtil import writeToFileWithStatus
# Standard python modules
import os
import re
import sys
import random
import time
import atexit
import signal
import commands
import traceback
import uuid
from optparse import OptionParser
from json import loads, dump
from shutil import copy2
from xml.dom import minidom
# Pilot modules
import Job
import Node
import Site
import pUtil
import RunJobUtilities
import Mover as mover
from JobRecovery import JobRecovery
from FileStateClient import getFilesOfState
from ErrorDiagnosis import ErrorDiagnosis # import here to avoid issues seen at BU with missing module
from PilotErrors import PilotErrors
from StoppableThread import StoppableThread
from pUtil import tolog, isAnalysisJob, readpar, createLockFile, getDatasetDict,\
tailPilotErrorDiag, getExperiment, getEventService,\
getSiteInformation, getGUID
from FileHandling import getExtension, addToOSTransferDictionary, getCPUTimes, getReplicaDictionaryFromXML, writeFile
from EventRanges import downloadEventRanges, updateEventRange, updateEventRanges
from movers.base import BaseSiteMover
from processes import get_cpu_consumption_time
try:
from PilotYamplServer import PilotYamplServer as MessageServer
except Exception, e:
MessageServer = None
tolog("RunJobEvent caught exception: %s" % str(e))
class RunJobEvent(RunJob):
# private data members
__runjob = "RunJobEvent" # String defining the sub class
__instance = None # Boolean used by subclasses to become a Singleton
__error = PilotErrors() # PilotErrors object
__errorCode = 0 # Error code, e.g. set by stage-out method
__experiment = "ATLAS" # Current experiment (can be set with pilot option -F <experiment>)
__pilotserver = "localhost" # Default server
__pilotport = 88888 # Default port
__failureCode = None # Set by signal handler when user/batch system kills the job
__pworkdir = "/tmp" # Site work dir used by the parent
__logguid = None # GUID for the log file
__pilotlogfilename = "pilotlog.txt" # Default pilotlog filename
__stageinretry = None # Number of stage-in tries
__stageoutretry = None # Number of stage-out tries
__pilot_initdir = "" # location of where the pilot is untarred and started
__proxycheckFlag = True # True (default): perform proxy validity checks, False: no check
__globalPilotErrorDiag = "" # Global pilotErrorDiag used with signal handler (only)
__globalErrorCode = 0 # Global error code used with signal handler (only)
__inputDir = "" # Location of input files (source for mv site mover)
__outputDir = "" # Location of output files (destination for mv site mover)
__taskID = "" # TaskID (needed for OS transfer file and eventually for job metrics)
__event_loop_running = False # Is the event loop running?
__output_files = [] # A list of all files that have been successfully staged-out, used by createFileMetadata()
__guid_list = [] # Keep track of downloaded GUIDs
__lfn_list = [] # Keep track of downloaded LFNs
__eventRange_dictionary = {} # eventRange_dictionary[event_range_id] = [path, cpu, wall]
__eventRangeID_dictionary = {} # eventRangeID_dictionary[event_range_id] = True (corr. output file has been transferred)
__stageout_queue = [] # Queue for files to be staged-out; files are added as they arrive and removed after they have been staged-out
__pfc_path = "" # The path to the pool file catalog
__message_server_payload = None # Message server for the payload
__message_server_prefetcher = None # Message server for Prefetcher
__message_thread_payload = None # Thread for listening to messages from the payload
__message_thread_prefetcher = None # Thread for listening to messages from the Prefetcher
__status = True # Global job status; will be set to False if an event range or stage-out fails
__athenamp_is_ready = False # True when an AthenaMP worker is ready to process an event range
__prefetcher_is_ready = False # True when Prefetcher is ready to receive an event range
__prefetcher_has_finished = False # True when Prefetcher has updated an event range which then should be sent to AthenaMP
__asyncOutputStager_thread = None #
__asyncOutputStager_thread_sleep_time = 600 #
__analysisJob = False # True for analysis job
__jobSite = None # Site object
__siteInfo = None # site information
__node = None
__job = None # Job object
__cache = "" # Cache URL, e.g. used by LSST
__metadata_filename = "" # Full path to the metadata file
__yamplChannelNamePayload = None # Yampl channel name used by the payload (AthenaMP)
__yamplChannelNamePrefetcher = None # Yampl channel name used by the Prefetcher
__useEventIndex = True # Should Event Index be used? If not, a TAG file will be created
__tokenextractor_input_list_filenane = "" #
__sending_event_range = False # True while event range is being sent to payload
__current_event_range = "" # Event range being sent to payload
__updated_lfn = "" # Updated LFN sent from the Prefetcher
__useTokenExtractor = False # Should the TE be used?
__usePrefetcher = False # Should the Prefetcher be user
__inFilePosEvtNum = False # Use event number ranges relative to in-file position
__pandaserver = "" # Full PanDA server url incl. port and sub dirs
# ES zip
__esToZip = True
__multipleBuckets = None
__numBuckets = 1
__stageoutStorages = None
__max_wait_for_one_event = 360 # 6 hours, 360 minutes
__min_events = 1
__allowPrefetchEvents = True
# calculate cpu time, os.times() doesn't report correct value for preempted jobs
__childProcs = []
__child_cpuTime = {}
# record processed events
__nEvents = 0
__nEventsW = 0
__nEventsFailed = 0
__nEventsFailedStagedOut = 0
__nStageOutFailures = 0
__nStageOutSuccessAfterFailure = 0
__isLastStageOutFailed = False
__eventrangesToBeUpdated = []
# error fatal code
__esFatalCode = None
__isKilled = False
# external stage-out time (time after AthenaMP terminated)
__external_stagout_time = 0
# allow read/download remote inputs if closest RSE is in downtime
__allow_remote_inputs = False
# input files
__input_files = {}
# Getter and setter methods
def getNEvents(self):
    """ Return the event counters as the tuple (__nEvents, __nEventsW, __nEventsFailed, __nEventsFailedStagedOut) """
    counters = (self.__nEvents,
                self.__nEventsW,
                self.__nEventsFailed,
                self.__nEventsFailedStagedOut)
    return counters
def getSubStatus(self):
    """ Return a sub status string derived from the event range ID dictionary, the fatal code and the event counters """
    # Nothing has been registered in the event range ID dictionary
    if not self.__eventRangeID_dictionary:
        return 'no_events'
    # A fatal error code takes precedence over the per-event counters
    if self.__esFatalCode:
        return 'pilot_fatal'
    n_failed = self.__nEventsFailed
    if not n_failed:
        return 'all_success'
    n_w = self.__nEventsW
    if n_failed < n_w:
        return 'partly_failed'
    if n_w == 0:
        return 'all_failed'
    return 'mostly_failed'
def getStageOutDetail(self):
    """ Build a one-line summary of the stage-out results per storage

    Returns a string that starts with 'Stageout summary:' followed by the activity, endpoint and
    success/failed counters of the 'primary' and 'failover' entries found in __stageoutStorages.
    If __stageoutStorages is None or empty, or an entry is missing or empty, nothing is added for it.
    """
    retStr = 'Stageout summary:'
    # __stageoutStorages defaults to None; treat that like an empty dictionary instead of failing
    storages = self.__stageoutStorages or {}
    for label in ('primary', 'failover'):
        storage = storages.get(label)
        if not storage:
            continue
        retStr += "%s storage('%s' at '%s'): [success %s, failed %s]" % (label,
                                                                         storage['activity'],
                                                                         storage['endpoint'],
                                                                         storage['success'],
                                                                         storage['failed'])
    return retStr
def setFinalESStatus(self, job):
    """ Set the final event service status on the given job object

    Fills in job.subStatus, job.pilotErrorDiag and job.jobState. For every failure case
    job.result[0] is set to "failed" and job.result[2] to the corresponding error code.
    The decision is based on the event counters, the event range ID dictionary, the number of
    stage-out failures and the fatal error code.
    """
    errors = self.__error
    n_w = self.__nEventsW
    n_failed = self.__nEventsFailed

    def flagAsFailed(code):
        # Bookkeeping shared by all failure cases
        job.result[0] = "failed"
        job.result[2] = code
        job.jobState = "failed"

    if n_w < 1 and self.__nStageOutFailures >= 3:
        job.subStatus = 'pilot_failed'
        job.pilotErrorDiag = "Too many stageout failures. (%s)" % self.getStageOutDetail()
        flagAsFailed(errors.ERR_ESRECOVERABLE)
        return

    if not self.__eventRangeID_dictionary:
        job.subStatus = 'pilot_noevents' # 'no_events'
        job.pilotErrorDiag = "Pilot got no events"
        flagAsFailed(errors.ERR_NOEVENTS)
        return

    # From here on the event range ID dictionary is known to be non-empty
    if n_w < 1:
        job.subStatus = 'pilot_failed' # 'no_running_events'
        job.pilotErrorDiag = "Pilot didn't run any events"
        flagAsFailed(errors.ERR_UNKNOWN)
        return

    if self.__esFatalCode:
        job.subStatus = 'pilot_failed'
        job.pilotErrorDiag = "AthenaMP fatal error happened. (%s)" % self.getStageOutDetail()
        flagAsFailed(self.__esFatalCode)
        return

    if n_failed:
        if n_w == 0:
            sub_status = 'pilot_failed' # all failed
            template = "All events failed. (%s, other failure: %s)"
        elif n_failed < n_w:
            sub_status = 'partly_failed'
            template = "Part of events failed. (%s, other failure: %s)"
        else:
            sub_status = 'mostly_failed'
            template = "Most of events failed. (%s, other failure: %s)"
        job.subStatus = sub_status
        job.pilotErrorDiag = template % (self.getStageOutDetail(), n_failed - self.__nEventsFailedStagedOut)
        flagAsFailed(errors.ERR_ESRECOVERABLE)
        return

    # No failure detected; job.result is left untouched in this case
    job.subStatus = 'all_success'
    job.jobState = "finished"
    job.pilotErrorDiag = "AllSuccess. (%s)" % self.getStageOutDetail()
def getESFatalCode(self):
    """ Getter for __esFatalCode (the fatal error code, None by default) """
    fatal_code = self.__esFatalCode
    return fatal_code
def getExperiment(self):
    """ Return the current experiment name held in __experiment """
    experiment = self.__experiment
    return experiment

def setExperiment(self, experiment):
    """ Store the experiment name in __experiment """
    self.__experiment = experiment
def getPilotServer(self):
    """ Return the pilot server held in __pilotserver """
    server = self.__pilotserver
    return server

def setPilotServer(self, pilotserver):
    """ Store the pilot server in __pilotserver """
    self.__pilotserver = pilotserver
def getPilotPort(self):
    """ Return the pilot port held in __pilotport """
    port = self.__pilotport
    return port

def setPilotPort(self, pilotport):
    """ Store the pilot port in __pilotport """
    self.__pilotport = pilotport
def getFailureCode(self):
    """ Return the failure code held in __failureCode """
    code = self.__failureCode
    return code

def setFailureCode(self, code):
    """ Store the failure code in __failureCode """
    self.__failureCode = code
def getParentWorkDir(self):
    """ Return the work dir of the parent held in __pworkdir """
    pworkdir = self.__pworkdir
    return pworkdir

def setParentWorkDir(self, pworkdir):
    """ Store the parent work dir in __pworkdir and pass it on to the setter of the parent class """
    self.__pworkdir = pworkdir
    parent = super(RunJobEvent, self)
    parent.setParentWorkDir(pworkdir)
def getLogGUID(self):
    """ Return the GUID for the log file held in __logguid """
    guid = self.__logguid
    return guid

def setLogGUID(self, logguid):
    """ Store the GUID for the log file in __logguid """
    self.__logguid = logguid
def getPilotLogFilename(self):
    """ Return the pilot log filename held in __pilotlogfilename """
    filename = self.__pilotlogfilename
    return filename

def setPilotLogFilename(self, pilotlogfilename):
    """ Store the pilot log filename in __pilotlogfilename """
    self.__pilotlogfilename = pilotlogfilename
def getStageInRetry(self):
    """ Return the number of stage-in tries held in __stageinretry """
    tries = self.__stageinretry
    return tries

def setStageInRetry(self, stageinretry):
    """ Store the number of stage-in tries in __stageinretry and pass it on to the setter of the parent class """
    self.__stageinretry = stageinretry
    parent = super(RunJobEvent, self)
    parent.setStageInRetry(stageinretry)
def getStageOutRetry(self):
    """ Return the number of stage-out tries held in __stageoutretry """
    tries = self.__stageoutretry
    return tries

def setStageOutRetry(self, stageoutretry):
    """ Store the number of stage-out tries in __stageoutretry """
    self.__stageoutretry = stageoutretry
def getPilotInitDir(self):
    """ Return the pilot init dir held in __pilot_initdir """
    initdir = self.__pilot_initdir
    return initdir

def setPilotInitDir(self, pilot_initdir):
    """ Store the pilot init dir in __pilot_initdir and pass it on to the setter of the parent class """
    self.__pilot_initdir = pilot_initdir
    parent = super(RunJobEvent, self)
    parent.setPilotInitDir(pilot_initdir)
def getProxyCheckFlag(self):
    """ Return the proxy check flag held in __proxycheckFlag """
    flag = self.__proxycheckFlag
    return flag

def setProxyCheckFlag(self, proxycheckFlag):
    """ Store the proxy check flag in __proxycheckFlag """
    self.__proxycheckFlag = proxycheckFlag
def getGlobalPilotErrorDiag(self):
    """ Return the global pilot error diagnostics held in __globalPilotErrorDiag """
    diag = self.__globalPilotErrorDiag
    return diag

def setGlobalPilotErrorDiag(self, pilotErrorDiag):
    """ Store the global pilot error diagnostics in __globalPilotErrorDiag """
    self.__globalPilotErrorDiag = pilotErrorDiag
def getGlobalErrorCode(self):
    """ Return the global error code held in __globalErrorCode """
    code = self.__globalErrorCode
    return code

def setGlobalErrorCode(self, code):
    """ Store the global error code in __globalErrorCode """
    self.__globalErrorCode = code
def getErrorCode(self):
    """ Return the error code held in __errorCode """
    code = self.__errorCode
    return code

def setErrorCode(self, code):
    """ Store the error code in __errorCode """
    self.__errorCode = code
def getInputDir(self):
    """ Return the location of the input files held in __inputDir """
    input_dir = self.__inputDir
    return input_dir

def setInputDir(self, inputDir):
    """ Store the location of the input files in __inputDir and pass it on to the setter of the parent class """
    self.__inputDir = inputDir
    parent = super(RunJobEvent, self)
    parent.setInputDir(inputDir)
def getOutputDir(self):
    """ Return the location of the output files held in __outputDir """
    output_dir = self.__outputDir
    return output_dir

def setOutputDir(self, outputDir):
    """ Store the location of the output files in __outputDir """
    self.__outputDir = outputDir
def getEventLoopRunning(self):
    """ Return the value of __event_loop_running (is the event loop running?) """
    running = self.__event_loop_running
    return running

def setEventLoopRunning(self, event_loop_running):
    """ Store the event loop state in __event_loop_running """
    self.__event_loop_running = event_loop_running
def getOutputFiles(self):
    """ Return the list held in __output_files (the list object itself, not a copy) """
    files = self.__output_files
    return files

def setOutputFiles(self, output_files):
    """ Store the given list in __output_files """
    self.__output_files = output_files
def getGUIDList(self):
    """ Return the GUID list held in __guid_list (the list object itself, not a copy) """
    guids = self.__guid_list
    return guids

def setGUIDList(self, guid_list):
    """ Store the given GUID list in __guid_list """
    self.__guid_list = guid_list
def getLFNList(self):
    """ Return the LFN list held in __lfn_list (the list object itself, not a copy) """
    lfns = self.__lfn_list
    return lfns

def setLFNList(self, lfn_list):
    """ Store the given LFN list in __lfn_list """
    self.__lfn_list = lfn_list
def getUpdatedLFN(self):
    """ Return the updated LFN held in __updated_lfn """
    lfn = self.__updated_lfn
    return lfn

def setUpdatedLFN(self, updated_lfn):
    """ Store the updated LFN in __updated_lfn """
    self.__updated_lfn = updated_lfn
def getEventRangeDictionary(self):
    """ Return the dictionary held in __eventRange_dictionary (the object itself, not a copy) """
    ranges = self.__eventRange_dictionary
    return ranges

def setEventRangeDictionary(self, eventRange_dictionary):
    """ Store the given dictionary in __eventRange_dictionary """
    self.__eventRange_dictionary = eventRange_dictionary
def getEventRangeIDDictionary(self):
    """ Return the dictionary held in __eventRangeID_dictionary (the object itself, not a copy) """
    range_ids = self.__eventRangeID_dictionary
    return range_ids

def setEventRangeIDDictionary(self, eventRangeID_dictionary):
    """ Store the given dictionary in __eventRangeID_dictionary """
    self.__eventRangeID_dictionary = eventRangeID_dictionary
def getStageOutQueue(self):
    """ Return the stage-out queue held in __stageout_queue (the object itself, not a copy) """
    queue = self.__stageout_queue
    return queue

def setStageOutQueue(self, stageout_queue):
    """ Store the given stage-out queue in __stageout_queue """
    self.__stageout_queue = stageout_queue
def getPoolFileCatalogPath(self):
    """ Return the path to the pool file catalog held in __pfc_path """
    path = self.__pfc_path
    return path

def setPoolFileCatalogPath(self, pfc_path):
    """ Store the path to the pool file catalog in __pfc_path """
    self.__pfc_path = pfc_path
def getMessageServerPayload(self):
    """ Return the message server for the payload held in __message_server_payload """
    server = self.__message_server_payload
    return server

def setMessageServerPayload(self, message_server):
    """ Store the message server for the payload in __message_server_payload """
    self.__message_server_payload = message_server
def getMessageServerPrefetcher(self):
    """ Return the message server for the Prefetcher held in __message_server_prefetcher """
    server = self.__message_server_prefetcher
    return server

def setMessageServerPrefetcher(self, message_server):
    """ Store the message server for the Prefetcher in __message_server_prefetcher """
    self.__message_server_prefetcher = message_server
def getMessageThreadPayload(self):
    """ Return the payload message thread held in __message_thread_payload """
    thread = self.__message_thread_payload
    return thread

def setMessageThreadPayload(self, message_thread_payload):
    """ Store the payload message thread in __message_thread_payload """
    self.__message_thread_payload = message_thread_payload
def getMessageThreadPrefetcher(self):
    """ Return the Prefetcher message thread held in __message_thread_prefetcher """
    thread = self.__message_thread_prefetcher
    return thread

def setMessageThreadPrefetcher(self, message_thread_prefetcher):
    """ Store the Prefetcher message thread in __message_thread_prefetcher """
    self.__message_thread_prefetcher = message_thread_prefetcher
def isAthenaMPReady(self):
    """ Return the value of __athenamp_is_ready """
    ready = self.__athenamp_is_ready
    return ready

def setAthenaMPIsReady(self, athenamp_is_ready):
    """ Store the AthenaMP readiness flag in __athenamp_is_ready """
    self.__athenamp_is_ready = athenamp_is_ready
def isPrefetcherReady(self):
    """ Return the value of __prefetcher_is_ready """
    ready = self.__prefetcher_is_ready
    return ready

def setPrefetcherIsReady(self, prefetcher_is_ready):
    """ Store the Prefetcher readiness flag in __prefetcher_is_ready """
    self.__prefetcher_is_ready = prefetcher_is_ready
def prefetcherHasFinished(self):
    """ Return the value of __prefetcher_has_finished """
    finished = self.__prefetcher_has_finished
    return finished

def setPrefetcherHasFinished(self, prefetcher_has_finished):
    """ Store the Prefetcher finished flag in __prefetcher_has_finished """
    self.__prefetcher_has_finished = prefetcher_has_finished
def getAsyncOutputStagerThread(self):
    """ Return the object held in __asyncOutputStager_thread """
    thread = self.__asyncOutputStager_thread
    return thread

def setAsyncOutputStagerThread(self, asyncOutputStager_thread):
    """ Store the given object in __asyncOutputStager_thread """
    self.__asyncOutputStager_thread = asyncOutputStager_thread
def getAnalysisJob(self):
    """ Return the value of __analysisJob (True for analysis job) """
    is_analysis = self.__analysisJob
    return is_analysis

def setAnalysisJob(self, analysisJob):
    """ Store the analysis job flag in __analysisJob """
    self.__analysisJob = analysisJob
def getCache(self):
    """ Return the cache URL held in __cache """
    cache = self.__cache
    return cache

def setCache(self, cache):
    """ Store the cache URL in __cache """
    self.__cache = cache
def getMetadataFilename(self):
    """ Getter for __metadata_filename """
    filename = self.__metadata_filename
    return filename

def setMetadataFilename(self, event_range_id):
    """ Set __metadata_filename to <job workdir>/metadata-<event_range_id>.xml """
    workdir = self.__job.workdir
    self.__metadata_filename = os.path.join(workdir, "metadata-%s.xml" % (event_range_id))
def getJobSite(self):
    """ Return the site object held in __jobSite """
    site = self.__jobSite
    return site

def setJobSite(self, jobSite):
    """ Store the site object in __jobSite """
    self.__jobSite = jobSite
def setJobNode(self, node):
    """ Setter for __node """
    self.__node = node
def getYamplChannelNamePayload(self):
    """ Return the Yampl channel name of the payload held in __yamplChannelNamePayload """
    channel = self.__yamplChannelNamePayload
    return channel

def setYamplChannelNamePayload(self, yamplChannelNamePayload):
    """ Store the Yampl channel name of the payload in __yamplChannelNamePayload """
    self.__yamplChannelNamePayload = yamplChannelNamePayload
def getYamplChannelNamePrefetcher(self):
    """ Return the Yampl channel name of the Prefetcher held in __yamplChannelNamePrefetcher """
    channel = self.__yamplChannelNamePrefetcher
    return channel

def setYamplChannelNamePrefetcher(self, yamplChannelNamePrefetcher):
    """ Store the Yampl channel name of the Prefetcher in __yamplChannelNamePrefetcher """
    self.__yamplChannelNamePrefetcher = yamplChannelNamePrefetcher
def getStatus(self):
    """ Return the global job status held in __status """
    status = self.__status
    return status

def setStatus(self, status):
    """ Store the global job status in __status """
    self.__status = status
def isSendingEventRange(self):
    """ Return the value of __sending_event_range """
    sending = self.__sending_event_range
    return sending

def setSendingEventRange(self, sending_event_range):
    """ Store the sending flag in __sending_event_range """
    self.__sending_event_range = sending_event_range
def getCurrentEventRange(self):
    """ Return the event range held in __current_event_range """
    event_range = self.__current_event_range
    return event_range

def setCurrentEventRange(self, current_event_range):
    """ Store the event range in __current_event_range """
    self.__current_event_range = current_event_range
def getMaxWaitOneEvent(self):
    """ Return the value of __max_wait_for_one_event """
    max_wait = self.__max_wait_for_one_event
    return max_wait

def getMinEvents(self):
    """ Return the value of __min_events """
    min_events = self.__min_events
    return min_events
def shouldBeAborted(self):
    """ Return True if the ABORT lock file is present in the job work directory, otherwise False """
    abort_file = os.path.join(self.__job.workdir, "ABORT")
    return os.path.exists(abort_file)

def setAbort(self):
    """ Create the ABORT lock file in the job work directory """
    workdir = self.__job.workdir
    createLockFile(False, workdir, lockfile="ABORT")
def shouldBeKilled(self):
    """ Return True if the TOBEKILLED lock file exists in the job work directory, otherwise False (the outcome is logged) """
    path = os.path.join(self.__job.workdir, "TOBEKILLED")
    found = os.path.exists(path)
    if found:
        message = "path exists: %s"
    else:
        message = "path does not exist: %s"
    tolog(message % (path))
    return found

def setToBeKilled(self):
    """ Create the TOBEKILLED lock file in the job work directory """
    workdir = self.__job.workdir
    createLockFile(False, workdir, lockfile="TOBEKILLED")
# Get/setters for the job object
def getJob(self):
    """ Return the job object held in __job """
    job = self.__job
    return job

def setJob(self, job):
    """ Store the job object in __job and reset its outFilesGuids list """
    # The outFilesGuids list is emptied since guids will be generated by this module
    job.outFilesGuids = []
    self.__job = job
def getJobWorkDir(self):
    """ Return the workdir of the job object """
    job = self.__job
    return job.workdir

def setJobWorkDir(self, workdir):
    """ Set the workdir of the job object """
    job = self.__job
    job.workdir = workdir
def getJobID(self):
    """ Return the jobId of the job object """
    job = self.__job
    return job.jobId

def setJobID(self, jobId):
    """ Set the jobId of the job object """
    job = self.__job
    job.jobId = jobId
def getJobDataDir(self):
    """ Return the datadir of the job object """
    job = self.__job
    return job.datadir

def setJobDataDir(self, datadir):
    """ Set the datadir of the job object """
    job = self.__job
    job.datadir = datadir
def getJobTrf(self):
    """ Return the trf of the job object """
    job = self.__job
    return job.trf

def setJobTrf(self, trf):
    """ Set the trf of the job object """
    job = self.__job
    job.trf = trf
def getJobResult(self):
    """ Return the result of the job object """
    job = self.__job
    return job.result

def setJobResult(self, result, pilot_failed=False):
    """ Set the result of the job object; if pilot_failed is set, setFinalESStatus() is called for the job as well """
    job = self.__job
    job.result = result
    if pilot_failed:
        self.setFinalESStatus(job)
def getJobState(self):
    """ Return the jobState of the job object """
    job = self.__job
    return job.jobState

def setJobState(self, jobState):
    """ Set the jobState of the job object """
    job = self.__job
    job.jobState = jobState
def getJobStates(self):
    """ Return the job states, i.e. the result of the job object """
    job = self.__job
    return job.result

def setJobStates(self, states):
    """ Set the result of the job object to the given states and its currentState to the first element """
    job = self.__job
    job.result = states
    job.currentState = states[0]
def getTaskID(self):
    """ Return the task ID held in __taskID """
    task_id = self.__taskID
    return task_id

def setTaskID(self, taskID):
    """ Store the task ID in __taskID """
    self.__taskID = taskID
def getJobOutFiles(self):
    """ Return the outFiles of the job object """
    job = self.__job
    return job.outFiles

def setJobOutFiles(self, outFiles):
    """ Set the outFiles of the job object """
    job = self.__job
    job.outFiles = outFiles
def getTokenExtractorInputListFilename(self):
    """ Return the value of __tokenextractor_input_list_filenane """
    filename = self.__tokenextractor_input_list_filenane
    return filename

def setTokenExtractorInputListFilename(self, tokenextractor_input_list_filenane):
    """ Store the given value in __tokenextractor_input_list_filenane """
    self.__tokenextractor_input_list_filenane = tokenextractor_input_list_filenane
def useEventIndex(self):
    """ Return the value of __useEventIndex (should the Event Index be used?) """
    use_event_index = self.__useEventIndex
    return use_event_index

def setUseEventIndex(self, jobPars):
    """ Set __useEventIndex to False if --createTAGFileForES is found in jobPars, otherwise to True """
    self.__useEventIndex = "--createTAGFileForES" not in jobPars
def useTokenExtractor(self):
    """ Return the value of __useTokenExtractor (should the Token Extractor be used?) """
    use_token_extractor = self.__useTokenExtractor
    return use_token_extractor
def setUseTokenExtractor(self, setup):
    """ Set the __useTokenExtractor variable to a boolean value

    The decision is based on the setup string: __useTokenExtractor becomes True if the string
    contains 'TokenScatterer', or contains 'UseTokenExtractor=True' once all whitespace has
    been removed from it; otherwise it becomes False. The decision is written to the log.
    """
    # Remove every whitespace character (spaces, tabs and newlines alike) before looking for the
    # option, so that e.g. "UseTokenExtractor = True" is recognised however it is padded
    compact_setup = "".join(setup.split())
    self.__useTokenExtractor = 'TokenScatterer' in setup or 'UseTokenExtractor=True' in compact_setup
    if self.__useTokenExtractor:
        tolog("Token Extractor is needed")
    else:
        tolog("Token Extractor is not needed")
def usePrefetcher(self):
    """ Return the value of __usePrefetcher (should the Prefetcher be used?) """
    use_prefetcher = self.__usePrefetcher
    return use_prefetcher

def setUsePrefetcher(self, usePrefetcher):
    """ Store the given value in __usePrefetcher """
    self.__usePrefetcher = usePrefetcher
def getInFilePosEvtNum(self):
    """ Return the value of __inFilePosEvtNum (use event number ranges relative to in-file position?) """
    in_file_pos = self.__inFilePosEvtNum
    return in_file_pos

def setInFilePosEvtNum(self, inFilePosEvtNum):
    """ Store the given value in __inFilePosEvtNum """
    self.__inFilePosEvtNum = inFilePosEvtNum
def getPanDAServer(self):
    """ Return the PanDA server url held in __pandaserver """
    server = self.__pandaserver
    return server

def setPanDAServer(self, pandaserver):
    """ Store the PanDA server url in __pandaserver """
    self.__pandaserver = pandaserver
def getAllowPrefetchEvents(self):
    """ Getter for __allowPrefetchEvents """
    allowed = self.__allowPrefetchEvents
    return allowed

def setAllowPrefetchEvents(self, allowPrefetchEvents):
    """ Setter for __allowPrefetchEvents """
    self.__allowPrefetchEvents = allowPrefetchEvents
def init_guid_list(self):
    """ Append the job's inFilesGuids to __guid_list and the job's inFiles to __lfn_list """
    job = self.__job
    self.__guid_list.extend(job.inFilesGuids)
    self.__lfn_list.extend(job.inFiles)
def init_input_files(self, job):
    """ Initialise __input_files with the return value of job.get_stagedIn_files() """
    staged_in = job.get_stagedIn_files()
    self.__input_files = staged_in

def add_input_file(self, scope, name, pfn):
    """ Register the pfn of an input file in __input_files under the key '<scope>:<name>' """
    key = '%s:%s' % (scope, name)
    self.__input_files[key] = pfn

def get_input_files(self):
    """ Getter for __input_files """
    input_files = self.__input_files
    return input_files
def addPFNsToEventRanges(self, eventRanges):
    """ Add a 'PFN' entry to every event range if in-file position event numbers are used

    The PFN is looked up in __input_files under the key '<scope>:<LFN>'; the LFN itself is used
    when no such key exists. The (possibly updated) list is returned.
    """
    # Event ranges only need a PFN when they are file related
    if not self.getInFilePosEvtNum():
        return eventRanges
    known_files = self.__input_files
    for eventRange in eventRanges:
        key = '%s:%s' % (eventRange['scope'], eventRange['LFN'])
        eventRange['PFN'] = known_files.get(key, eventRange['LFN'])
    return eventRanges
def addPFNToEventRange(self, eventRange):
    """ Add a 'PFN' entry to a single event range and return the event range

    The PFN is looked up in __input_files under the key '<scope>:<LFN>'; the LFN itself is used
    when no such key exists.
    """
    key = '%s:%s' % (eventRange['scope'], eventRange['LFN'])
    eventRange['PFN'] = self.__input_files.get(key, eventRange['LFN'])
    return eventRange
def setAsyncOutputStagerSleepTime(self, sleep_time=600):
    """ Setter for __asyncOutputStager_thread_sleep_time (600 if no value is given) """
    self.__asyncOutputStager_thread_sleep_time = sleep_time
# Required methods
def __init__(self):
    """ Default initialization

    Generates the Yampl channel names used by the payload and by the Prefetcher. Both names
    end with the same freshly generated UUID.
    """
    # e.g. self.__errorLabel = errorLabel
    # Generate the UUID in-process with the uuid module (imported at the top of this file) rather
    # than running the external 'uuidgen' tool: if that tool is missing, the shell error text
    # would end up inside the channel names
    uuidgen = str(uuid.uuid4())
    self.__yamplChannelNamePayload = "EventService_EventRanges-%s" % (uuidgen)
    self.__yamplChannelNamePrefetcher = "EventService_Prefetcher-%s" % (uuidgen)
# is this necessary? doesn't exist in RunJob
def __new__(cls, *args, **kwargs):
    """ Return the shared instance kept in __instance, creating it first if none is set """
    if cls.__instance:
        return cls.__instance
    cls.__instance = super(RunJobEvent, cls).__new__(cls, *args, **kwargs)
    return cls.__instance
def getRunJob(self):
    """ Return the string defining the sub class (__runjob) """
    runjob = self.__runjob
    return runjob

def getRunJobFileName(self):
    """ Return the filename of the module, as reported by the parent class """
    parent = super(RunJobEvent, self)
    return parent.getRunJobFileName()
def allowLoopingJobKiller(self):
    """ Should the pilot search for looping jobs? Always True for this class """
    # The pilot has the ability to monitor the payload work directory. If there are no updated files within a certain
    # time limit, the pilot will consider the job as stuck (looping) and will kill it. The looping time limits are set
    # in environment.py (see e.g. loopingLimitDefaultProd)
    allow = True
    return allow
def argumentParser(self):
""" Argument parser for the RunJob module """
# Return variables
appdir = None
queuename = None
sitename = None
workdir = None
parser = OptionParser()
parser.add_option("-a", "--appdir", dest="appdir",
help="The local path to the applications directory", metavar="APPDIR")
parser.add_option("-b", "--queuename", dest="queuename",
help="Queue name", metavar="QUEUENAME")
parser.add_option("-d", "--workdir", dest="workdir",
help="The local path to the working directory of the payload", metavar="WORKDIR")
parser.add_option("-g", "--inputdir", dest="inputDir",
help="Location of input files to be transferred by the mv site mover", metavar="INPUTDIR")
parser.add_option("-i", "--logfileguid", dest="logguid",
help="Log file guid", metavar="GUID")
parser.add_option("-k", "--pilotlogfilename", dest="pilotlogfilename",
help="The name of the pilot log file", metavar="PILOTLOGFILENAME")
parser.add_option("-l", "--pilotinitdir", dest="pilot_initdir",
help="The local path to the directory where the pilot was launched", metavar="PILOT_INITDIR")
parser.add_option("-m", "--outputdir", dest="outputDir",
help="Destination of output files to be transferred by the mv site mover", metavar="OUTPUTDIR")
parser.add_option("-o", "--parentworkdir", dest="pworkdir",
help="Path to the work directory of the parent process (i.e. the pilot)", metavar="PWORKDIR")
parser.add_option("-s", "--sitename", dest="sitename",
help="The name of the site where the job is to be run", metavar="SITENAME")
parser.add_option("-w", "--pilotserver", dest="pilotserver",
help="The URL of the pilot TCP server (localhost) WILL BE RETIRED", metavar="PILOTSERVER")