Skip to content

Commit

Permalink
avoid false positive, skip HC, limit #report per task. For dmwm#8773
Browse files Browse the repository at this point in the history
  • Loading branch information
belforte committed Nov 18, 2024
1 parent badf36d commit 0b70bb2
Showing 1 changed file with 106 additions and 62 deletions.
168 changes: 106 additions & 62 deletions src/python/TaskWorker/Actions/RetryJob.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import socket
from collections import namedtuple

from ServerUtilities import executeCommand
from ServerUtilities import executeCommand, getLock
from ServerUtilities import MAX_DISK_SPACE, MAX_WALLTIME, MAX_MEMORY

if 'useHtcV2' in os.environ:
Expand All @@ -17,6 +17,12 @@

JOB_RETURN_CODES = namedtuple('JobReturnCodes', 'OK RECOVERABLE_ERROR FATAL_ERROR')(0, 1, 2)

# strings in fatal root exception text which indicate code problem, not corrupted file
# a small "knowledge data base"
NOT_FILE_RELATED_FATAL_ROOT_ERRORS = [
"already deleted (list name = TList)",
]

# Without this environment variable set, HTCondor takes a write lock per logfile entry
os.environ['_condor_ENABLE_USERLOG_LOCKING'] = 'false'

Expand Down Expand Up @@ -413,7 +419,9 @@ def check_corrupted_file(self, exitCode):

corruptedFile = False
suspiciousFile = False
fatalLine = False
inputFileName = 'NotAvailable'
errorLines = []
RSE = self.site
RSE = RSE if not RSE.startswith('T1') else f"{RSE}_Disk"
fname = os.path.realpath("WEB_DIR/job_out.%s.%d.txt" % (self.job_id, self.crab_retry))
Expand All @@ -424,73 +432,109 @@ def check_corrupted_file(self, exitCode):
# remember last opened file, in case of 8021 that's the one that matters
if line.startswith("== CMSSW:") and ' Successfully opened file' in line:
inputFileName = f"/store/{line.split('/store/')[1]}" # strip protocol part
if line.startswith("== CMSSW:") and "Fatal Root Error:" in line:
corruptedFile = True
self.logger.info("Corrupted input file found")
self.logger.debug(line)
errorLines = [line]
# file name is in next line
continue
if corruptedFile:
errorLines.append(line)
if '/store/' in line and '.root' in line:
# this may be better done in the script which processes the BadInputFiles reports
# if '/store/user' in line or '/store/group' in line and not 'rucio' in line:
# # no point in reporting files unknown to Rucio
# corruptedFile = False
# break

# extract the '/store/...root' part of this line
fragment1 = line.split('/store/')[1]
fragment2 = fragment1.split('.root')[0]
inputFileName = f"/store/{fragment2}.root"
self.logger.info(f"RSE: {RSE} - ec: {exitCode} - file: {inputFileName}")

else:
corruptedFile = False
suspiciousFile = True
errorLines.append('NOT CLEARLY CORRUPTED, OTHER ROOT ERROR ?')
errorLines.append('DID Identification may not be correct')
self.logger.info("RootFatalError does not contain file info")
# extract the Exception message
if fatalLine:
fatalExceptionLines.append(line)
if line.startswith("== CMSSW:") and " ----- Begin Fatal Exception" in line:
fatalExceptionLines = []
fatalLine = True
if line.startswith("== CMSSW:") and " ----- End Fatal Exception" in line:
break
if corruptedFile or suspiciousFile:
# do not report HammerCloud
username = self.reqname.split(':')[1].split('_')[0]
if username == 'sciaba':
return corruptedFile
# add pointers to logs
schedHostname = socket.gethostname().split('.')[0]
schedId = schedHostname.removeprefix('vocms') # vomcs059 -> 059, vocms0106 -> 0106 etc,
webDirUrl = f"https://cmsweb.cern.ch:8443/scheddmon/{schedId}/{username}/{self.reqname}"
stdoutUrl = f"{webDirUrl}/job_out.{self.job_id}.{self.crab_retry}.txt"
postJobUrl = f"{webDirUrl}/postjob.{self.job_id}.{self.crab_retry}.txt"
errorLines.append(f"stdout: {stdoutUrl}")
errorLines.append(f"postjob: {postJobUrl}")
# note things down
reportFileName = f'Badfile.job.{self.job_id}.{self.crab_retry}.json'
corruptionMessage = {'DID': f'cms:{inputFileName}', 'RSE': RSE,
'exitCode': exitCode, 'message': errorLines}
with open(reportFileName, 'w', encoding='utf-8') as fp:
json.dump(corruptionMessage, fp)
self.logger.info('corruption message prepared, gfal-copy to EOS')
proxy = os.getenv('X509_USER_PROXY')
self.logger.info(f"X509_USER_PROXY = {proxy}")
reportLocation = 'davs://eoscms.cern.ch:443/eos/cms/store/temp/user/BadInputFiles/'
# there can be so many that we better split by task
# parse fatal exception text
for line in fatalExceptionLines:
for falsePositive in NOT_FILE_RELATED_FATAL_ROOT_ERRORS:
if falsePositive in line:
return False
for line in fatalExceptionLines:
if "Fatal Root Error:" in fatalExceptionLines:
corruptedFile = True
self.logger.info("Corrupted input file found")
self.logger.debug(line)
errorLines = [line]
# file name is in next line
continue
if corruptedFile:
reportLocation += f'corrupted/new/{self.reqname}/'
if suspiciousFile:
reportLocation += f'suspicious/new/{self.reqname}/'

destination = reportLocation + reportFileName
cmd = f'gfal-copy -vp -t 60 {reportFileName} {destination}'
out, err, ec = executeCommand(cmd)
if ec:
self.logger.error(f'gfal-copy failed with out: {out} err: {err}')
errorLines.append(line)
if '/store/' in line and '.root' in line:
# this may be better done in the script which processes the BadInputFiles reports
# if '/store/user' in line or '/store/group' in line and not 'rucio' in line:
# # no point in reporting files unknown to Rucio
# corruptedFile = False
# break

# extract the '/store/...root' part of this line
fragment1 = line.split('/store/')[1]
fragment2 = fragment1.split('.root')[0]
inputFileName = f"/store/{fragment2}.root"
self.logger.info(f"RSE: {RSE} - ec: {exitCode} - file: {inputFileName}")
else:
corruptedFile = False
suspiciousFile = True
errorLines.append('NOT CLEARLY CORRUPTED, OTHER ROOT ERROR ?')
errorLines.append('DID Identification may not be correct')
self.logger.info("RootFatalError does not contain file info")
if corruptedFile or suspiciousFile:
corruptionMessage = {'DID': f'cms:{inputFileName}', 'RSE': RSE,
'exitCode': exitCode, 'message': errorLines}
self.reportBadInputFile(corruptedFile, suspiciousFile, corruptionMessage)
return corruptedFile

# = = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =

def reportBadInputFile(self, corruptedFile, suspiciousFile, corruptionMessage):
"""
report bad file via a file on EOS
"""
taskName = self.reqname
username = taskName.split(':')[1].split('_')[0]
jobId = f"{self.job_id}.{self.crab_retry}"
# do not report HammerCloud
if username == 'sciaba':
return
# count reports for this task, too many indicates software error, not bad file(s)
fname = 'BadInputFileCount.json'
with getLock(fname): # use lock to avoid races with concurrent PostJobs
if os.path.exists(fname):
with open(fname, 'r', encoding='utf-8') as fp:
oldCount = int(json.load(fp))
else:
oldCount = 0
count = str(oldCount + 1)
with open(fname, 'w', encoding='utf-8') as fp:
json.dump(count, fp)
if count > 30:
return

# add pointers to logs
schedHostname = socket.gethostname().split('.')[0]
schedId = schedHostname.removeprefix('vocms') # vomcs059 -> 059, vocms0106 -> 0106 etc,
webDirUrl = f"https://cmsweb.cern.ch:8443/scheddmon/{schedId}/{username}/{taskName}"
stdoutUrl = f"{webDirUrl}/job_out.{jobId}.txt"
postJobUrl = f"{webDirUrl}/postjob.{jobId}.txt"
corruptionMessage['errorLines'].append(f"stdout: {stdoutUrl}")
corruptionMessage['errorLines'].append(f"postjob: {postJobUrl}")
# note things down
reportFileName = f'Badfile.job.{jobId}.json'
with open(reportFileName, 'w', encoding='utf-8') as fp:
json.dump(corruptionMessage, fp)
self.logger.info('corruption message prepared, gfal-copy to EOS')
proxy = os.getenv('X509_USER_PROXY')
self.logger.info(f"X509_USER_PROXY = {proxy}")
reportLocation = 'davs://eoscms.cern.ch:443/eos/cms/store/temp/user/BadInputFiles/'
# there can be so many that we better split by task
if corruptedFile:
reportLocation += f'corrupted/new/{taskName}/'
if suspiciousFile:
reportLocation += f'suspicious/new/{taskName}/'

destination = reportLocation + reportFileName
cmd = f'gfal-copy -vp -t 60 {reportFileName} {destination}'
out, err, ec = executeCommand(cmd)
if ec:
self.logger.error(f'gfal-copy failed with out: {out} err: {err}')

# = = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =

def check_empty_report(self):
"""
Need a doc string here.
Expand Down

0 comments on commit 0b70bb2

Please sign in to comment.