Skip to content

Commit

Permalink
Merge pull request #38 from germanfgv/skippedStream
Browse files Browse the repository at this point in the history
Adds skipped_streamers table to T0 Data Service
  • Loading branch information
LinaresToine authored Jul 11, 2023
2 parents 6ce1e27 + 1a700d4 commit 2b23419
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 1 deletion.
8 changes: 8 additions & 0 deletions etc/OracleSchema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ CREATE TABLE promptreco_status (
);
INSERT INTO promptreco_status (status, change_time) VALUES (1, CURRENT_TIMESTAMP);

CREATE TABLE skipped_streamers (
run int not null,
stream varchar2(255) not null,
lumi int not null,
events int not null,
primary key (run, stream, lumi)
) ORGANIZATION INDEX;

CREATE TABLE FILE_TRANSFER_STATUS_OFFLINE (
P5_FILEID NUMBER(27) NOT NULL,
FILENAME VARCHAR2(1000) NOT NULL,
Expand Down
5 changes: 4 additions & 1 deletion src/python/Data.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from T0WmaDataSvc.DataPromptRecoStatus import *
from T0WmaDataSvc.DataDatasetLocked import *
from T0WmaDataSvc.DataRepackStats import *
from T0WmaDataSvc.DataRunStreamSkippedLumis import *


class Data(DatabaseRESTApi):
"""Server object for REST data access API."""
Expand All @@ -31,5 +33,6 @@ def __init__(self, app, config, mount):
"run_dataset_done": RunDatasetDone(app,self, config, mount),
"dataset_locked": DatasetLocked(app, self, config, mount),
"promptreco_status": PromptRecoStatus(app, self, config, mount),
"repack_stats": RepackStats(app, self, config, mount)
"repack_stats": RepackStats(app, self, config, mount),
"skipped_streamers": RunStreamSkippedLumis(app, self, config, mount)
})
53 changes: 53 additions & 0 deletions src/python/DataRunStreamSkippedLumis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from WMCore.REST.Server import RESTEntity, restcall, rows
from WMCore.REST.Tools import tools
from WMCore.REST.Validation import *
from WMCore.REST.Format import JSONFormat, PrettyJSONFormat
from T0WmaDataSvc.Regexps import *
from operator import itemgetter

class RunStreamSkippedLumis(RESTEntity):
"""REST entity for retrieving run/stream/lumis that have been skipped during processing."""
def validate(self, apiobj, method, api, param, safe):
"""Validate request input data."""
validate_str('run', param, safe, RX_RUN, optional = True)
validate_str('stream', param, safe, RX_STREAM, optional = True)

@restcall(formats=[('text/plain', PrettyJSONFormat()), ('application/json', JSONFormat())])
@tools.expires(secs=300)
def get(self,run, stream):
"""Retrieve run/stream/lumis that have been skipped during processing. If no run or stream is
specified, returns the latest 100 skipped runs
:arg int run: the run number
:arg str primary_dataset: the primary dataset name (optional, otherwise queries for all)
:returns: list of skipped lumis in each run/stream combination, and number of events in the lumi"""

sql = """SELECT run, stream, lumi, events
FROM skipped_streamers
WHERE run is not null"""
sqlWithRun = " AND skipped_streamers.run = :run"
sqlWithStream = " AND skipped_streamers.stream = :stream"
sqlOrder = " ORDER BY run desc, stream asc, lumi asc"
sqlLimit = " FETCH FIRST 100 ROWS ONLY"

binds = {}
if run is not None:
sql += sqlWithRun
binds.update({"run":run})
if stream is not None:
sql += sqlWithStream
binds.update({"stream":stream})
sql += sqlOrder
if run is None:
sql += sqlLimit

c, _ = self.api.execute(sql, binds)
results=c.fetchall()

runs={}
for run, stream, lumi, events in results:
runDict=runs.setdefault(run,{})
streamDict=runDict.setdefault(stream,{})
streamDict[lumi]=events

return [runs]

0 comments on commit 2b23419

Please sign in to comment.