-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathdbsClient.py
1993 lines (1633 loc) · 86.1 KB
/
dbsClient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from dbs.exceptions.dbsClientException import dbsClientException
from RestClient.ErrorHandling.RestClientExceptions import HTTPError
from RestClient.RestApi import RestApi
from RestClient.AuthHandling.X509Auth import X509Auth
from RestClient.ProxyPlugins.Socks5Proxy import Socks5Proxy
import json
import gzip
import os
import socket
import sys
import urllib.request, urllib.parse, urllib.error
def fixUrlPath(path):
    """Normalize a URL by collapsing duplicate and trailing slashes in its path.

    :param path: URL of the form scheme://rest
    :raises Exception: when the value does not contain exactly one '://'
    """
    parts = path.split("://")
    if len(parts) != 2:
        raise Exception("wrong URL pattern %s" % path)
    cleaned = '/'.join(segment for segment in parts[1].split("/") if segment)
    return '{}://{}'.format(parts[0], cleaned)
def serverCode(data):
    """
    Extract the DBS server error code from an error payload.

    Accepts either a dict or a one-element list of dicts (the GO server
    wraps errors in a list); returns 0 when no code can be found.
    """
    record = None
    if isinstance(data, dict) and 'exception' in data:
        record = data
    elif isinstance(data, list) and len(data) == 1 and 'exception' in data[0]:
        record = data[0]
    if record is None:
        return 0
    return record.get('error', {}).get('code', 0)
def headerHas(headers, content, mime):
    """
    Check whether a multi-line HTTP header string contains a header line
    matching both the given header-name fragment and the given mime value,
    e.g. headerHas(hdrs, 'Content-Type', 'application/json').
    Matching is case-insensitive and substring-based.
    """
    name = content.lower()
    value = mime.lower()
    return any(name in line.lower() and value in line.lower()
               for line in headers.split('\n'))
def compress(body):
    """
    Gzip-compress *body*; str input is UTF-8 encoded first. Returns bytes.
    """
    payload = body.encode('utf-8') if isinstance(body, str) else body
    return gzip.compress(payload)
def decompress(body):
    """
    Gunzip *body* (bytes) and decode the result as a UTF-8 string.
    """
    raw = gzip.decompress(body)
    return raw.decode('utf-8')
def parseStream(results):
    """
    Lazily parse a newline-delimited JSON (ndjson) payload.

    :param results: ndjson string, one JSON document per line
    :type results: str
    :return: generator yielding the decoded JSON records
    """
    for line in results.split('\n'):
        if not line:
            continue
        yield json.loads(line)
def aggAttribute(results, attr):
    """
    Aggregate scalar values of *attr* across records into a single record
    {attr: [v1, v2, ...]}. Records whose *attr* is already a list (Python
    server output) are skipped; when nothing was collected the input is
    returned unchanged.

    :param results: list of JSON records
    :param attr: attribute name to aggregate on, e.g. run_num
    :return: aggregated list of JSON records
    """
    collected = [record[attr] for record in results
                 if attr in record and not isinstance(record[attr], list)]
    return [{attr: collected}] if collected else results
def aggRuns(results):
    """
    Aggregate runs API results on the 'run_num' attribute.

    :param results: list of JSON records
    :return: aggregated list of JSON records
    """
    return aggAttribute(results, 'run_num')
def aggReleaseVersions(results):
    """
    Aggregate release version API results on the 'release_version' attribute.

    :param results: list of JSON records
    :return: aggregated list of JSON records
    """
    return aggAttribute(results, 'release_version')
def aggDatasetAccessTypes(results):
    """
    Aggregate dataset access types API results on 'dataset_access_type'.

    :param results: list of JSON records
    :return: aggregated list of JSON records
    """
    return aggAttribute(results, 'dataset_access_type')
def aggFileLumis(results):
    """
    Aggregate filelumis records coming from the GO server, which emits one
    record per (file, run, lumi), into one record per (file, run) with the
    lumi sections (and optionally event counts) collected into lists.
    Python-server records already carry lists and are returned unchanged.

    :param results: list of JSON records
    :return: aggregated list of JSON records
    """
    with_events = bool(results) and 'event_count' in results[0]
    grouped = {}
    for record in results:
        run = record['run_num']
        lfn = record['logical_file_name']
        lumi = record['lumi_section_num']
        if isinstance(lumi, list):
            # already aggregated (Python server output) -- stop collecting
            break
        entry = [lumi, record['event_count']] if with_events else lumi
        grouped.setdefault((lfn, run), []).append(entry)
    # nothing collected: results came from the Python server, return as-is
    if not grouped:
        return results
    # GO server results: emit one record per (file, run) pair
    output = []
    for (lfn, run), entries in grouped.items():
        record = {'logical_file_name': lfn, 'run_num': run}
        if with_events:
            record['lumi_section_num'] = [pair[0] for pair in entries]
            record['event_count'] = [pair[1] for pair in entries]
        else:
            record['lumi_section_num'] = entries
        output.append(record)
    return output
def aggFileParents(results):
    """
    Collapse GO-server fileparents records (one record per parent) into one
    record per child file with all parents in a list. Python-server records
    (parent already a list) are returned unchanged.

    :param results: list of JSON records
    :return: aggregated list of JSON records
    """
    grouped = {}
    for record in results:
        child = record['logical_file_name']
        parent = record['parent_logical_file_name']
        if isinstance(parent, list):
            # already aggregated (Python server) -- stop collecting
            break
        grouped.setdefault(child, []).append(parent)
    if not grouped:
        return results
    return [{'logical_file_name': lfn, 'parent_logical_file_name': parents}
            for lfn, parents in grouped.items()]
def aggFileChildren(results):
    """
    Collapse GO-server filechildren records (one record per child) into one
    record per parent file with all children in a list. Python-server
    records (child already a list) are returned unchanged.

    :param results: list of JSON records
    :return: aggregated list of JSON records
    """
    grouped = {}
    for record in results:
        parent = record['logical_file_name']
        child = record['child_logical_file_name']
        if isinstance(child, list):
            # already aggregated (Python server) -- stop collecting
            break
        grouped.setdefault(parent, []).append(child)
    if not grouped:
        return results
    return [{'logical_file_name': lfn, 'child_logical_file_name': kids}
            for lfn, kids in grouped.items()]
def aggFileParentsByLumi(results):
    """
    Collect (cid, pid) pairs from fileparentsbylumi records into a single
    {'child_parent_id_list': [...]} record. Records lacking 'cid' are taken
    as pre-aggregated and the input is returned unchanged.

    :param results: list of JSON records
    :return: aggregated list of JSON records
    """
    pairs = []
    for record in results:
        if 'cid' not in record:
            return results
        pairs.append([record['cid'], record['pid']])
    return [{'child_parent_id_list': pairs}]
def aggParentDSTrio(results):
    """
    Group parentDSTrio records by parent file id, collecting [run, lumi]
    pairs per pfid. Records lacking 'pfid' are taken as pre-aggregated and
    the input is returned unchanged.

    :param results: list of JSON records
    :return: aggregated list of JSON records
    """
    grouped = {}
    for record in results:
        if 'pfid' not in record:
            return results
        grouped.setdefault(record['pfid'], []).append([record['r'], record['l']])
    return [grouped]
def aggBlockTrio(results):
    """
    Group listBlockTrio records by child file id, collecting [run, lumi]
    pairs per cfid. Records lacking 'cfid' are taken as pre-aggregated and
    the input is returned unchanged.

    :param results: list of JSON records
    :return: aggregated list of JSON records
    """
    grouped = {}
    for record in results:
        if 'cfid' not in record:
            return results
        grouped.setdefault(record['cfid'], []).append([record['r'], record['l']])
    return [grouped]
def slicedIterator(sourceList, sliceSize):
    """
    Yield consecutive slices of *sourceList*, each at most *sliceSize*
    elements long.

    :param sourceList: list which needs to be sliced
    :type sourceList: list
    :param sliceSize: size of each slice (expected >= 1)
    :type sliceSize: int
    :return: iterator over list slices
    """
    for offset in range(0, len(sourceList), sliceSize):
        yield sourceList[offset:offset + sliceSize]
def checkInputParameter(method, parameters, validParameters, requiredParameters=None):
    """
    Validate caller-supplied parameters for an API call before contacting
    the server.

    :param method: Name of the API
    :type method: str
    :param parameters: parameter names supplied by the caller
    :type parameters: list
    :param validParameters: parameters the API accepts
    :type validParameters: list
    :param requiredParameters: dict with optional keys 'multiple' (at least
        one of these, 'detail' excluded), 'forced' (all of these) and
        'standalone' (exactly one of these) (Default: None)
    :type requiredParameters: dict
    :raises dbsClientException: on any violation
    """
    for name in parameters:
        if name not in validParameters:
            raise dbsClientException("Invalid input",
                                     "API %s does not support parameter %s. Supported parameters are %s" \
                                     % (method, name, validParameters))
    if requiredParameters is None:
        return
    if 'multiple' in requiredParameters:
        candidates = requiredParameters['multiple']
        # 'detail' alone does not satisfy the "at least one" requirement
        if not any(p != 'detail' and p in parameters for p in candidates):
            raise dbsClientException("Invalid input",
                                     "API %s does require one of the parameters %s" \
                                     % (method, candidates))
    if 'forced' in requiredParameters:
        for name in requiredParameters['forced']:
            if name not in parameters:
                raise dbsClientException("Invalid input",
                                         "API %s does require the parameter %s. Forced required parameters are %s" \
                                         % (method, name, requiredParameters['forced']))
    if 'standalone' in requiredParameters:
        present = [p for p in requiredParameters['standalone'] if p in parameters]
        if len(present) != 1:
            raise dbsClientException("Invalid input",
                                     "API %s does requires only *one* of the parameters %s." \
                                     % (method, requiredParameters['standalone']))
def list_parameter_splitting(data, key, size_limit=8000, method='GET'):
    """
    Helper generator that splits the list stored under data[key] into chunks,
    since Apache has a limitation of 8190 bytes for the length of an URI.
    We extended it to also split lfn and dataset list length for POST calls
    to avoid DB abuse even if there is no limit on how long the list can be.
    YG 2015-5-13

    :param data: url parameters
    :type data: dict
    :param key: key of parameter dictionary whose list value is split by length
    :type key: str
    :param size_limit: split list into chunks of maximally size_limit bytes
    :type size_limit: int
    :param method: HTTP method; 'GET' measures the urlencoded byte size

    NOTE(review): the SAME dict object is yielded (and then mutated) on every
    iteration, so the caller must fully consume each chunk before advancing
    the generator. For non-GET methods the size check uses len(data), i.e.
    the number of keys, not a byte size -- presumably intentional; confirm.
    """
    values = list(data[key])
    data[key] = []
    for element in values:
        data[key].append(element)
        if method =='GET':
            size = len(urllib.parse.urlencode(data))
        else:
            size = len(data)
        if size > size_limit:
            # chunk grew past the limit: yield it without the last element,
            # then start the next chunk with that element
            last_element = data[key].pop()
            yield data
            data[key] = [last_element]
    # yield the final (possibly short) chunk
    yield data
def split_calls(func):
    """
    Decorator to split up server calls for methods using url parameters, due
    to the length limitation of the URI in Apache. By default 8190 bytes.
    """
    def wrapper(*args, **kwargs):
        """
        The size limit is 8190 bytes minus url and api to call
        For example (https://cmsweb-testbed.cern.ch:8443/dbs/prod/global/filechildren), so 192 bytes should be safe.
        """
        size_limit = 8000
        encoded_url = urllib.parse.urlencode(kwargs)
        if len(encoded_url) > size_limit:
            for key, value in kwargs.items():
                ###only one (first) list at a time is split,
                ###currently only file lists are supported
                if key in ('logical_file_name', 'block_name', 'lumi_list', 'run_num') and isinstance(value, list):
                    ret_val = []
                    for splitted_param in list_parameter_splitting(data=dict(kwargs), #make a copy, since it is manipulated
                                                                   key=key,
                                                                   size_limit=size_limit):
                        try:
                            ret_val.extend(func(*args, **splitted_param))
                        # NOTE(review): update-style calls return None, so
                        # extend() raises TypeError and the accumulated
                        # results are discarded -- confirm this is intended
                        except (TypeError, AttributeError):#update function call do not return lists
                            ret_val= []
                    return ret_val
            # no splittable list parameter found: the request cannot be made
            raise dbsClientException("Invalid input",
                                     "The lenght of the urlencoded parameters to API %s \
                                     is exceeding %s bytes and cannot be splitted." % (func.__name__, size_limit))
        else:
            return func(*args, **kwargs)
    return wrapper
class DbsApi(object):
#added CAINFO and userAgent (see github issue #431 & #432)
def __init__(self, url="", proxy=None, key=None, cert=None, verifypeer=True, debug=0, ca_info=None, userAgent="", port=8443, accept="application/json", aggregate=True, useGzip=False):
    """
    DbsApi constructor.

    :param url: server URL without port
    :type url: str
    :param proxy: socks5 proxy, format socks5://username:password@host:port
    :type proxy: str
    :param key: full path to the private key to use
    :type key: str
    :param cert: full path to the certificate to use
    :type cert: str
    :param port: server port, inserted into *url* when it carries none
    :type port: int
    :param accept: HTTP Accept header value
    :param aggregate: apply client-side aggregation functions to results
    :param useGzip: gzip-compress POST payloads

    .. note::
        By default the DbsApi is trying to lookup the private key and the
        certificate in the common locations.
    """
    if isinstance(url, bytes):
        url = url.decode("utf-8")
    # a ':' after the scheme separator means the URL already carries a port
    if url.find(":", 6) == -1:
        url = url.replace(".cern.ch/dbs/", ".cern.ch:" + str(port) + "/dbs/", 1)
    self.url = fixUrlPath(url)
    self.proxy = proxy
    self.key = key
    self.cert = cert
    self.userAgent = userAgent
    self.accept = accept
    self.aggregate = aggregate
    self.debug = debug
    self.http_response = None
    self.gzip = useGzip
    auth = X509Auth(ssl_cert=cert, ssl_key=key, ssl_verifypeer=verifypeer, ca_info=ca_info)
    socks = Socks5Proxy(proxy_url=proxy) if proxy else None
    self.rest_api = RestApi(auth=auth, proxy=socks)
def __callServer(self, method="", params=None, data=None, callmethod='GET', content='application/json', aggFunc=None):
    """
    A private method to make an HTTP call to the DBS server.

    :param method: REST API to call, e.g. 'datasets', 'blocks', 'files', ...
    :type method: str
    :param params: parameters for a GET call, e.g. {'dataset': '/PrimaryDS/ProcessedDS/TIER'}
    :type params: dict
    :param data: payload for a POST/PUT call
    :type data: dict
    :param callmethod: HTTP method used; GET (default), POST or PUT
    :type callmethod: str
    :param content: content type of the request; DBS3 only supports application/json
    :type content: str
    :param aggFunc: optional aggregation function applied to the decoded result
    """
    # use None defaults instead of mutable default arguments
    params = {} if params is None else params
    data = {} if data is None else data
    # identify the caller to the server; USER may be unset in batch environments
    UserID = os.environ.get('USER', 'unknown') + '@' + socket.gethostname()
    try:
        UserAgent = "DBSClient/" + os.environ['DBS3_CLIENT_VERSION'] + "/" + self.userAgent
    except KeyError:
        UserAgent = "DBSClient/Unknown" + "/" + self.userAgent
    request_headers = {"Content-Type": content, "Accept": self.accept, "UserID": UserID, "User-Agent": UserAgent}
    method_func = getattr(self.rest_api, callmethod.lower())
    data = json.dumps(data)
    if self.gzip and callmethod == 'POST':
        # compress the payload and tell the server so
        request_headers['Content-Encoding'] = 'gzip'
        data = compress(data)
    try:
        if self.debug:
            print("HTTP={} URL={} method={} params={} data={} headers={}".format(callmethod, self.url, method, params, data, request_headers))
        self.http_response = method_func(self.url, method, params, data, request_headers)
    except HTTPError as http_error:
        if headerHas(http_error.header, 'Content-Type', 'application/json'):
            # JSON error body: decode and re-raise with DBS details
            self.__parseForException(http_error)
        else:
            msg = "\nURL={}\nCode={}\nMessage={}\nHeader={}\nBody={}\n".format(
                http_error.url,
                http_error.code,
                http_error.msg,
                http_error.header,
                http_error.body)
            # NOTE(review): serverCode expects decoded JSON data; passing the
            # HTTPError object always yields 0 -- confirm intent
            data = HTTPError(http_error.url, http_error.code, msg, http_error.header, http_error.body, serverCode(http_error))
            self.__parseForException(data)
    if self.accept == "application/ndjson":
        # streaming responses are parsed lazily, record by record
        return parseStream(self.http_response.body)
    if self.accept != "application/json":
        return self.http_response.body
    try:
        json_ret = json.loads(self.http_response.body)
    except ValueError:
        print("The server output is not a valid json, most probably you have a typo in the url.\n%s.\n" % self.url, file=sys.stderr)
        raise dbsClientException("Invalid url", "Possible urls are %s" % self.http_response.body)
    if self.aggregate and aggFunc:
        return aggFunc(json_ret)
    return json_ret
def __parseForException(self, http_error):
    """
    An internal method, should not be used by clients.

    Decode the error body of *http_error* and re-raise it as a more
    detailed HTTPError carrying the DBS exception and message.

    :param http_error: HTTPError thrown by the server
    """
    data = http_error.body
    # HTML error pages (e.g. frontend/proxy errors) carry no JSON payload:
    # re-raise the original error untouched
    if type(data) == str and str(data).find("<html>")!=-1 and str(data).find("</html>")!=-1:
        raise http_error
    try:
        data = json.loads(data)
        print("DBS Server error:", data)
        # re-raise with more detail
        if isinstance(data, dict) and 'exception' in data:
            raise HTTPError(http_error.url, data['exception'], data['message'], http_error.header, http_error.body, serverCode(data))
        # DBS go server provides errors as list data-type
        if isinstance(data, list) and len(data) == 1 and 'exception' in data[0]:
            data = data[0]
            raise HTTPError(http_error.url, data['exception'], data['message'], http_error.header, http_error.body, serverCode(data))
    except Exception as exp:
        # NOTE(review): this also converts json.loads failures (ValueError)
        # into the raised exception instead of falling through to the final
        # `raise http_error` -- confirm this is intended
        raise exp
    # body was valid JSON but carried no recognizable DBS exception
    raise http_error
@property
def requestTimingInfo(self):
"""
Returns the time needed to process the request by the frontend server in microseconds
and the EPOC timestamp of the request in microseconds.
:rtype: tuple containing processing time and timestamp
"""
try:
return tuple(item.split('=')[1] for item in self.http_response.header.get('CMS-Server-Time').split())
except AttributeError:
return None, None
@property
def requestContentLength(self):
"""
Returns the content-length of the content return by the server
:rtype: str
"""
try:
return self.http_response.header.get('Content-Length')
except AttributeError:
return None
def blockDump(self, **kwargs):
    """
    API that lists all information related with the block_name.

    :param block_name: Name of the block to be dumped (Required)
    :type block_name: str
    """
    checkInputParameter(method="blockDump", parameters=list(kwargs.keys()),
                        validParameters=['block_name'],
                        requiredParameters={'forced': ['block_name']})
    return self.__callServer("blockdump", params=kwargs)
def help(self, **kwargs):
    """
    API to get a list of supported REST APIs. When a particular API is
    given via *call*, its docstring is returned instead.

    :param call: REST API call for which help is desired (Optional)
    :type call: str
    :return: list of APIs, or a dict with params and doc keys for a
        specific call
    """
    checkInputParameter(method="help", parameters=list(kwargs.keys()),
                        validParameters=['call'])
    return self.__callServer("help", params=kwargs)
def insertAcquisitionEra(self, acqEraObj):
    """
    API to insert an Acquisition Era in DBS.

    :param acqEraObj: Acquisition Era object
    :type acqEraObj: dict
    :key acquisition_era_name: Acquisition Era Name (Required)
    :key start_date: start date of the era (unixtime, int) (Optional, default current date)
    :key end_date: end date of the era (unixtime, int) (Optional)
    """
    return self.__callServer("acquisitioneras", callmethod='POST', data=acqEraObj)
def insertBlock(self, blockObj):
    """
    API to insert a block into DBS.

    :param blockObj: Block object
    :type blockObj: dict
    :key block_name: Block Name (Required)
    :key origin_site_name: Origin Site Name (Required)
    :key open_for_writing: Open For Writing (1/0) (Optional, default 1)
    :key block_size: Block Size (Optional, default 0)
    :key file_count: File Count (Optional, default 0)
    """
    return self.__callServer("blocks", callmethod='POST', data=blockObj)
def insertBulkBlock(self, blockDump):
    """
    API to insert a bulk block.

    :param blockDump: Output of the block dump command, example can be found in https://svnweb.cern.ch/trac/CMSDMWM/browser/DBS/trunk/Client/tests/dbsclient_t/unittests/blockdump.dict
    :type blockDump: dict
    """
    # We first check whether the first lumi section carries an event_count;
    # frst == True means "event_count expected on every lumi"
    frst = True
    if len(blockDump['files']) > 0:
        fileLumiList = blockDump['files'][0]['file_lumi_list']
        if len(fileLumiList) > 0:
            if isinstance(fileLumiList, dict) and fileLumiList.get('event_count') == None:
                frst = False
            elif isinstance(fileLumiList, list):
                flDict = fileLumiList[0]
                if isinstance(flDict, dict) and flDict.get('event_count') == None:
                    frst = False
    # When frst == True we look for event_count == None in the data; if none is
    # found (redFlag stays False) everything is consistent. Otherwise the block
    # mixes lumis with and without event_count, so all event_count entries are
    # stripped before the insert and an exception is raised afterwards.
    # The symmetric check applies when frst == False.
    redFlag = False
    if frst:
        eventCT = (fl.get('event_count') is None for f in blockDump['files'] for fl in f['file_lumi_list'])
    else:
        eventCT = (fl.get('event_count') is not None for f in blockDump['files'] for fl in f['file_lumi_list'])
    redFlag = any(eventCT)
    if redFlag:
        # mixed data: drop event_count everywhere so the insert stays consistent
        for f in blockDump['files']:
            for fl in f['file_lumi_list']:
                if 'event_count' in fl: del fl['event_count']
    result = self.__callServer("bulkblocks", data=blockDump, callmethod='POST' )
    if redFlag:
        # the block WAS inserted (without event counts); warn the caller
        raise dbsClientException("Mixed event_count per lumi in the block: %s" %blockDump['block']['block_name'],
                                 "The block was inserted into DBS, but you need to check if the data is valid.")
    else:
        return result
def insertDataset(self, datasetObj):
    """
    API to insert a dataset in DBS.

    :param datasetObj: Dataset object
    :type datasetObj: dict
    :key primary_ds_name: Primary Dataset Name (Required)
    :key dataset: Name of the dataset (Required)
    :key dataset_access_type: Dataset Access Type (Required)
    :key processed_ds_name: Processed Dataset Name (Required)
    :key data_tier_name: Data Tier Name (Required)
    :key acquisition_era_name: Acquisition Era Name (Required)
    :key processing_version: Processing Version (Required)
    :key physics_group_name: Physics Group Name (Optional, default None)
    :key prep_id: ID of the Production and Reprocessing management tool (Optional, default None)
    :key xtcrosssection: Xtcrosssection (Optional, default None)
    :key output_configs: List(dict) with keys release_version, pset_hash, app_name, output_module_label and global tag
    """
    return self.__callServer("datasets", callmethod='POST', data=datasetObj)
def insertDataTier(self, dataTierObj):
    """
    API to insert a Data Tier in DBS.

    :param dataTierObj: Data Tier object
    :type dataTierObj: dict
    :key data_tier_name: Data Tier that needs to be inserted
    """
    return self.__callServer("datatiers", callmethod='POST', data=dataTierObj)
def insertFiles(self, filesList, qInserts=False):
    """
    API to insert a list of files into DBS. Up to 10 files can be inserted
    in one request.

    :param filesList: List of dictionaries containing the file information
    :type filesList: list of dicts
    :param qInserts: True queues the inserts for the INSERT QUEUE Manager
        (performed within a few minutes) instead of doing them immediately
    :type qInserts: bool
    :key logical_file_name: File to be inserted (str) (Required)
    :key block: required: /a/b/c#d (str)
    :key dataset: required: /a/b/c (str)
    :key is_file_valid: (optional, default = 1) (bool)
    :key file_type: (optional, default = EDM) one of the predefined types (str)
    :key check_sum: (optional, default = '-1') (str)
    :key event_count: (optional, default = -1) (int)
    :key file_size: (optional, default = -1.) (float)
    :key adler32: (optional, default = '') (str)
    :key md5: (optional, default = '') (str)
    :key auto_cross_section: (optional, default = -1.) (float)
    :key file_lumi_list: (optional, default = []) [{'run_num': 123, 'lumi_section_num': 12},{}....]
    :key file_parent_list: (optional, default = []) [{'file_parent_lfn': 'mylfn'},{}....]
    :key file_assoc_list: (optional, default = []) [{'file_parent_lfn': 'mylfn'},{}....]
    :key file_output_config_list: (optional, default = []) [{'app_name':..., 'release_version':..., 'pset_hash':...., output_module_label':...},{}.....]
    """
    if qInserts:
        return self.__callServer("files", data=filesList, callmethod='POST')
    # qInserts disabled: tell the server explicitly to insert immediately
    return self.__callServer("files", params={'qInserts': qInserts}, data=filesList, callmethod='POST')
def insertOutputConfig(self, outputConfigObj):
    """
    API to insert an OutputConfig in DBS.

    :param outputConfigObj: Output Config object
    :type outputConfigObj: dict
    :key app_name: App Name (Required)
    :key release_version: Release Version (Required)
    :key pset_hash: Pset Hash (Required)
    :key output_module_label: Output Module Label (Required)
    :key global_tag: Global Tag (Required)
    :key scenario: Scenario (Optional, default is None)
    :key pset_name: Pset Name (Optional, default is None)
    """
    return self.__callServer("outputconfigs", callmethod='POST', data=outputConfigObj)
def insertPrimaryDataset(self, primaryDSObj):
    """
    API to insert a primary dataset in DBS.

    :param primaryDSObj: primary dataset object
    :type primaryDSObj: dict
    :key primary_ds_type: TYPE (out of valid types in DBS, MC, DATA) (Required)
    :key primary_ds_name: Name of the primary dataset (Required)
    """
    return self.__callServer("primarydatasets", callmethod='POST', data=primaryDSObj)
def insertProcessingEra(self, procEraObj):
    """
    API to insert a Processing Era in DBS.

    :param procEraObj: Processing Era object
    :type procEraObj: dict
    :key processing_version: Processing Version (Required)
    :key description: Description (Optional)
    """
    return self.__callServer("processingeras", callmethod='POST', data=procEraObj)
def insertFileParents(self, fileParentObj):
    """
    API to insert file parentage in DBS. It will also get the block
    parentage from the files and insert it at the same time. All child
    files must belong to the block named by block_name.

    :param fileParentObj: file parent object
    :type fileParentObj: dict
    :key block_name: child block name (required)
    :key child_parent_id_list: a list of [child_file_id, parent_file_id] pairs (required)
    """
    return self.__callServer("fileparents", callmethod='POST', data=fileParentObj)
def listParentDSTrio(self, **kwargs):
    """
    API to list the parent dataset's trio (run, lumi and file) for the
    given dataset.

    :param dataset: dataset whose parent trios need to be found (Required)
    :type dataset: str
    :returns: List of dictionaries of the form [{f1: [{r1:(l1, l2, ...)}, ...]}, ...]
    :rtype: list of dicts
    """
    checkInputParameter(method="listParentDSTrio", parameters=list(kwargs.keys()),
                        validParameters=['dataset'],
                        requiredParameters={'forced': ['dataset']})
    return self.__callServer("parentDSTrio", params=kwargs, callmethod='GET', aggFunc=aggParentDSTrio)
def listBlockTrio(self, **kwargs):
    """
    API to list the block's trio (run, lumi and file).

    :param block_name: name of the block whose trio needs to be found (Required)
    :type block_name: str
    :returns: List of dictionaries of the form [{f1: [{r1:(l1, l2, ...)}, ...]}, ...]
    :rtype: list of dicts
    """
    checkInputParameter(method="listBlockTrio", parameters=list(kwargs.keys()),
                        validParameters=['block_name'],
                        requiredParameters={'forced': ['block_name']})
    return self.__callServer("blockTrio", params=kwargs, callmethod='GET', aggFunc=aggBlockTrio)
def listFileParentsByLumi(self, **kwargs):
    """
    API to list file parents using lumi section info.

    :param block_name: name of the block whose files' parents need to be found (Required)
    :type block_name: str
    :param logical_file_name: restricts the lookup to these files instead of
        the whole block (Optional)
    :type logical_file_name: list of str
    :returns: List of dictionaries containing [cid, pid] pairs
    :rtype: list of dicts
    """
    checkInputParameter(method="listFileParentsByLumi", parameters=list(kwargs.keys()),
                        validParameters=['block_name', 'logical_file_name'],
                        requiredParameters={'forced': ['block_name']})
    return self.__callServer("fileparentsbylumi", data=kwargs, callmethod='POST', aggFunc=aggFileParentsByLumi)
def listApis(self):
    """
    API to retrieve the list of REST APIs supported by the DBS server.
    """
    return self.__callServer(method="help")
def listApiDocumentation(self):
    """
    API to retrieve the auto-generated documentation page from the server.
    """
    return self.__callServer(method="", content="text/html")
def listAcquisitionEras(self, **kwargs):
    """
    API to list all Acquisition Eras in DBS.

    :param acquisition_era_name: Acquisition era name (Optional, wild cards allowed)
    :type acquisition_era_name: str
    :returns: List of dictionaries containing the keys description, end_date,
        acquisition_era_name, create_by, creation_date and start_date
    :rtype: list of dicts
    """
    checkInputParameter(method="listAcquisitionEras", parameters=list(kwargs.keys()),
                        validParameters=['acquisition_era_name'])
    return self.__callServer("acquisitioneras", params=kwargs)
def listAcquisitionEras_ci(self, **kwargs):
    """
    API to list all Acquisition Eras (case insensitive) in DBS.

    :param acquisition_era_name: Acquisition era name (Optional, wild cards allowed)
    :type acquisition_era_name: str
    :returns: List of dictionaries containing the keys description, end_date,
        acquisition_era_name, create_by, creation_date and start_date
    :rtype: list of dicts
    """
    validParameters = ['acquisition_era_name']
    # fix: report THIS API's name in validation errors (was "listAcquisitionEras")
    checkInputParameter(method="listAcquisitionEras_ci", parameters=list(kwargs.keys()), validParameters=validParameters)
    return self.__callServer("acquisitioneras_ci", params=kwargs)
def listBlockChildren(self, **kwargs):
"""
API to list block children.
:param block_name: name of block who's children needs to be found (Required)
:type block_name: str
:returns: List of dictionaries containing following keys (block_name)
:rtype: list of dicts
"""
validParameters = ['block_name']
requiredParameters = {'forced': validParameters}
checkInputParameter(method="listBlockChildren", parameters=list(kwargs.keys()), validParameters=validParameters,
requiredParameters=requiredParameters)
return self.__callServer("blockchildren", params=kwargs)
def listBlockParents(self, **kwargs):
"""
API to list block parents.
:param block_name: name of block who's parents needs to be found (Required)
:type block_name: str
:returns: List of dictionaries containing following keys (block_name)
:rtype: list of dicts
"""
validParameters = ['block_name']
requiredParameters = {'forced': validParameters}
checkInputParameter(method="listBlockParents", parameters=list(kwargs.keys()), validParameters=validParameters,
requiredParameters=requiredParameters)
if isinstance(kwargs["block_name"], list):
return self.__callServer("blockparents", data=kwargs, callmethod='POST')
else:
return self.__callServer("blockparents", params=kwargs)
def listBlocks(self, **kwargs):
"""
API to list a block in DBS. At least one of the parameters block_name, dataset, data_tier_name or
logical_file_name are required. If data_tier_name is provided, min_cdate and max_cdate have to be specified and
the difference in time have to be less than 31 days.
:param block_name: name of the block
:type block_name: str
:param dataset: dataset
:type dataset: str
:param data_tier_name: data tier
:type data_tier_name: str
:param logical_file_name: Logical File Name
:type logical_file_name: str
:param origin_site_name: Origin Site Name (Optional)
:type origin_site_name: str
:param run_num: run numbers (Optional). Possible format: run_num, "run_min-run_max", or ["run_min-run_max", run1, run2, ...]
:type run_num: int, list of runs or list of run ranges
:param min_cdate: Lower limit for the creation date (unixtime) (Optional)
:type min_cdate: int, str
:param max_cdate: Upper limit for the creation date (unixtime) (Optional)
:type max_cdate: int, str
:param min_ldate: Lower limit for the last modification date (unixtime) (Optional)
:type min_ldate: int, str
:param max_ldate: Upper limit for the last modification date (unixtime) (Optional)
:type max_ldate: int, str
:param cdate: creation date (unixtime) (Optional)
:type cdate: int, str
:param ldate: last modification date (unixtime) (Optional)
:type ldate: int, str
:param detail: Get detailed information of a block (Optional)
:type detail: bool
:returns: List of dictionaries containing following keys (block_name). If option detail is used the dictionaries contain the following keys (block_id, create_by, creation_date, open_for_writing, last_modified_by, dataset, block_name, file_count, origin_site_name, last_modification_date, dataset_id and block_size)
:rtype: list of dicts
"""
validParameters = ['dataset', 'block_name', 'data_tier_name', 'origin_site_name',
'logical_file_name', 'run_num', 'open_for_writing', 'min_cdate',
'max_cdate', 'min_ldate', 'max_ldate',
'cdate', 'ldate', 'detail']
#requiredParameters = {'multiple': validParameters}
requiredParameters = {'multiple': ['dataset', 'block_name', 'data_tier_name', 'logical_file_name']}
#set defaults
if 'detail' not in list(kwargs.keys()):
kwargs['detail'] = False
checkInputParameter(method="listBlocks", parameters=list(kwargs.keys()), validParameters=validParameters,
requiredParameters=requiredParameters)
return self.__callServer("blocks", params=kwargs)
    def listBlockSummaries_doc(self, **kwargs):
        """
        API that returns summary information like total size and total number of events in a dataset or a list of blocks

        :param block_name: list block summaries for block_name(s)
        :type block_name: str, list
        :param dataset: list block summaries for all blocks in dataset
        :type dataset: str
        :param detail: list block summary by block names if detail=True, default=False
        :type detail: str, bool
        :returns: list of dicts containing total block_sizes, file_counts and event_counts of dataset or blocks provided
        """
        # NOTE(review): documentation-only stub with no runtime behavior — the
        # body is a bare `pass`. It appears to mirror the docstring of the
        # @split_calls-decorated listBlockSummaries below; confirm intent
        # before removing.
        pass
@split_calls
def listBlockSummaries(self, **kwargs):
"""
API that returns summary information like total size and total number of events in a dataset or a list of blocks
:param block_name: list block summaries for block_name(s)
:type block_name: str, list
:param dataset: list block summaries for all blocks in dataset
:type dataset: str
:param detail: list block summary by block names if detail=True, default=False
:type detail: str, bool
:returns: list of dicts containing total block_sizes, file_counts and event_counts of dataset or blocks provided
"""
validParameters = ['block_name', 'dataset', 'detail']
requiredParameters = {'standalone': ['block_name', 'dataset']}
checkInputParameter(method="listBlockSummaries", parameters=list(kwargs.keys()), validParameters=validParameters,
requiredParameters=requiredParameters)