From b28897ebc9b5ed49280b5882ca029cea2ead91f8 Mon Sep 17 00:00:00 2001 From: Stefano Belforte Date: Mon, 23 Aug 2021 19:26:50 +0200 Subject: [PATCH] =?UTF-8?q?Refactor=20CMSRunAnalysis.py=20to=20reduce=20WM?= =?UTF-8?q?Core=20stuff,=20simplify,=20uniform=20=E2=80=A6=20(#6726)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Refactor CMSRunAnalysis.py to reduce WMCore stuff, simplify, uniform the handling of CMSSW and ScriptExe, make it ready for dropping use of WMCore in TweakPSet * pylint * more pylint --- scripts/CMSRunAnalysis.py | 282 +++++++++++++------------------------- 1 file changed, 93 insertions(+), 189 deletions(-) diff --git a/scripts/CMSRunAnalysis.py b/scripts/CMSRunAnalysis.py index 89876fda79..da29f75554 100644 --- a/scripts/CMSRunAnalysis.py +++ b/scripts/CMSRunAnalysis.py @@ -41,7 +41,7 @@ def __init__(self, logger=None, level=None): def __enter__(self): self.previousLogLevel = self.logger.getEffectiveLevel() self.logger.setLevel(self.newLogLevel) - def __exit__(self,a,b,c): + def __exit__(self, a, b, c): self.logger.setLevel(self.previousLogLevel) @@ -295,11 +295,11 @@ def stopDashboardMonitoring(myad): fjr = {} try: fjr = json.load(open("jobReport.json")) - except: + except Exception: print("WARNING: Unable to parse jobReport.json; Dashboard reporting will not be useful.\n", traceback.format_exc()) try: addReportInfo(params, fjr) - except: + except Exception: if 'ExeExitCode' not in params: params['ExeExitCode'] = 50115 if 'JobExitCode' not in params: @@ -308,7 +308,7 @@ def stopDashboardMonitoring(myad): print("Dashboard end parameters: %s" % str(params)) try: reportPopularity(params['MonitorID'], params['MonitorJobID'], myad, fjr) - except: + except Exception: print("ERROR: Failed to report popularity information to Dashboard.\n", traceback.format_exc()) DashboardAPI.apmonSend(params['MonitorID'], params['MonitorJobID'], params) DashboardAPI.apmonFree() @@ -320,7 +320,7 @@ def logCMSSW(): limit each line to maxLineLen. These logs will be returned back to schedd and we don`t want to take a lot of space on it. Full log files will be returned back to user SE, if he set saveLogs flag in crab config.""" - global logCMSSWSaved + global logCMSSWSaved # pylint: disable=global-statement if logCMSSWSaved: return if not os.path.exists("cmsRun-stdout.log"): @@ -353,14 +353,14 @@ def logCMSSW(): if nl < keepAtStart: printCMSSWLine("== CMSSW: %s " % line, maxLineLen) if nl == keepAtStart + 1: - print("== CMSSW: ") - print("== CMSSW: [...BIG SNIP...]") - print("== CMSSW: ") + print(prefix) + print(prefix + " [...BIG SNIP...]") + print(prefix) if numLines - nl <= keepAtEnd: - printCMSSWLine("== CMSSW: %s " % line, maxLineLen) + printCMSSWLine("%s" % prefix+line, maxLineLen) else: for line in open(outfile): - printCMSSWLine("== CMSSW: %s " % line, maxLineLen) + printCMSSWLine("%s" % prefix+line, maxLineLen) print("======== CMSSW OUTPUT FINSHING ========") logCMSSWSaved = True @@ -381,7 +381,7 @@ def handleException(exitAcronym, exitCode, exitMsg): report = json.load(open("jobReport.json")) else: print("WARNING: WMCore did not produce a jobReport.json; FJR will not be useful.") - except: + except Exception: print("WARNING: Unable to parse WMCore's jobReport.json; FJR will not be useful.\n", traceback.format_exc()) if report.get('steps', {}).get('cmsRun', {}).get('errors'): @@ -415,7 +415,7 @@ def handleException(exitAcronym, exitCode, exitMsg): sLCfg = SiteLocalConfig.loadSiteLocalConfig() report['executed_site'] = sLCfg.siteName print("== Execution site for failed job from site-local-config.xml: %s" % sLCfg.siteName) - except: + except Exception: print("ERROR: Failed to record execution site name in the FJR from the site-local-config.xml") print(traceback.format_exc()) @@ -500,7 +500,7 @@ def parseArgs(): type='string', default=None) - (opts, args) = parser.parse_args(sys.argv[1:]) + (opts, _) = parser.parse_args(sys.argv[1:]) try: print("==== Parameters Dump at %s ===" % time.asctime(time.gmtime())) @@ -526,8 +526,8 @@ def parseArgs(): print("scriptArgs: ", opts.scriptArgs) print("maxRuntime: ", opts.maxRuntime) print("===================") - except: - name, value, traceBack = sys.exc_info() + except Exception: + name, value, _ = sys.exc_info() print('ERROR: missing parameters: %s - %s' % (name, value)) handleException("FAILED", EC_MissingArg, 'CMSRunAnalysisERROR: missing parameters: %s - %s' % (name, value)) mintime() @@ -536,41 +536,12 @@ def parseArgs(): return opts -#TODO: MM I do not believe this is necessary at all def prepSandbox(opts): - print("==== Sandbox preparation STARTING at %s ====" % time.asctime(time.gmtime())) - os.environ['WMAGENTJOBDIR'] = os.getcwd() - if opts.archiveJob and not "CRAB3_RUNTIME_DEBUG" in os.environ: - if os.path.exists(opts.archiveJob): - print("Sandbox %s already exists, skipping" % opts.archiveJob) - elif opts.sourceURL == 'LOCAL' and not os.path.exists(opts.archiveJob): - print("ERROR: Requested for condor to transfer the tarball, but it didn't show up") - handleException("FAILED", EC_WGET, 'CMSRunAnalysisERROR: could not get jobO files from panda server') - sys.exit(EC_WGET) - else: - print("--- wget for jobO ---") - output = commands.getoutput('wget -h') - wgetCommand = 'wget' - for line in output.split('\n'): - if re.search('--no-check-certificate', line) != None: - wgetCommand = 'wget --no-check-certificate' - break - com = '%s %s/cache/%s' % (wgetCommand, opts.sourceURL, opts.archiveJob) - nTry = 3 - for iTry in range(nTry): - print('Try : %s' % iTry) - status, output = commands.getstatusoutput(com) - print(output) - if status == 0: - break - if iTry+1 == nTry: - print("ERROR : cound not get jobO files from panda server") - handleException("FAILED", EC_WGET, 'CMSRunAnalysisERROR: could not get jobO files from panda server') - sys.exit(EC_WGET) - time.sleep(30) + print("==== Sandbox untarring STARTING at %s ====" % time.asctime(time.gmtime())) + #The user sandbox.tar.gz has to be unpacked no matter what (even in DEBUG mode) print(commands.getoutput('tar xfm %s' % opts.archiveJob)) - print("==== Sandbox preparation FINISHED at %s ====" % time.asctime(time.gmtime())) + print("==== Sandbox untarring FINISHED at %s ====" % time.asctime(time.gmtime())) #move the pset in the right place print("==== WMCore filesystem preparation STARTING at %s ====" % time.asctime(time.gmtime())) @@ -587,10 +558,15 @@ def prepSandbox(opts): os.rename(myfile, destDir + '/' + myfile) print("==== WMCore filesystem preparation FINISHED at %s ====" % time.asctime(time.gmtime())) - def extractUserSandbox(archiveJob, cmsswVersion): + # the user sandbox contains the user scram directory files and thus + # is unpacked in the local CMSSW_X_Y_X dir, but the cmsRun command + # will be executed from the job working directory, so we move "up" + # the PSet which is also in the user sandbox os.chdir(cmsswVersion) print(commands.getoutput('tar xfm %s ' % os.path.join('..', archiveJob))) + os.rename('PSet.py','../PSet.py') + os.rename('PSet.pkl','../PSet.pkl') os.chdir('..') def getProv(filename, scram): @@ -603,143 +579,75 @@ def getProv(filename, scram): print(msg) mintime() sys.exit(EC_CMSRunWrapper) - #with open("edmProvDumpOutput.log", "r") as fd: - # output = fd.read() output = scram.getStdout() return output - -def executeScriptExe(opts, scram): - #make scriptexe executable - st = os.stat(opts.scriptExe) - os.chmod(opts.scriptExe, st.st_mode | stat.S_IEXEC) - - command_ = ('python %s/TweakPSet.py --location=%s '+ - '--inputFile=\'%s\' '+ - '--runAndLumis=\'%s\' '+ - '--firstEvent=%s '+ - '--lastEvent=%s '+ - '--firstLumi=%s '+ - '--firstRun=%s '+ - '--seeding=%s '+ - '--lheInputFiles=%s '+ - '--oneEventMode=%s ' + - '--eventsPerLumi=%s ' + - '--maxRuntime=%s') %\ - (os.getcwd(), os.getcwd(), - opts.inputFile, - opts.runAndLumis, - opts.firstEvent, - opts.lastEvent, - opts.firstLumi, - opts.firstRun, - opts.seeding, - opts.lheInputFiles, - opts.oneEventMode, - opts.eventsPerLumi, - opts.maxRuntime) - - print ('Executing %s' % command_) +def tweakPSet(opts, scram): + + # this command is executed with cwd set to the job running directory + commandTemplate = 'python %s/TweakPSet.py --location=%s ' + '--inputFile=\'%s\' ' + '--runAndLumis=\'%s\' ' +\ + '--firstEvent=%s ' + '--lastEvent=%s ' + '--firstLumi=%s ' + '--firstRun=%s ' +\ + '--seeding=%s ' + '--lheInputFiles=%s ' + '--oneEventMode=%s ' +\ + '--eventsPerLumi=%s ' + '--maxRuntime=%s' + command = commandTemplate %\ + (os.getcwd(), os.getcwd(), + opts.inputFile, + opts.runAndLumis, + opts.firstEvent, + opts.lastEvent, + opts.firstLumi, + opts.firstRun, + opts.seeding, + opts.lheInputFiles, + opts.oneEventMode, + opts.eventsPerLumi, + opts.maxRuntime) + + print('Executing %s' % command) with tempSetLogLevel(logger=logging.getLogger(), level=logging.ERROR): - ret = scram(command_, runtimeDir = os.getcwd()) + ret = scram(command, runtimeDir = os.getcwd()) if ret > 0: - msg = 'Error executing TweakPSet.\n\tScram Diagnostic %s' % scram.diagnostic() + msg = 'Error executing TweakPSet.\n\tScram Diagnostic %s' % scram.diagnostic() handleException("FAILED", EC_CMSRunWrapper, msg) mintime() sys.exit(EC_CMSRunWrapper) +def executeScriptExe(opts, scram): + #make scriptexe executable + st = os.stat(opts.scriptExe) + os.chmod(opts.scriptExe, st.st_mode | stat.S_IEXEC) + command_ = os.getcwd() + "/%s %s %s" % (opts.scriptExe, opts.jobNumber, " ".join(json.loads(opts.scriptArgs))) print ('Exdcuting user script: %s' % command_) with tempSetLogLevel(logger=logging.getLogger(), level=logging.DEBUG): - ret = scram(command_, runtimeDir = os.getcwd(), cleanEnv = False) + ret = scram(command_, runtimeDir=os.getcwd(), cleanEnv=False) if ret > 0: - with open('cmsRun-stdout.log','w') as fh: + with open('cmsRun-stdout.log', 'w') as fh: fh.write(scram.diagnostic()) msg = 'Error executing scriptExe.\n\tSee stdout log' handleException("FAILED", EC_CMSRunWrapper, msg) mintime() sys.exit(EC_CMSRunWrapper) - with open('cmsRun-stdout.log','w') as fh: + with open('cmsRun-stdout.log', 'w') as fh: fh.write(scram.getStdout()) return ret def executeCMSSWStack(opts, scram): - def getOutputModules(): - pythonScript = "from PSetTweaks.WMTweak import makeTweak;"+\ - "config = __import__(\"WMTaskSpace.cmsRun.PSet\", globals(), locals(), [\"process\"], -1);"+\ - "tweakJson = makeTweak(config.process).jsondictionary();"+\ - "print tweakJson[\"process\"][\"outputModules_\"]" - with tempSetLogLevel(logger=logging.getLogger(), level=logging.ERROR): - ret = scram("python -c '%s'" % pythonScript, runtimeDir=os.getcwd()) - if ret > 0: - msg = 'Error getting output modules from the pset.\n\tScram Diagnostic %s' % scram.diagnostic() - handleException("FAILED", EC_CMSRunWrapper, msg) - mintime() - sys.exit(EC_CMSRunWrapper) - output = literal_eval(scram.getStdout()) - return output - - cmssw = CMSSW() - cmssw.stepName = "cmsRun" - cmssw.step = WMStep(cmssw.stepName) - CMSSWTemplate().install(cmssw.step) - cmssw.task = makeWMTask(cmssw.stepName) - cmssw.workload = newWorkload(cmssw.stepName) - cmssw.step.application.setup.softwareEnvironment = '' - cmssw.step.application.setup.scramArch = opts.scramArch - cmssw.step.application.setup.cmsswVersion = opts.cmsswVersion - cmssw.step.application.configuration.section_("arguments") - cmssw.step.application.configuration.arguments.globalTag = "" - for output in getOutputModules(): - cmssw.step.output.modules.section_(output) - getattr(cmssw.step.output.modules, output).primaryDataset = '' - getattr(cmssw.step.output.modules, output).processedDataset = '' - getattr(cmssw.step.output.modules, output).dataTier = '' - #cmssw.step.application.command.arguments = '' #TODO - cmssw.step.user.inputSandboxes = [opts.archiveJob] - cmssw.step.user.userFiles = opts.userFiles or '' - #Setting the following job attribute is required because in the CMSSW executor there is a call to analysisFileLFN to set up some attributes for TFiles. - #Same for lfnbase. We actually don't use these information so I am setting these to dummy values. Next: fix and use this lfn or drop WMCore runtime.. - cmssw.job = {'counter' : 0, 'workflow' : 'unused'} - cmssw.step.user.lfnBase = '/store/temp/user/' - cmssw.step.section_("builder") - cmssw.step.builder.workingDir = os.getcwd() - cmssw.step.runtime.invokeCommand = 'python' - cmssw.step.runtime.scramPreDir = os.getcwd() - cmssw.step.runtime.preScripts = [] - - - cmssw.step.runtime.scramPreScripts = [('%s/TweakPSet.py --location=%s '+ - '--inputFile=\'%s\' '+ - '--runAndLumis=\'%s\' '+ - '--firstEvent=%s '+ - '--lastEvent=%s '+ - '--firstLumi=%s '+ - '--firstRun=%s '+ - '--seeding=%s '+ - '--lheInputFiles=%s '+ - '--oneEventMode=%s ' + - '--eventsPerLumi=%s ' + - '--maxRuntime=%s') % - (os.getcwd(), os.getcwd(), - opts.inputFile, - opts.runAndLumis, - opts.firstEvent, - opts.lastEvent, - opts.firstLumi, - opts.firstRun, - opts.seeding, - opts.lheInputFiles, - opts.oneEventMode, - opts.eventsPerLumi, - opts.maxRuntime)] - cmssw.step.section_("execution") #exitStatus of cmsRun is set here - cmssw.report = Report("cmsRun") #report is loaded and put here - cmssw.execute() - return cmssw - + command_ = 'pwd; cmsRun -p PSet.py -j FrameworkJobReport.xml' + with tempSetLogLevel(logger=logging.getLogger(), level=logging.DEBUG): + ret = scram(command_, runtimeDir=os.getcwd(), cleanEnv=False) + if ret > 0: + with open('cmsRun-stdout.log', 'w') as fh: + fh.write(scram.diagnostic()) + msg = 'Error executing scriptExe.\n\tSee stdout log' + handleException("FAILED", EC_CMSRunWrapper, msg) + mintime() + sys.exit(EC_CMSRunWrapper) + with open('cmsRun-stdout.log', 'w') as fh: + fh.write(scram.getStdout()) + return ret def AddChecksums(report): if 'steps' not in report: @@ -757,13 +665,13 @@ def AddChecksums(report): if 'fileName' in fileInfo: fileInfo['pfn'] = fileInfo['fileName'] else: - continue + continue fileInfo['size'] = os.stat(fileInfo['pfn']).st_size print("==== Checksum computation STARTING at %s ====" % time.asctime(time.gmtime())) (adler32, cksum) = calculateChecksums(fileInfo['pfn']) print("==== Checksum FINISHED at %s ====" % time.asctime(time.gmtime())) print("== FileName: %s - FileAdler32: %s - FileSize: %.3f MBytes" % \ - (fileInfo['pfn'], adler32, float(fileInfo['size'])/(1024*1024)) ) + (fileInfo['pfn'], adler32, float(fileInfo['size'])/(1024*1024))) fileInfo['checksums'] = {'adler32': adler32, 'cksum': cksum} @@ -864,7 +772,7 @@ def StripReport(report): ad = {} try: ad = parseAd() - except: + except Exception: print("==== FAILURE WHEN PARSING HTCONDOR CLASSAD AT %s ====" % time.asctime(time.gmtime())) print(traceback.format_exc()) ad = {} @@ -883,13 +791,13 @@ def StripReport(report): from WMCore.FwkJobReport.Report import FwkJobReportException from WMCore.WMSpec.Steps.WMExecutionFailure import WMExecutionFailure from Utils.FileTools import calculateChecksums - from WMCore.WMSpec.Steps.Executors.CMSSW import CMSSW - from WMCore.WMSpec.WMStep import WMStep - from WMCore.WMSpec.WMTask import makeWMTask - from WMCore.WMSpec.WMWorkload import newWorkload - from WMCore.WMSpec.Steps.Templates.CMSSW import CMSSW as CMSSWTemplate + #from WMCore.WMSpec.Steps.Executors.CMSSW import CMSSW + #from WMCore.WMSpec.WMStep import WMStep + #from WMCore.WMSpec.WMTask import makeWMTask + #from WMCore.WMSpec.WMWorkload import newWorkload + #from WMCore.WMSpec.Steps.Templates.CMSSW import CMSSW as CMSSWTemplate from WMCore.WMRuntime.Tools.Scram import Scram - except: + except Exception: # We may not even be able to create a FJR at this point. Record # error and exit. if ad and not "CRAB3_RUNTIME_DEBUG" in os.environ: @@ -909,21 +817,13 @@ def StripReport(report): try: setupLogging('.') - # following commented lines are not needed anymore with new scram() in WMCore 1.2+ - # Also add stdout to the logging - #logHandler = logging.StreamHandler(sys.stdout) - #logFormatter = logging.Formatter("%(asctime)s:%(levelname)s:%(module)s:%(message)s") - #logging.Formatter.converter = time.gmtime - #logHandler.setFormatter(logFormatter) - #logging.getLogger().addHandler(logHandler) - if ad and not "CRAB3_RUNTIME_DEBUG" in os.environ: startDashboardMonitoring(ad) print("==== CMSSW Stack Execution STARTING at %s ====" % time.asctime(time.gmtime())) scr = Scram( - version = options.cmsswVersion, - directory = os.getcwd(), - architecture = options.scramArch, + version=options.cmsswVersion, + directory=os.getcwd(), + architecture=options.scramArch, ) print("==== SCRAM Obj CREATED at %s ====" % time.asctime(time.gmtime())) @@ -933,27 +833,31 @@ def StripReport(report): mintime() sys.exit(EC_CMSMissingSoftware) + print("==== Extract user sandbox in CMSSW directory ====") extractUserSandbox(options.archiveJob, options.cmsswVersion) + # tweaking of the PSet is needed both for CMSSWStack and ScriptEXE + print("==== Tweak PSet ====") + tweakPSet(options, scr) + try: jobExitCode = None - if options.scriptExe=='None': + if options.scriptExe == 'None': print("==== CMSSW JOB Execution started at %s ====" % time.asctime(time.gmtime())) - cmsswSt = executeCMSSWStack(options, scr) - jobExitCode = cmsswSt.step.execution.exitStatus + jobExitCode = executeCMSSWStack(options, scr) else: print("==== ScriptEXE Execution started at %s ====" % time.asctime(time.gmtime())) jobExitCode = executeScriptExe(options, scr) except: - print("==== CMSSW Stack Execution FAILED at %s ====" % time.asctime(time.gmtime())) + print("==== Execution FAILED at %s ====" % time.asctime(time.gmtime())) logCMSSW() raise - if options.scriptExe=='None': + if options.scriptExe == 'None': print("==== CMSSW JOB Execution completed at %s ====" % time.asctime(time.gmtime())) else: print("==== ScriptEXE Execution completed at %s ====" % time.asctime(time.gmtime())) print("Job exit code: %s" % str(jobExitCode)) - print("==== CMSSW Stack Execution FINISHED at %s ====" % time.asctime(time.gmtime())) + print("==== Execution FINISHED at %s ====" % time.asctime(time.gmtime())) logCMSSW() except WMExecutionFailure as WMex: print("ERROR: Caught WMExecutionFailure - code = %s - name = %s - detail = %s" % (WMex.code, WMex.name, WMex.detail)) @@ -966,7 +870,7 @@ def StripReport(report): rep.parse('FrameworkJobReport.xml', "cmsRun") try: jobExitCode = rep.getExitCode() - except: + except Exception: jobExitCode = WMex.code rep = rep.__to_json__(None) #save the virgin WMArchive report @@ -976,7 +880,7 @@ def StripReport(report): rep['jobExitCode'] = jobExitCode with open('jobReport.json', 'w') as of: json.dump(rep, of) - except: + except Exception: print("WARNING: Failure when trying to parse FJR XML after job failure.") handleException("FAILED", WMex.code, exmsg)