From 0b70bb27fc822c12ccefd56fcd0867cdb38fc130 Mon Sep 17 00:00:00 2001 From: Stefano Belforte Date: Mon, 18 Nov 2024 18:28:45 +0100 Subject: [PATCH] avoid false positive, skip HC, limit #report per task. For #8773 --- src/python/TaskWorker/Actions/RetryJob.py | 168 ++++++++++++++-------- 1 file changed, 106 insertions(+), 62 deletions(-) diff --git a/src/python/TaskWorker/Actions/RetryJob.py b/src/python/TaskWorker/Actions/RetryJob.py index 2b5a98a263..517e7af10e 100644 --- a/src/python/TaskWorker/Actions/RetryJob.py +++ b/src/python/TaskWorker/Actions/RetryJob.py @@ -7,7 +7,7 @@ import socket from collections import namedtuple -from ServerUtilities import executeCommand +from ServerUtilities import executeCommand, getLock from ServerUtilities import MAX_DISK_SPACE, MAX_WALLTIME, MAX_MEMORY if 'useHtcV2' in os.environ: @@ -17,6 +17,12 @@ JOB_RETURN_CODES = namedtuple('JobReturnCodes', 'OK RECOVERABLE_ERROR FATAL_ERROR')(0, 1, 2) +# strings in fatal root exception text which indicate code problem, not corrupted file +# a small "knowledge data base" +NOT_FILE_RELATED_FATAL_ROOT_ERRORS = [ + "already deleted (list name = TList)", +] + # Without this environment variable set, HTCondor takes a write lock per logfile entry os.environ['_condor_ENABLE_USERLOG_LOCKING'] = 'false' @@ -413,7 +419,9 @@ def check_corrupted_file(self, exitCode): corruptedFile = False suspiciousFile = False + fatalLine = False inputFileName = 'NotAvailable' + errorLines = [] RSE = self.site RSE = RSE if not RSE.startswith('T1') else f"{RSE}_Disk" fname = os.path.realpath("WEB_DIR/job_out.%s.%d.txt" % (self.job_id, self.crab_retry)) @@ -424,73 +432,109 @@ def check_corrupted_file(self, exitCode): # remember last opened file, in case of 8021 that's the one that matters if line.startswith("== CMSSW:") and ' Successfully opened file' in line: inputFileName = f"/store/{line.split('/store/')[1]}" # strip protocol part - if line.startswith("== CMSSW:") and "Fatal Root Error:" in line: - corruptedFile = 
True - self.logger.info("Corrupted input file found") - self.logger.debug(line) - errorLines = [line] - # file name is in next line - continue - if corruptedFile: - errorLines.append(line) - if '/store/' in line and '.root' in line: - # this may be better done in the script which processes the BadInputFiles reports - # if '/store/user' in line or '/store/group' in line and not 'rucio' in line: - # # no point in reporting files unknown to Rucio - # corruptedFile = False - # break - - # extract the '/store/...root' part of this line - fragment1 = line.split('/store/')[1] - fragment2 = fragment1.split('.root')[0] - inputFileName = f"/store/{fragment2}.root" - self.logger.info(f"RSE: {RSE} - ec: {exitCode} - file: {inputFileName}") - - else: - corruptedFile = False - suspiciousFile = True - errorLines.append('NOT CLEARLY CORRUPTED, OTHER ROOT ERROR ?') - errorLines.append('DID Identification may not be correct') - self.logger.info("RootFatalError does not contain file info") + # extract the Exception message + if fatalLine: + fatalExceptionLines.append(line) + if line.startswith("== CMSSW:") and " ----- Begin Fatal Exception" in line: + fatalExceptionLines = [] + fatalLine = True + if line.startswith("== CMSSW:") and " ----- End Fatal Exception" in line: break - if corruptedFile or suspiciousFile: - # do not report HammerCloud - username = self.reqname.split(':')[1].split('_')[0] - if username == 'sciaba': - return corruptedFile - # add pointers to logs - schedHostname = socket.gethostname().split('.')[0] - schedId = schedHostname.removeprefix('vocms') # vomcs059 -> 059, vocms0106 -> 0106 etc, - webDirUrl = f"https://cmsweb.cern.ch:8443/scheddmon/{schedId}/{username}/{self.reqname}" - stdoutUrl = f"{webDirUrl}/job_out.{self.job_id}.{self.crab_retry}.txt" - postJobUrl = f"{webDirUrl}/postjob.{self.job_id}.{self.crab_retry}.txt" - errorLines.append(f"stdout: {stdoutUrl}") - errorLines.append(f"postjob: {postJobUrl}") - # note things down - reportFileName = 
f'Badfile.job.{self.job_id}.{self.crab_retry}.json' - corruptionMessage = {'DID': f'cms:{inputFileName}', 'RSE': RSE, - 'exitCode': exitCode, 'message': errorLines} - with open(reportFileName, 'w', encoding='utf-8') as fp: - json.dump(corruptionMessage, fp) - self.logger.info('corruption message prepared, gfal-copy to EOS') - proxy = os.getenv('X509_USER_PROXY') - self.logger.info(f"X509_USER_PROXY = {proxy}") - reportLocation = 'davs://eoscms.cern.ch:443/eos/cms/store/temp/user/BadInputFiles/' - # there can be so many that we better split by task + # parse fatal exception text + for line in fatalExceptionLines: + for falsePositive in NOT_FILE_RELATED_FATAL_ROOT_ERRORS: + if falsePositive in line: + return False + for line in fatalExceptionLines: + if "Fatal Root Error:" in fatalExceptionLines: + corruptedFile = True + self.logger.info("Corrupted input file found") + self.logger.debug(line) + errorLines = [line] + # file name is in next line + continue if corruptedFile: - reportLocation += f'corrupted/new/{self.reqname}/' - if suspiciousFile: - reportLocation += f'suspicious/new/{self.reqname}/' - - destination = reportLocation + reportFileName - cmd = f'gfal-copy -vp -t 60 {reportFileName} {destination}' - out, err, ec = executeCommand(cmd) - if ec: - self.logger.error(f'gfal-copy failed with out: {out} err: {err}') + errorLines.append(line) + if '/store/' in line and '.root' in line: + # this may be better done in the script which processes the BadInputFiles reports + # if '/store/user' in line or '/store/group' in line and not 'rucio' in line: + # # no point in reporting files unknown to Rucio + # corruptedFile = False + # break + + # extract the '/store/...root' part of this line + fragment1 = line.split('/store/')[1] + fragment2 = fragment1.split('.root')[0] + inputFileName = f"/store/{fragment2}.root" + self.logger.info(f"RSE: {RSE} - ec: {exitCode} - file: {inputFileName}") + else: + corruptedFile = False + suspiciousFile = True + errorLines.append('NOT 
def reportBadInputFile(self, corruptedFile, suspiciousFile, corruptionMessage):
    """
    Report a bad input file by uploading a small JSON report to EOS.

    NOTE(review): this is written as a method of the job-retry class in
    RetryJob.py (it reads self.reqname, self.job_id, self.crab_retry and
    self.logger) - confirm the enclosing class when splicing it back.

    :param corruptedFile: True when the fatal ROOT error names the input file;
        the report is then stored under 'corrupted/new/<task>/'.
    :param suspiciousFile: True when a fatal ROOT error was found without file
        information; the report is then stored under 'suspicious/new/<task>/'.
    :param corruptionMessage: dict describing the problem. The list stored
        under its 'message' key is extended IN PLACE with the URLs of the
        job stdout and of the PostJob log.
    :return: None. Nothing is reported for HammerCloud tasks, nor once more
        than 30 reports have been counted for the task.
    """
    # too many reports for one task indicate a software error, not bad file(s)
    maxReportsPerTask = 30

    taskName = self.reqname
    username = taskName.split(':')[1].split('_')[0]
    jobId = f"{self.job_id}.{self.crab_retry}"
    # do not report HammerCloud
    if username == 'sciaba':
        return

    # Count reports for this task. The counter lives in a file in the current
    # working directory and is kept as an int in memory, so that the
    # comparison with the limit below is int vs. int.
    fname = 'BadInputFileCount.json'
    with getLock(fname):  # use lock to avoid races with concurrent PostJobs
        oldCount = 0
        if os.path.exists(fname):
            try:
                with open(fname, 'r', encoding='utf-8') as fp:
                    oldCount = int(json.load(fp))
            except (ValueError, TypeError):
                # unreadable counter: do not let bookkeeping break reporting
                self.logger.warning(f"can not parse {fname}, restart counting from 0")
        count = oldCount + 1
        with open(fname, 'w', encoding='utf-8') as fp:
            # on-disk format is unchanged: the number is stored as a JSON string
            json.dump(str(count), fp)
    if count > maxReportsPerTask:
        self.logger.info(f"{count} bad input file reports for this task, limit is {maxReportsPerTask}: skip")
        return

    # add pointers to logs
    schedHostname = socket.gethostname().split('.')[0]
    schedId = schedHostname.removeprefix('vocms')  # vocms059 -> 059, vocms0106 -> 0106 etc.
    webDirUrl = f"https://cmsweb.cern.ch:8443/scheddmon/{schedId}/{username}/{taskName}"
    stdoutUrl = f"{webDirUrl}/job_out.{jobId}.txt"
    postJobUrl = f"{webDirUrl}/postjob.{jobId}.txt"
    # the error lines are stored under 'message' by check_corrupted_file
    errorLines = corruptionMessage.setdefault('message', [])
    errorLines.append(f"stdout: {stdoutUrl}")
    errorLines.append(f"postjob: {postJobUrl}")

    # note things down
    reportFileName = f'Badfile.job.{jobId}.json'
    with open(reportFileName, 'w', encoding='utf-8') as fp:
        json.dump(corruptionMessage, fp)
    self.logger.info('corruption message prepared, gfal-copy to EOS')
    proxy = os.getenv('X509_USER_PROXY')
    self.logger.info(f"X509_USER_PROXY = {proxy}")

    reportLocation = 'davs://eoscms.cern.ch:443/eos/cms/store/temp/user/BadInputFiles/'
    # there can be so many that we better split by task
    if corruptedFile:
        reportLocation += f'corrupted/new/{taskName}/'
    elif suspiciousFile:
        # elif: a report belongs to exactly one of the two directories
        reportLocation += f'suspicious/new/{taskName}/'

    destination = reportLocation + reportFileName
    cmd = f'gfal-copy -vp -t 60 {reportFileName} {destination}'
    out, err, ec = executeCommand(cmd)
    if ec:
        self.logger.error(f'gfal-copy failed with out: {out} err: {err}')