Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

extend opt. max-tf-per-file to raw-tf reader, move it to device #13707

Merged
merged 1 commit into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Detectors/CTF/workflow/src/CTFReaderSpec.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ void CTFReaderSpec::init(InitContext& ic)
mUseLocalTFCounter = ic.options().get<bool>("local-tf-counter");
mImposeRunStartMS = ic.options().get<int64_t>("impose-run-start-timstamp");
mInput.checkTFLimitBeforeReading = ic.options().get<bool>("limit-tf-before-reading");
mInput.maxTFs = ic.options().get<int>("max-tf");
mInput.maxTFs = mInput.maxTFs > 0 ? mInput.maxTFs : 0x7fffffff;
mInput.maxTFsPerFile = ic.options().get<int>("max-tf-per-file");
mInput.maxTFsPerFile = mInput.maxTFsPerFile > 0 ? mInput.maxTFsPerFile : 0x7fffffff;
mRunning = true;
mFileFetcher = std::make_unique<o2::utils::FileFetcher>(mInput.inpdata, mInput.tffileRegex, mInput.remoteRegex, mInput.copyCmd);
mFileFetcher->setMaxFilesInQueue(mInput.maxFileCache);
Expand Down Expand Up @@ -474,6 +478,9 @@ DataProcessorSpec getCTFReaderSpec(const CTFReaderInp& inp)
options.emplace_back(ConfigParamSpec{"local-tf-counter", VariantType::Bool, false, {"reassign header.tfCounter from local TF counter"}});
options.emplace_back(ConfigParamSpec{"fetch-failure-threshold", VariantType::Float, 0.f, {"Fail if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}});
options.emplace_back(ConfigParamSpec{"limit-tf-before-reading", VariantType::Bool, false, {"Check TF limiting before reading new TF, otherwhise before injecting it"}});
options.emplace_back(ConfigParamSpec{"max-tf", VariantType::Int, -1, {"max CTFs to process (<= 0 : infinite)"}});
options.emplace_back(ConfigParamSpec{"max-tf-per-file", VariantType::Int, -1, {"max TFs to process per ctf file (<= 0 : infinite)"}});

if (!inp.metricChannel.empty()) {
options.emplace_back(ConfigParamSpec{"channel-config", VariantType::String, inp.metricChannel, {"Out-of-band channel config for TF throttling"}});
}
Expand Down
7 changes: 0 additions & 7 deletions Detectors/CTF/workflow/src/ctf-reader-workflow.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
options.push_back(ConfigParamSpec{"ctf-input", VariantType::String, "none", {"comma-separated list CTF input files"}});
options.push_back(ConfigParamSpec{"onlyDet", VariantType::String, std::string{DetID::ALL}, {"comma-separated list of detectors to accept. Overrides skipDet"}});
options.push_back(ConfigParamSpec{"skipDet", VariantType::String, std::string{DetID::NONE}, {"comma-separate list of detectors to skip"}});
options.push_back(ConfigParamSpec{"max-tf", VariantType::Int, -1, {"max CTFs to process (<= 0 : infinite)"}});
options.push_back(ConfigParamSpec{"max-tf-per-file", VariantType::Int, -1, {"max TFs to process per ctf file (<= 0 : infinite)"}});
options.push_back(ConfigParamSpec{"loop", VariantType::Int, 0, {"loop N times (infinite for N<0)"}});
options.push_back(ConfigParamSpec{"delay", VariantType::Float, 0.f, {"delay in seconds between consecutive TFs sending"}});
options.push_back(ConfigParamSpec{"copy-cmd", VariantType::String, "alien_cp ?src file://?dst", {"copy command for remote files or no-copy to avoid copying"}}); // Use "XrdSecPROTOCOL=sss,unix xrdcp -N root://eosaliceo2.cern.ch/?src ?dst" for direct EOS access
Expand Down Expand Up @@ -117,11 +115,6 @@ WorkflowSpec defineDataProcessing(ConfigContext const& configcontext)
if (ctfInput.delay_us < 0) {
ctfInput.delay_us = 0;
}
int n = configcontext.options().get<int>("max-tf");
ctfInput.maxTFs = n > 0 ? n : 0x7fffffff;

n = configcontext.options().get<int>("max-tf-per-file");
ctfInput.maxTFsPerFile = n > 0 ? n : 0x7fffffff;

ctfInput.maxFileCache = std::max(1, configcontext.options().get<int>("max-cached-files"));

Expand Down
23 changes: 18 additions & 5 deletions Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ TFReaderSpec::TFReaderSpec(const TFReaderInp& rinp) : mInput(rinp)
void TFReaderSpec::init(o2f::InitContext& ic)
{
mInput.tfIDs = o2::RangeTokenizer::tokenize<int>(ic.options().get<std::string>("select-tf-ids"));
mInput.maxTFs = ic.options().get<int>("max-tf");
mInput.maxTFs = mInput.maxTFs > 0 ? mInput.maxTFs : 0x7fffffff;
mInput.maxTFsPerFile = ic.options().get<int>("max-tf-per-file");
mInput.maxTFsPerFile = mInput.maxTFsPerFile > 0 ? mInput.maxTFsPerFile : 0x7fffffff;
mInput.maxTFCache = std::max(1, ic.options().get<int>("max-cached-tf"));
mInput.maxFileCache = std::max(1, ic.options().get<int>("max-cached-files"));
mFileFetcher = std::make_unique<o2::utils::FileFetcher>(mInput.inpdata, mInput.tffileRegex, mInput.remoteRegex, mInput.copyCmd);
mFileFetcher->setMaxFilesInQueue(mInput.maxFileCache);
mFileFetcher->setMaxLoops(mInput.maxLoops);
Expand Down Expand Up @@ -417,15 +423,17 @@ void TFReaderSpec::TFBuilder()
}
mTFBuilderCounter++;
}
if (!acceptTF) {
continue;
}
if (mRunning && tf) {
mWaitSendingLast = true;
mTFQueue.push(std::move(tf));
if (acceptTF) {
mWaitSendingLast = true;
mTFQueue.push(std::move(tf));
}
} else {
break;
}
if (mInput.maxTFsPerFile > 0 && mInput.maxTFsPerFile >= locID) { // go to next file
break;
}
}
// remove already processed file from the queue, unless they are needed for further looping
if (mFileFetcher) {
Expand Down Expand Up @@ -527,6 +535,11 @@ o2f::DataProcessorSpec o2::rawdd::getTFReaderSpec(o2::rawdd::TFReaderInp& rinp)
}
spec.options.emplace_back(o2f::ConfigParamSpec{"select-tf-ids", o2f::VariantType::String, "", {"comma-separated list TF IDs to inject (from cumulative counter of TFs seen)"}});
spec.options.emplace_back(o2f::ConfigParamSpec{"fetch-failure-threshold", o2f::VariantType::Float, 0.f, {"Fatil if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}});
spec.options.emplace_back(o2f::ConfigParamSpec{"max-tf", o2f::VariantType::Int, -1, {"max TF ID to process (<= 0 : infinite)"}});
spec.options.emplace_back(o2f::ConfigParamSpec{"max-tf-per-file", o2f::VariantType::Int, -1, {"max TFs to process per raw-tf file (<= 0 : infinite)"}});
spec.options.emplace_back(o2f::ConfigParamSpec{"max-cached-tf", o2f::VariantType::Int, 3, {"max TFs to cache in memory"}});
spec.options.emplace_back(o2f::ConfigParamSpec{"max-cached-files", o2f::VariantType::Int, 3, {"max TF files queued (copied for remote source)"}});

spec.algorithm = o2f::adaptFromTask<TFReaderSpec>(rinp);

return spec;
Expand Down
1 change: 1 addition & 0 deletions Detectors/Raw/TFReaderDD/src/TFReaderSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ struct TFReaderInp {
int64_t delay_us = 0;
int maxLoops = 0;
int maxTFs = -1;
int maxTFsPerFile = -1;
bool sendDummyForMissing = true;
bool sup0xccdb = false;
std::vector<o2::header::DataHeader> hdVec;
Expand Down
8 changes: 0 additions & 8 deletions Detectors/Raw/TFReaderDD/src/tf-reader-workflow.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,16 @@ void customize(std::vector<ConfigParamSpec>& workflowOptions)
options.push_back(ConfigParamSpec{"onlyDet", VariantType::String, "all", {"list of dectors"}});
options.push_back(ConfigParamSpec{"raw-only-det", VariantType::String, "none", {"do not open non-raw channel for these detectors"}});
options.push_back(ConfigParamSpec{"non-raw-only-det", VariantType::String, "none", {"do not open raw channel for these detectors"}});
options.push_back(ConfigParamSpec{"max-tf", VariantType::Int, -1, {"max TF ID to process (<= 0 : infinite)"}});
options.push_back(ConfigParamSpec{"loop", VariantType::Int, 0, {"loop N times (-1 = infinite)"}});
options.push_back(ConfigParamSpec{"delay", VariantType::Float, 0.f, {"delay in seconds between consecutive TFs sending"}});
options.push_back(ConfigParamSpec{"copy-cmd", VariantType::String, "alien_cp ?src file://?dst", {"copy command for remote files"}}); // Use "XrdSecPROTOCOL=sss,unix xrdcp -N root://eosaliceo2.cern.ch/?src ?dst" for direct EOS access
options.push_back(ConfigParamSpec{"tf-file-regex", VariantType::String, ".+\\.tf$", {"regex string to identify TF files"}});
options.push_back(ConfigParamSpec{"remote-regex", VariantType::String, "^(alien://|)/alice/data/.+", {"regex string to identify remote files"}}); // Use "^/eos/aliceo2/.+" for direct EOS access
options.push_back(ConfigParamSpec{"max-cached-tf", VariantType::Int, 3, {"max TFs to cache in memory"}});
options.push_back(ConfigParamSpec{"max-cached-files", VariantType::Int, 3, {"max TF files queued (copied for remote source)"}});
options.push_back(ConfigParamSpec{"tf-reader-verbosity", VariantType::Int, 0, {"verbosity level (1 or 2: check RDH, print DH/DPH for 1st or all slices, >2 print RDH)"}});
options.push_back(ConfigParamSpec{"raw-channel-config", VariantType::String, "", {"optional raw FMQ channel for non-DPL output"}});
options.push_back(ConfigParamSpec{"send-diststf-0xccdb", VariantType::Bool, false, {"send explicit FLP/DISTSUBTIMEFRAME/0xccdb output"}});
options.push_back(ConfigParamSpec{"disable-dummy-output", VariantType::Bool, false, {"Disable sending empty output if corresponding data is not found in the data"}});
options.push_back(ConfigParamSpec{"configKeyValues", VariantType::String, "", {"semicolon separated key=value strings"}});

options.push_back(ConfigParamSpec{"timeframes-shm-limit", VariantType::String, "0", {"Minimum amount of SHM required in order to publish data"}});
options.push_back(ConfigParamSpec{"metric-feedback-channel-format", VariantType::String, "name=metric-feedback,type=pull,method=connect,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0", {"format for the metric-feedback channel for TF rate limiting"}});

Expand All @@ -59,8 +55,6 @@ WorkflowSpec defineDataProcessing(ConfigContext const& configcontext)
o2::rawdd::TFReaderInp rinp;
rinp.inpdata = configcontext.options().get<std::string>("input-data");
rinp.maxLoops = configcontext.options().get<int>("loop");
int n = configcontext.options().get<int>("max-tf");
rinp.maxTFs = n > 0 ? n : 0x7fffffff;
auto detlistSelect = configcontext.options().get<std::string>("onlyDet");
if (detlistSelect == "all") {
// Exclude FOCAL from default detlist (must be selected on request)
Expand All @@ -74,8 +68,6 @@ WorkflowSpec defineDataProcessing(ConfigContext const& configcontext)
rinp.rawChannelConfig = configcontext.options().get<std::string>("raw-channel-config");
rinp.delay_us = uint64_t(1e6 * configcontext.options().get<float>("delay")); // delay in microseconds
rinp.verbosity = configcontext.options().get<int>("tf-reader-verbosity");
rinp.maxTFCache = std::max(1, configcontext.options().get<int>("max-cached-tf"));
rinp.maxFileCache = std::max(1, configcontext.options().get<int>("max-cached-files"));
rinp.copyCmd = configcontext.options().get<std::string>("copy-cmd");
rinp.tffileRegex = configcontext.options().get<std::string>("tf-file-regex");
rinp.remoteRegex = configcontext.options().get<std::string>("remote-regex");
Expand Down
Loading