This repository has been archived by the owner on Jan 30, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathSiteMover.py
2959 lines (2480 loc) · 118 KB
/
SiteMover.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Base class of site movers
# All site movers inherit from this class
import os
import commands
import re
import time
from urllib import urlopen, urlencode
from urllib2 import Request, urlopen
from futil import *
from pUtil import tolog, readpar, dumpOrderedItems, getDirectAccessDic, getSiteInformation
from PilotErrors import PilotErrors
from timed_command import timed_command
from configSiteMover import config_sm
from FileHandling import getExtension, getTracingReportFilename, writeJSON
PERMISSIONS_DIR = config_sm.PERMISSIONS_DIR
PERMISSIONS_FILE = config_sm.PERMISSIONS_FILE
CMD_CHECKSUM = config_sm.COMMAND_MD5
ARCH_DEFAULT = config_sm.ARCH_DEFAULT
class SiteMover(object):
"""
File movers move files between a storage element (of different kinds) and a local directory
get_data: SE->local
put_data: local->SE
getMover: static function returning a SiteMover
It furter provides functions useful for child classes (AAASiteMover):
put_data_retfail -- facilitate return in case of failure
mkdirWperm -- create recursively dirs setting appropriate permissions
getLocalFileInfo -- get size and checksum of a local file
This is the Default SiteMover, the SE has to be locally accessible for all the WNs
and all commands like cp, mkdir, md5checksum have to be available on files in the SE
E.g. NFS exported file system
"""
__childDict = {}
copyCommand = "cp"
checksum_command = "adler32"
has_mkdir = True
has_df = True
has_getsize = True
has_md5sum = True
has_chmod = True
permissions_DIR = PERMISSIONS_DIR
permissions_FILE = PERMISSIONS_FILE
arch_type = ARCH_DEFAULT
timeout = 5*3600
useTracingService = True
filesInRucioDataset = {}
CONDPROJ = ['oflcond', 'comcond', 'cmccond', 'tbcond', 'tbmccond', 'testcond']
PRODFTYPE = ['AOD', 'CBNT', 'ESD', 'EVNT', 'HIST', 'HITS', 'RDO', 'TAG', 'log', 'NTUP']
def __init__(self, setup_path='', *args, **kwrds):
    """ Remember the setup script path; any extra arguments are accepted and ignored """
    self._setup = setup_path
def init_data(self, job):
    """ Initialize mover state from the job object (no-op in the base mover; subclasses may override) """
    pass
def get_timeout(self):
    """ Return the transfer timeout (seconds) configured for this mover """
    return self.timeout
def getChecksumCommand(self):
    """ Return the checksum command associated with this site mover """
    return self.checksum_command
def getID(self):
    """ Return the copy command name, which identifies this mover """
    return self.copyCommand
def getSetup(self):
    """ Return the setup string (pacman setup or setup script) for the copy command used by the mover """
    return self._setup
def mountNSF4AndGetPFN(self, error, gpfn):
    """ Get and check PNFS mount point, return the pfn

    Resolves an SRM URL to a locally accessible path by locating the NFS4
    mount point of the corresponding storage server.
    error: PilotErrors instance used to translate failures into error codes
    gpfn: full source SRM URL
    Returns (ec, pilotErrorDiag, src_loc_pfn); ec is 0 on success.
    """
    ec = 0
    pilotErrorDiag = ""
    src_loc_pfn = ""
    try:
        if 'SFN' in gpfn:
            # e.g. srm://host:8446/srm/managerv2?SFN=/path -> seName=host, src_loc_pfn=/path
            seName = gpfn.replace("srm://", "").split(':8446/srm/managerv2?SFN=')[0]
            src_loc_pfn = gpfn.split(':8446/srm/managerv2?SFN=')[1]
        else:
            # no SFN token: the host is the first path component after srm://
            seName = gpfn.replace("srm://", "").split('/')[0]
            src_loc_pfn = gpfn.split('%s' % (seName))[1]
            # seName = gpfn.replace("srm://", "").split(':8446/srm/managerv2?SFN=')[0]
            # src_loc_pfn = gpfn.split(':8446/srm/managerv2?SFN=')[1]
    except Exception, e:
        pilotErrorDiag = "Exception caught: %s" % (e)
        tolog("!!WARNING!!1887!! %s" % (pilotErrorDiag))
        ec = error.ERR_STAGEINFAILED
        return ec, pilotErrorDiag, src_loc_pfn
    # look up the NFS4 mount entry for the storage server
    _cmd_str = 'mount -l -t nfs4|grep %s' % (seName)
    timeout = 3600
    try:
        s, telapsed, cout, cerr = timed_command(_cmd_str, timeout)
    except Exception, e:
        tolog("!!WARNING!!1887!! timed_command() threw an exception: %s" % str(e))
        s = 1
        o = str(e)
        telapsed = timeout
    else:
        o = cout + cerr
    tolog("Elapsed time: %d" % (telapsed))
    if s == 0:
        try:
            # mount output is "<device> on <mountpoint> type nfs4 ..." -> third token
            pnfsMountPoint = o.split()[2]
        except Exception, e:
            pilotErrorDiag = "Exception caught: %s" % (e)
            tolog("!!WARNING!!1887!! %s" % (pilotErrorDiag))
            ec = error.ERR_STAGEINFAILED
        else:
            if os.path.ismount("%s" % (pnfsMountPoint)):
                tolog("PNFS Server: %s, mount point: %s" % (seName, pnfsMountPoint))
                # prepend the local mount point to the remote path
                src_loc_pfn = '%s%s' % (pnfsMountPoint, src_loc_pfn)
            else:
                tolog("!!WARNING!!1887!! %s is no mount point" % (pnfsMountPoint))
                pilotErrorDiag = "PNFS system error: %s" % (o)
                ec = error.ERR_GETFAILEDTOMOUNTNFS4
    else:
        tolog("!!WARNING!!1887!! Command failed: %s" % (_cmd_str))
        if is_timeout(s):
            pilotErrorDiag = "Mount command was timed out after %d seconds" % (telapsed)
            ec = error.ERR_GETTIMEOUT
        else:
            pilotErrorDiag = "PNFS system error: %s" % (o)
            ec = error.ERR_GETPNFSSYSTEMERROR
    return ec, pilotErrorDiag, src_loc_pfn
def get_data(self, gpfn, lfn, path, fsize=0, fchecksum=0, guid=0, **pdict):
    """
    Move a file from the local SE (where it was put from DDM) to the working directory.
    gpfn: full source URL (e.g. method://[host[:port]/full-dir-path/filename - a SRM URL is OK)
    path: destination absolute path (in a local file system). It is assumed to be there. get_data returns an error if the path is missing
    The local file is assumed to have a relative path that is the same of the relative path in the 'gpfn'
    loc_...: variables used to access the file in the locally exported file system
    Returns (ec, pilotErrorDiag); ec is 0 on success.
    """
    error = PilotErrors()
    pilotErrorDiag = ""
    # Get input parameters from pdict
    timeout = pdict.get('timeout', 5*3600)
    experiment = pdict.get('experiment', "ATLAS")
    # get the Rucio tracing report
    report = self.getStubTracingReport(pdict['report'], 'sm', lfn, guid)
    # get the site information object
    si = getSiteInformation(experiment)
    src_loc_pfn = ''
    if si.isTier3():
        # on a Tier 3 the PFN is already a locally usable path
        src_loc_pfn = gpfn
    else:
        if 'dpm' in gpfn:
            # Get and Check PNFS mount point
            ec, pilotErrorDiag, src_loc_pfn = self.mountNSF4AndGetPFN(error, gpfn)
            if ec != 0:
                return ec, pilotErrorDiag
        else:
            # remove any host and SFN info from PFN path
            src_loc_pfn = self.extractPathFromPFN(gpfn)
    src_loc_filename = lfn
    # source vars: gpfn, loc_pfn, loc_host, loc_dirname, loc_filename
    # dest vars: path
    # infer the checksum type from the given checksum value, if any
    if fchecksum != 0 and fchecksum != "":
        csumtype = SiteMover.getChecksumType(fchecksum)
    else:
        csumtype = "default"
    # compute missing size/checksum from the source file itself
    if fsize == 0 or fchecksum == 0:
        ec, pilotErrorDiag, fsize, fchecksum = SiteMover.getLocalFileInfo(src_loc_pfn, csumtype=csumtype)
        if ec != 0:
            self.prepareReport('LOCAL_FILE_INFO_FAIL', report)
            return ec, pilotErrorDiag
    dest_file = os.path.join(path, src_loc_filename)
    # execute the copy command
    #PN
    _cmd_str = "cp %s %s" % (src_loc_pfn, dest_file)
    # if ".lib." in src_loc_pfn:
    # _cmd_str = _cmd_str.replace('XXX', '')
    tolog("Executing command: %s" % (_cmd_str))
    report['transferStart'] = time.time()
    try:
        s, telapsed, cout, cerr = timed_command(_cmd_str, timeout)
    except Exception, e:
        tolog("!!WARNING!!2999!! timed_command() threw an exception: %s" % str(e))
        s = 1
        o = str(e)
        telapsed = timeout
    else:
        o = cout + cerr
    tolog("Elapsed time: %d" % (telapsed))
    report['validateStart'] = time.time()
    # error code handling
    if s != 0:
        tolog("!!WARNING!!2990!! Command failed: %s" % (_cmd_str))
        check_syserr(s, o)
        if is_timeout(s):
            pilotErrorDiag = "cp get was timed out after %d seconds" % (telapsed)
            ec = error.ERR_GETTIMEOUT
        else:
            o = o.replace('\n', ' ')
            # distinguish a missing DBRelease file (special error code) from a generic missing file
            if o.find("No such file or directory") >= 0:
                if src_loc_pfn.find("DBRelease") >= 0:
                    pilotErrorDiag = "DBRelease file missing: %s" % (src_loc_pfn)
                    ec = error.ERR_MISSDBREL
                else:
                    pilotErrorDiag = "No such file or directory: %s" % (src_loc_pfn)
                    ec = error.ERR_NOSUCHFILE
            else:
                pilotErrorDiag = "cp failed with output: ec = %d, output = %s" % (s, o)
                ec = error.ERR_STAGEINFAILED
        tolog("!!WARNING!!2999!! %s" % (pilotErrorDiag))
        self.prepareReport('COPY_FAIL', report)
        return ec, pilotErrorDiag
    # get remote file size and checksum
    ec, pilotErrorDiag, dstfsize, dstfchecksum = SiteMover.getLocalFileInfo(dest_file, csumtype=csumtype)
    if ec != 0:
        self.prepareReport('LOCAL_FILE_INFO_FAIL', report)
        return ec, pilotErrorDiag
    # compare remote and local file size
    if dstfsize != fsize:
        pilotErrorDiag = "Remote and local file sizes do not match for %s (%s != %s)" %\
            (os.path.basename(gpfn), str(dstfsize), str(fsize))
        tolog('!!WARNING!!2999!! %s' % (pilotErrorDiag))
        self.prepareReport('FS_MISMATCH', report)
        return error.ERR_GETWRONGSIZE, pilotErrorDiag
    # compare remote and local file checksum
    if dstfchecksum != fchecksum and not self.isDummyChecksum(fchecksum):
        pilotErrorDiag = "Remote and local checksums (of type %s) do not match for %s (%s != %s)" %\
            (csumtype, os.path.basename(gpfn), dstfchecksum, fchecksum)
        tolog('!!WARNING!!2999!! %s' % (pilotErrorDiag))
        if csumtype == "adler32":
            self.prepareReport('AD_MISMATCH', report)
            return error.ERR_GETADMISMATCH, pilotErrorDiag
        else:
            self.prepareReport('MD5_MISMATCH', report)
            return error.ERR_GETMD5MISMATCH, pilotErrorDiag
    self.prepareReport('DONE', report)
    return 0, pilotErrorDiag
def getRSE(surl=None):
    """ Return the Rucio site name (RSE ... Rucio Storage Element) using the SURL """
    # nothing to match against without a SURL
    if not surl:
        return None
    try:
        from dq2.info import TiersOfATLAS
    except:
        # Note: do not print the exception since it sometimes can not be converted to a string (as seen at Taiwan)
        tolog("!!WARNING!!1119!! TiersOfATLAS could not be imported from dq2.info")
        return None
    # return the first destination site whose storage matches the SURL
    for candidate in TiersOfATLAS.getAllDestinationSites():
        if TiersOfATLAS.isSURLFromSiteOrCloud(surl, candidate):
            return candidate
    return None
getRSE = staticmethod(getRSE)
def getDefaultRSE(self):
    """ Return the Rucio site name using the schedconfig.se info

    Builds a minimal SURL from the schedconfig SE and path info and resolves
    it to an RSE name via getRSE(). Returns None when no match is found.
    """
    # Build a preliminary SURL using minimum information necessary for the getRSE() method
    default_token, se = SiteMover.extractSE(readpar('se'))
    tolog("default_token=%s, se=%s" % (default_token, se))
    # Get a preliminary path
    sepath = readpar('seprodpath')
    if sepath == "":
        sepath = readpar('sepath')
    # Note that the sepath might not be simple, but can contain complex structures (brackets and commas)
    # First create a properly formatted selist list and then use the default token to get the corresponding proper sepath
    destinationList = self.getDirList(sepath)
    tolog("destinationList=%s"%str(destinationList))
    # Now find the proper sepath
    destination = self.getMatchingDestinationPath(default_token, destinationList)
    tolog("destination=%s"%destination)
    # Create the SURL
    surl = se + destination
    tolog("surl=%s"%surl)
    # Get the default Rucio site name
    return SiteMover.getRSE(surl=surl)
def getTiersOfATLASAlternativeName(self, endpoint):
    """ Return the alternativeName from TiersOfATLAS for a given edpoint

    Returns an empty string if TiersOfATLAS is unavailable or the lookup fails.
    """
    try:
        from dq2.info import TiersOfATLAS
    except:
        # Note: do not print the exception since it sometimes can not be converted to a string (as seen at Taiwan)
        tolog("!!WARNING!!1119!! TiersOfATLAS could not be imported from dq2.info")
        return ""
    tolog("endpoint=%s"%endpoint)
    # the site property is a list; the alternative name is its first entry
    try:
        return TiersOfATLAS.getSiteProperty(endpoint, 'alternateName')[0]
    except:
        tolog("!!WARNING!!5656!! TiersOfATLAS.getSiteProperty() failed to find alternativeName for %s" % (endpoint))
        return ""
def getTiersOfATLASSE(self, endpoint):
    """ Return the se from TiersOfATLAS

    endpoint: storage endpoint name as known to TiersOfATLAS.
    Returns the endpoint's 'srm' entry, or an empty string on any failure.
    """
    se = ""
    try:
        from dq2.info import TiersOfATLAS
    except:
        tolog("!!WARNING!!1119!! TiersOfATLAS could not be imported from dq2.info")
    else:
        # Get the sites list
        sites = TiersOfATLAS.ToACache.sites
        # Get the se info
        try:
            se = sites[endpoint]['srm']
        except Exception, e:
            tolog("!!WARNING!!1120!! No such endpoint in TiersOfATLAS: %s" % (e))
        else:
            tolog("Endpoint %s corresponds to se=%s (TiersOfATLAS)" % (endpoint, se))
    return se
def getGroupDiskPath(self, endpoint=""):
    """ Get the seprodpath from TiersOfATLAS instead of schedconfig if destination is a groupdisk

    Returns the storage path for the groupdisk endpoint, or "" when the
    endpoint is not a groupdisk or cannot be resolved.
    """
    # We know it's a group disk if 'dst:' is present in the token descriptor (which in this case it the same as the endpoint name)
    sepath = ""
    # Remove the dst: substring from the endpoint string unless the alternativeName is different between the site and the requested endpoint
    if "dst:" in endpoint:
        endpoint = endpoint[len('dst:'):]
        # Get the se from TiersOfATLAS
        se = self.getTiersOfATLASSE(endpoint)
        if se != "":
            # Now extract the seprodpath from the srm info
            sepath = SiteMover.extractSEPath(se)
            # Add /rucio to sepath if not there already
            if not sepath.endswith('/rucio'):
                sepath += '/rucio'
        else:
            tolog("!!WARNING!!3999!! Group disk verification failed, space token will be reset to default value")
    else:
        tolog("!!WARNING!!2233!! Not a groupdisk endpoint: %s" % (endpoint))
    return sepath
def verifyGroupSpaceToken(self, token):
    """ Make sure that space token is valid in case group disk is requested

    Returns the (possibly reset) token, or None if no token was given.
    """
    # In case a groupdisk space token is requested, make sure that the site's alternativeName is the same as the endpoints' alternativeName
    # They will have different alternativeNames if the job originates from a different cloud
    # Note: ATLAS specific
    if not token:
        return None
    if token.startswith("dst:"):
        # Found a groupdisk space token
        _token = token[len('dst:'):]
        tolog("token=%s"%_token)
        tolog("sitename=%s"%self.getDefaultRSE())
        # Get the corresponding alternative name and compare it to the alternative name of the site
        alternativeName_token = self.getTiersOfATLASAlternativeName(_token)
        tolog("alternativeName_token = %s" % (alternativeName_token))
        alternativeName_site = self.getTiersOfATLASAlternativeName(self.getDefaultRSE())
        tolog("alternativeName_site = %s" % (alternativeName_site))
        # Only proceed ith getting the groupdisk path if the alternativeName's are the same
        if alternativeName_token == alternativeName_site:
            tolog("Verified groupdisk token (same alternativeName for site and endpoint)")
        else:
            tolog("!!WARNING!!3999!! Alternative names are not the same for site and requested endpoint, will reset GROUPDISK")
            default_token, _se = SiteMover.extractSE(readpar('se'))
            tolog("Requested space token %s reset to %s" % (_token, default_token))
            token = default_token
    return token
def put_data_retfail(fail, errortext, surl=""):
    """
    Provides the return value for put_data when there is a failure.
    Used to enforce the number of parameters returned
    """
    # shape: (exit code, error text, surl, file size, checksum, arch type)
    failure = (fail, errortext, surl, 0, 0, '')
    return failure
put_data_retfail = staticmethod(put_data_retfail)
def put_data(self, source, destination, fsize=0, fchecksum=0, **pdict):
    """
    Move the file from the current local directory to a storage element
    Parameters are:
    source: full path of the file in the local directory
    destination: destination SE, method://[hostname[:port]]/full-dir-path/ (NB: no file name)
    fsize: file size of the source file (evaluated if 0)
    fchecksum: checksum of the source file (evaluated if 0)
    pdict: to allow additional parameters that may make sense for specific movers
    Assume that the SE is locally mounted and its local path is the same as the remote path
    if both fsize and fchecksum (for the source) are given and !=0 these are assumed without reevaluating them
    returns: exitcode, gpfn, fsize, fchecksum
    """
    error = PilotErrors()
    pilotErrorDiag = ""
    # Get input parameters from pdict
    DN = pdict.get('DN', '')
    lfn = pdict.get('lfn', '')
    guid = pdict.get('guid', '')
    token = pdict.get('token', '')
    scope = pdict.get('scope', '')
    dsname = pdict.get('dsname', '')
    timeout = pdict.get('timeout', 5*3600)
    analyJob = pdict.get('analJob', False)
    testLevel = pdict.get('testLevel', '0')
    extradirs = pdict.get('extradirs', '')
    experiment = pdict.get('experiment', 'ATLAS')
    prodSourceLabel = pdict.get('prodSourceLabel', '')
    # get the Rucio tracing report
    report = self.getStubTracingReport(pdict['report'], 'sm', lfn, guid)
    # get the checksum type
    if fchecksum != 0 and fchecksum != "":
        csumtype = SiteMover.getChecksumType(fchecksum)
    else:
        csumtype = "default"
    # compute missing size/checksum from the local source file
    if fsize == 0 or fchecksum == 0:
        ec, pilotErrorDiag, fsize, fchecksum = SiteMover.getLocalFileInfo(source, csumtype="adler32")
        if ec != 0:
            self.prepareReport('LOCAL_FILE_INFO_FAIL', report)
            return SiteMover.put_data_retfail(ec, pilotErrorDiag)
    # now that the file size is known, add it to the tracing report
    report['filesize'] = fsize
    # get the site information object
    si = getSiteInformation(experiment)
    # are we on a tier 3?
    if si.isTier3():
        dst_loc_se = SiteMover.getTier3Path(dsname, DN)
        dst_prefix = ""
        tolog("Writing output on a Tier 3 site to: %s" % (dst_loc_se))
    else:
        dst_se = destination
        if dst_se.find('SFN') != -1: # srm://dcsrm.usatlas.bnl.gov:8443/srm/managerv1?SFN=/pnfs/usatlas.bnl.gov/
            s = dst_se.split('SFN=')
            dst_loc_se = s[1]
            dst_prefix = s[0] + 'SFN='
        else:
            _sentries = dst_se.split('/', 3)
            try:
                dst_prefix = _sentries[0] + '//' + _sentries[2] # 'method://host:port' is it always a ftp server? can it be srm? something else?
                dst_loc_se = '/'+ _sentries[3]
            except Exception, e:
                pilotErrorDiag = "Could not figure out destination path from dst_se (%s): %s" % (dst_se, str(e))
                tolog('!!WARNING!!2999!! %s' % (pilotErrorDiag))
                self.prepareReport('DEST_PATH_UNDEF', report)
                return SiteMover.put_data_retfail(error.ERR_STAGEOUTFAILED, pilotErrorDiag)
    # VCH added check for Tier3 sites because the ds name is added to the path in SiteMove.getTier3Path()
    if si.isTier3():
        dst_loc_sedir = os.path.join(dst_loc_se, extradirs)
    else:
        dst_loc_sedir = os.path.join(dst_loc_se, os.path.join(extradirs, dsname))
    filename = os.path.basename(source)
    ec, pilotErrorDiag, tracer_error, dst_loc_pfn, lfcdir, surl = si.getProperPaths(error, analyJob, token, prodSourceLabel, dsname, filename, scope=scope, sitemover=self) # quick workaround
    if ec != 0:
        self.prepareReport(tracer_error, report)
        return self.put_data_retfail(ec, pilotErrorDiag)
    #dst_loc_pfn = os.path.join(dst_loc_sedir, filename)
    dst_gpfn = dst_prefix + dst_loc_pfn
    # create the destination directory (best effort: failure is only logged,
    # the subsequent cp will fail if the directory is really missing)
    try:
        SiteMover.mkdirWperm(os.path.dirname(dst_loc_pfn))
        #SiteMover.mkdirWperm(dst_loc_sedir)
    except Exception, e:
        tolog("!!WARNING!!2999!! Could not create dir: %s, %s" % (dst_loc_sedir, str(e)))
    if testLevel == "1":
        source = "thisisjustatest"
    _cmd_str = "cp %s %s" % (source, dst_loc_pfn)
    tolog("Executing command: %s" % (_cmd_str))
    report['transferStart'] = time.time()
    try:
        s, telapsed, cout, cerr = timed_command(_cmd_str, timeout)
    except Exception, e:
        tolog("!!WARNING!!2999!! timed_command() threw an exception: %s" % str(e))
        s = 1
        o = str(e)
        telapsed = timeout
    else:
        o = cout + cerr
    tolog("Elapsed time: %d" % (telapsed))
    report['validateStart'] = time.time()
    if s != 0:
        tolog("!!WARNING!!2990!! Command failed: %s" % (_cmd_str))
        check_syserr(s, o)
        if is_timeout(s):
            pilotErrorDiag = "cp put was timed out after %d seconds" % (telapsed)
            ec = error.ERR_PUTTIMEOUT
        else:
            o = o.replace('\n', ' ')
            pilotErrorDiag = "cp failed with output: ec = %d, output = %s" % (s, o)
            ec = error.ERR_STAGEOUTFAILED
        self.prepareReport('COPY_FAIL', report)
        return SiteMover.put_data_retfail(ec, pilotErrorDiag, surl=dst_loc_pfn)
        # NOTE(review): the following line is unreachable (placed after the return above)
        tolog("!!WARNING!!2999!! %s" % (pilotErrorDiag))
    # get remote file size and checksum
    ec, pilotErrorDiag, dstfsize, dstfchecksum = SiteMover.getLocalFileInfo(dst_loc_pfn, csumtype="adler32")
    if ec != 0:
        self.prepareReport('LOCAL_FILE_INFO_FAIL', report)
        return SiteMover.put_data_retfail(ec, pilotErrorDiag, surl=dst_loc_pfn)
    # compare remote and local file size
    if dstfsize != fsize:
        pilotErrorDiag = "Remote and local file sizes do not match for %s (%s != %s)" %\
            (os.path.basename(dst_gpfn), str(dstfsize), str(fsize))
        tolog('!!WARNING!!2999!! %s' % (pilotErrorDiag))
        self.prepareReport('FS_MISMATCH', report)
        return SiteMover.put_data_retfail(error.ERR_PUTWRONGSIZE, pilotErrorDiag, surl=dst_loc_pfn)
    # compare remote and local checksums
    if dstfchecksum != fchecksum:
        pilotErrorDiag = "Remote and local checksums (of type %s) do not match for %s (%s != %s)" %\
            (csumtype, os.path.basename(dst_gpfn), dstfchecksum, fchecksum)
        tolog('!!WARNING!!2999!! %s' % (pilotErrorDiag))
        if csumtype == "adler32":
            self.prepareReport('AD_MISMATCH', report)
            return SiteMover.put_data_retfail(error.ERR_PUTADMISMATCH, pilotErrorDiag, surl=dst_loc_pfn)
        else:
            self.prepareReport('MD5_MISMATCH', report)
            return SiteMover.put_data_retfail(error.ERR_PUTMD5MISMATCH, pilotErrorDiag, surl=dst_loc_pfn)
    self.prepareReport('DONE', report)
    return 0, pilotErrorDiag, str(dst_gpfn), fsize, fchecksum, ARCH_DEFAULT # Eddie added str, unicode protection
def getLCGPaths(self, destination, dsname, filename, lfcpath):
    """ return the proper paths for lcg-cp/cr file transfer and registration

    destination: base SE directory
    dsname/filename: dataset and file name used to build the native LFC path
    lfcpath: configured LFC prefix (replaces the default prefix if different)
    Returns (destination, lfcdir): the full SE directory and the LFC
    directory that needs to be created for registration.
    """
    # return full lfc file path (beginning lfcpath might need to be replaced)
    native_lfc_path = self.to_native_lfn(dsname, filename)
    # /grid/atlas/dq2/testpanda/testpanda.destDB.b7cd4b56-1b5e-465a-a5d7-38d5e2609724_sub01000457/
    #58f836d5-ff4b-441a-979b-c37094257b72_0.job.log.tgz
    # tolog("Native_lfc_path: %s" % (native_lfc_path))
    # replace the default path /grid/atlas/rucio with lfcpath if different
    # (to_native_lfn returns a path begining with /grid/atlas/rucio)
    default_lfcpath = '/grid/atlas/rucio' # to_native_lfn always returns this at the beginning of the string
    if default_lfcpath != lfcpath:
        final_lfc_path = native_lfc_path.replace(default_lfcpath, lfcpath)
    else:
        final_lfc_path = native_lfc_path
    stripped_lfcpath = os.path.dirname(native_lfc_path[len(default_lfcpath):]) # the rest (to be added to the 'destination' variable)
    # /testpanda/testpanda.destDB.b7cd4b56-1b5e-465a-a5d7-38d5e2609724_sub01000457/58f836d5-ff4b-441a-979b-c37094257b72_0.job.log.tgz
    # tolog("stripped_lfcpath: %s" % (stripped_lfcpath))
    # full file path for disk
    # use startswith() so that an empty stripped path does not raise IndexError
    # (stripped_lfcpath[0] crashed when the file sat directly under the default path)
    if stripped_lfcpath.startswith("/"):
        stripped_lfcpath = stripped_lfcpath[1:]
    destination = os.path.join(destination, stripped_lfcpath)
    # /pnfs/tier2.hep.manchester.ac.uk/data/atlas/dq2/testpanda/testpanda.destDB.fcaf8da5-ffb6-4a63-9963-f31e768b82ef_sub01000345
    # tolog("Updated SE destination: %s" % (destination))
    # name of dir to be created in LFC
    lfcdir = os.path.dirname(final_lfc_path)
    # /grid/atlas/dq2/testpanda/testpanda.destDB.dfb45803-1251-43bb-8e7a-6ad2b6f205be_sub01000492
    # tolog("LFC dir: %s" % (lfcdir))
    return destination, lfcdir
def getPreDestination(self, analyJob, token, prodSourceLabel, alt=False):
    """ get the pre destination

    analyJob: True for analysis jobs (uses sepath instead of seprodpath)
    token: space token; a 'dst:' prefix marks a GROUPDISK endpoint
    prodSourceLabel: job source label ('ddm' selects sepath for production)
    alt: pass-through flag for readpar()/getMatchingDestinationPath()
    Returns the destination path, or "" if none could be determined.
    """
    destination = ""
    # Special case for GROUPDISK
    # In this case, (e.g.) token = 'dst:AGLT2_PERF-MUONS'
    # Pilot should then consult TiersOfATLAS and get it from the corresponding srm entry
    if token != None and "dst:" in token:
        # if the job comes from a different cloud than the sites' cloud, destination will be set to "" and the
        # default space token will be used instead (the transfer to groupdisk will be handled by DDM not pilot)
        destination = self.getGroupDiskPath(endpoint=token)
        if destination != "":
            if destination.endswith('//rucio'):
                destination = destination.replace('//rucio','/rucio')
            tolog("GROUPDISK token requested (%s), destination=%s" % (token, destination))
            return destination
        else:
            # Reset the space token to the default value
            default_token, _se = SiteMover.extractSE(readpar('se'))
            tolog("Requested space token %s reset to %s" % (token, default_token))
            token = default_token
    if not analyJob:
        # process the destination path with getDirList since it can have a complex structure
        # as well as be a list of destination paths matching a corresponding space token
        if prodSourceLabel == 'ddm' and readpar('seprodpath', alt=alt) == '':
            sepath = readpar('sepath', alt=alt)
        else:
            sepath = readpar('seprodpath', alt=alt)
        destinationList = self.getDirList(sepath)
        # decide which destination path to use depending on the space token for the current file
        if token:
            # find the proper path
            destination = self.getMatchingDestinationPath(token, destinationList, alt=alt)
            if destination == "":
                tolog("!!WARNING!!2990!! seprodpath not properly defined: seprodpath = %s, destinationList = %s, using sepath instead" %\
                    (sepath, str(destinationList)))
                # fall back to sepath when seprodpath did not yield a match
                sepath = readpar('sepath', alt=alt)
                destinationList = self.getDirList(sepath)
                destination = self.getMatchingDestinationPath(token, destinationList, alt=alt)
                if destination == "":
                    tolog("!!WARNING!!2990!! sepath not properly defined: sepath = %s, destinationList = %s" %\
                        (sepath, str(destinationList)))
        else:
            # space tokens are not used
            destination = destinationList[0]
    else:
        sepath = readpar('sepath', alt=alt)
        destinationList = self.getDirList(sepath)
        # decide which destination path to use depending on the space token for the current file
        if token:
            # find the proper path
            destination = self.getMatchingDestinationPath(token, destinationList, alt=alt)
            if destination == "":
                tolog("!!WARNING!!2990!! sepath not properly defined: sepath = %s, destinationList = %s" %\
                    (sepath, str(destinationList)))
        else:
            # space tokens are not used
            destination = destinationList[0]
    return destination
def getUserLFCDir(destination, lfcpath, dsname):
    """ Get the LFC dir path for a user job

    destination: base SE directory (the user prefix dir is appended to it)
    lfcpath: LFC prefix; when non-empty the full LFC dir is built from it
    dsname: dataset name, expected to look like 'user.<name>.<field3>....'
    Returns (ec, pilotErrorDiag, destination, lfcdir).
    Bug fix: on an unexpected dataset name format the computed error code is
    now returned (the function previously returned a literal 0, hiding the
    failure from callers that check 'if ec != 0').
    """
    ec = 0
    pilotErrorDiag = ""
    lfcdir = ""
    # old pat = re.compile('([^\.]+\.[^\.]+)\..*')
    # pat = re.compile('([^\.]+\.[^\.]+\.[^\.]+[^\.]+)\..*')
    # match the first three dot-separated fields of the dataset name
    pat = re.compile('([^\.]+\.[^\.]+\.[^\.]+)\..*')
    mat = pat.match(dsname)
    if mat:
        # old prefixdir = mat.group(1) # 'user.pnilsson'
        subdirs = mat.group(1).split('.') # 'user.pnilsson.0915151927'
        _user = subdirs[0] # 'user'
        _username = subdirs[1] # 'pnilsson'
        _field3 = subdirs[2] # '0915151927'
        prefixdir = os.path.join(_user, _username, _field3)
        destination = os.path.join(destination, prefixdir)
        if lfcpath != "":
            lfcdir = os.path.join(lfcpath, prefixdir, dsname)
            tolog("LFC dir: %s" % (lfcdir))
        tolog("SE destination: %s" % (destination))
    else:
        error = PilotErrors()
        ec = error.ERR_STAGEOUTFAILED
        pilotErrorDiag = "put_data encountered an unexpected dataset name format: %s" % (dsname)
        tolog('!!WARNING!!2990!! %s' % (pilotErrorDiag))
    return ec, pilotErrorDiag, destination, str(lfcdir) # Eddie added str, unicode protection
getUserLFCDir = staticmethod(getUserLFCDir)
def getFinalLCGPaths(self, analyJob, destination, dsname, filename, lfcpath, token, prodSourceLabel, scope="", alt=False):
    """
    set up paths differently for analysis and production jobs
    use conventional LFC paths or production jobs
    use special convention for analysis jobs (Aug-Sep 2011)

    Returns (ec, pilotErrorDiag, dst_gpfn, lfcdir); ec is 0 on success.
    """
    dst_gpfn = ""
    lfcdir = ""
    # Rucio-style paths are used when the destination contains /rucio and a scope is known
    if "/rucio" in destination and scope != "":
        useRucio = True
    else:
        useRucio = False
    if analyJob: # for analysis jobs
        ec, pilotErrorDiag, destination, lfcdir = self.getUserLFCDir(destination, lfcpath, dsname)
        if ec != 0:
            return ec, pilotErrorDiag, dst_gpfn, lfcdir
        dst_gpfn = os.path.join(destination, os.path.join(dsname, filename))
    else:
        # get the proper paths
        destination, lfcdir = self.getLCGPaths(destination, dsname, filename, lfcpath)
        dst_gpfn = os.path.join(destination, filename)
        # /pnfs/tier2.hep.manchester.ac.uk/data/atlas/dq2/testpanda/testpanda.destDB.dfb45803-1251-43bb-8e7a-6ad2b6f205be_sub01000492
    # overwrite the dst_gpfn if path contains /rucio
    if useRucio:
        dst_gpfn = self.getPathFromScope(scope, filename)
        # correct for a potentially missing sepath
        sepath = self.getPreDestination(analyJob, token, prodSourceLabel, alt=alt)
        if not sepath in dst_gpfn:
            dst_gpfn = os.path.join(sepath, dst_gpfn)
        # correct for possible double rucio substring
        if "rucio/rucio" in dst_gpfn:
            dst_gpfn = dst_gpfn.replace('rucio/rucio', 'rucio')
    return 0, "", dst_gpfn, lfcdir
def check_space_df(self, dst_loc_se):
    """ Run df to check space availability

    dst_loc_se: local path on the storage element to check.
    Returns the available space (df KB column divided by 1048576), or -1
    when df fails or its output cannot be parsed.
    """
    avail = -1
    s, o = commands.getstatusoutput('df %s' % (dst_loc_se))
    if s != 0:
        check_syserr(s, o)
        tolog("WARNING: Error in running df: %s" % str(o))
    else:
        output = o.strip().split('\n')
        for l in output:
            # match the numeric df columns up to the Use% field; group(2) is
            # taken as the available-KB column - TODO confirm against df format
            m = re.search('\s\s*([0-9]*)\s\s*([0-9]*)\s\s*([0-9]*)\%\s', l)
            if m != None:
                avail = int(m.group(2))/1048576
                break
    return avail
def getStubTracingReport(self, initial_report, protocol, filename, guid):
    """ Return the first part of the tracing report

    initial_report: dictionary to be filled in (typically pdict['report'])
    protocol: name of the transfer protocol/mover (e.g. 'sm')
    filename: the LFN of the current file
    guid: the file guid; hyphens are stripped. Non-string guids (e.g. the
    default guid=0 from get_data()) are converted with str() first - the
    previous guid.replace() call raised AttributeError for them.
    """
    report = initial_report
    # set the proper protocol
    report['protocol'] = protocol
    # mark the catalog (or relative?) start
    report['catStart'] = time.time()
    # the current file
    report['filename'] = filename
    # guid without hyphens; str() protects against non-string guids
    report['guid'] = str(guid).replace('-','')
    return report
def sendTrace(self, report):
    """ Go straight to the tracing server and post the instrumentation dictionary

    report: the tracing report dictionary. Failures are logged and ignored.
    """
    if not self.useTracingService:
        tolog("Experiment is not using Tracing service. skip sending tracing report")
        return
    url = 'https://rucio-lb-prod.cern.ch/traces/'
    tolog("Tracing server: %s" % (url))
    tolog("Sending tracing report: %s" % str(report))
    try:
        # take care of the encoding
        #data = urlencode({'API':'0_3_0', 'operation':'addReport', 'report':report})
        from json import dumps
        data = dumps(report).replace('"','\\"')
        from SiteInformation import SiteInformation
        si = SiteInformation()
        sslCertificate = si.getSSLCertificate()
        # create the command
        # NOTE(review): the JSON payload is interpolated into a shell command string;
        # report values containing shell metacharacters could break or inject into
        # the command - consider posting via urllib2 instead
        cmd = 'curl --connect-timeout 20 --max-time 120 --cacert %s -v -k -d "%s" %s' % (sslCertificate, data, url)
        tolog("Executing command: %s" % (cmd))
        s,o = commands.getstatusoutput(cmd)
        if s != 0:
            raise Exception(str(o))
    except:
        # if something fails, log it but ignore
        from sys import exc_info
        tolog('!!WARNING!!2999!! tracing failed: %s' % str(exc_info()))
    else:
        tolog("Tracing report sent")
def prepareReport(self, state, report):
""" Prepare the Rucio tracing report. Set the client exit state and finish """
if report.has_key('timeStart'):
# Handle the client state which might be a string or a dictionary
if type(state) is str:
report['clientState'] = state
elif type(state) is dict:
for key in state.keys():
report[key] = state[key]
else:
tolog("!!WARNING!!3332!! Do not know how to handle this tracing state: %s" % str(state))
# Store the tracing report to file
filename = getTracingReportFilename()
status = writeJSON(filename, report)
if status:
tolog("Wrote tracing report to file %s (cwd=%s)" % (filename, os.getcwd()))
else:
tolog("!!WARNING!!3333!! Failed to write tracing report to file")
# Send the report
#try:
# self.sendTrace(report)
#except Exception, e:
# tolog("!!WARNING!!3334!! Failed to send tracing report: %s" % (e))
else:
tolog("!!WARNING!!3331!! No timeStart found in tracing report, cannot send")
def sendReport(self, report):
""" Send Rucio tracing report. Set the client exit state and finish """
if report.has_key('timeStart'):
# finish instrumentation
report['timeEnd'] = time.time()
# send report
tolog("Sending tracing report: %s" % str(report))
self.sendTrace(report)
else:
tolog("!!WARNING!!21211! Tracing report does not have a timeStart entry: %s" % str(report))
@classmethod
def getSURLDictionaryFilename(self, directory, jobId):
""" return the name of the SURL dictionary file """
return os.path.join(directory, "surlDictionary-%s.%s" % (jobId, getExtension()))
@classmethod
def getSURLDictionary(self, directory, jobId):
""" get the SURL dictionary from file """
surlDictionary = {}
# open the dictionary for reading
filename = self.getSURLDictionaryFilename(directory, jobId)
if not os.path.exists(filename):
tolog("SURL dictionary does not exist: %s (will be created)" % (filename))
return surlDictionary
try:
fp = open(filename, "r")
except OSError, e:
tolog("!!WARNING!!1800!! Failed to open SURL dictionary for reading: %s" % str(e))
else:
# get the dictionary
importedLoad = False
if filename.endswith('json'):
try:
from json import load
except Exception, e:
tolog("!!WARNING!!1800!! Could not import load function from json module (too old python version?): %s" % str(e))
else:
importedLoad = True
else:
from pickle import load
importedLoad = True
if importedLoad:
# load the dictionary from file
try:
# load the dictionary from file
surlDictionary = load(fp)
except:
tolog("!!WARNING!!1800!! JobState could not deserialize file: %s" % (filename))
else:
tolog("Deserialized surl dictionary with %d keys: filename=%s" % (len(surlDictionary.keys()), filename))
#tolog("surlDictionary=%s" % str(surlDictionary))
fp.close()
return surlDictionary
@classmethod
def putSURLDictionary(self, surlDictionary, directory, jobId):
""" store the updated SURL dictionary """
status = False
# open the dictionary for writing
filename = self.getSURLDictionaryFilename(directory, jobId)
try:
fp = open(filename, "w")
except OSError, e:
tolog("!!WARNING!!1800!! Could not open SURL dictionary for writing: %s" % str(e))
else:
# write the dictionary
if filename.endswith('json'):
from json import dump
else:
from pickle import dump
try:
# write the dictionary to file
dump(surlDictionary, fp)
except Exception, e:
tolog("!!WARNING!!1800!! Could not encode data to SURL dictionary file: %s, %s" % (filename, str(e)))
else:
status = True
fp.close()
return status
@classmethod
def updateSURLDictionary(self, guid, surl, directory, jobId):
""" add the guid and surl to the surl dictionary """
status = False
tolog("Adding GUID (%s) and SURL (%s) to dictionary" % (guid, surl))
# (re-)open dictionary if possible (the dictionary will be empty if the function is called for the first time)
surlDictionary = self.getSURLDictionary(directory, jobId)
# add the guid and surl to the dictionary
surlDictionary[guid] = surl
# store the updated dictionary
if self.putSURLDictionary(surlDictionary, directory, jobId):
tolog("Successfully updated SURL dictionary (which currectly has %d key(s))" % len(surlDictionary.keys()))
status = True
else:
tolog("!!FAILED!!1800!! SURL dictionary could not be updated (later LFC registration will not work)")
return status
def getFileInfoFromRucio(self, scope, dataset, guid):
""" Get the file size and checksum from Rucio """
filesize = ""
checksum = ""
tolog("scope=%s"%scope)
tolog("dataset=%s"%dataset)
tolog("guid=%s"%guid)
pre = scope + ":"
if dataset.startswith(pre):
dataset = dataset.replace(pre, "")
try:
from rucio.client import Client
client = Client()
replica_list = [i for i in client.list_files(scope, dataset)]
except Exception, e:
tolog("!!WARNING!!2233!! Exception caught: %s" % (e))
else:
# Extract the info for the correct guid
tolog("Rucio returned a replica list with %d entries" % (len(replica_list)))
for i in range(0, len(replica_list)):
# replica = {u'adler32': u'9849e8ae', u'name': u'EVNT.01580095._002901.pool.root.1', u'bytes': 469906, u'scope': u'mc12_13TeV', u'guid': u'F88E0A836696344981358463A641A486', u'events': None}
# Is it the replica we are looking for?
if not "-" in replica_list[i]['guid']:
# Convert the guid (guids in Rucio might not have dashes)
guid = guid.replace('-', '')
if guid == replica_list[i]['guid']:
checksum = replica_list[i]['adler32']
filesize = str(replica_list[i]['bytes'])
events = replica_list[i]['events']
if events != None:
tolog("File %s has checksum %s, size %s and %d events" % (replica_list[i]['name'], checksum, filesize, str(replica_list[i]['events'])))
else:
tolog("File %s has checksum %s and size %s (no recorded events)" % (replica_list[i]['name'], checksum, filesize))
break