Skip to content

Commit

Permalink
Use new throttler infra in key places
Browse files Browse the repository at this point in the history
Summary: This diff takes advantage of the new throttler infra to avoid spamming logs when problematic files are read.

Reviewed By: kiminoue7

Differential Revision: D53194749

fbshipit-source-id: 093ee8887444ba0e6c8a6724fa8d4237594913a0
  • Loading branch information
Georges Berenger authored and facebook-github-bot committed Jan 30, 2024
1 parent 51f2e58 commit 154df37
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 54 deletions.
15 changes: 13 additions & 2 deletions vrs/ContentBlockReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,22 @@
#include <logging/Verify.h>

#include <vrs/helpers/FileMacros.h>
#include <vrs/helpers/Throttler.h>
#include <vrs/os/CompilerAttributes.h>

#include "RecordFormatStreamPlayer.h"
#include "RecordReaders.h"

using namespace std;

namespace {

vrs::utils::Throttler& getThrottler() {
static vrs::utils::Throttler sThrottler;
return sThrottler;
}

} // namespace
namespace vrs {

namespace {
Expand All @@ -47,7 +56,8 @@ bool mayUsePastConfigurationReader(
? "no configuration record was read prior to reading this"
: "the most recent configuration record read for this stream has a newer"
" timestamp than this";
XR_LOGW(
THROTTLED_LOGW(
record.fileReader,
"Can't define the {} block format for {} to read this {} block with DataLayout. "
"This might be happening, because the {} format is defined in a configuration record using "
"datalayout conventions, but {} {} record.",
Expand Down Expand Up @@ -170,7 +180,8 @@ bool AudioBlockReader::readAudioContentBlock(
if (remainingBlockSize == ContentBlock::kSizeUnknown || remainingBlockSize == expectedSize) {
return player.onAudioRead(record, blockIndex_, contentBlock);
}
XR_LOGW(
THROTTLED_LOGW(
record.fileReader,
"Non-matching audio block size, got {} bytes, expected {} bytes.",
remainingBlockSize,
expectedSize);
Expand Down
51 changes: 35 additions & 16 deletions vrs/Decompressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,23 @@
#include <logging/Log.h>

#include <vrs/helpers/FileMacros.h>
#include <vrs/helpers/Throttler.h>

#include "ErrorCode.h"
#include "FileHandler.h"
#include "Record.h"

using namespace std;

namespace {

vrs::utils::Throttler& getThrottler() {
static vrs::utils::Throttler sThrottler;
return sThrottler;
}

} // namespace

namespace vrs {

static size_t kMaxInputBufferSize = 2 * 1024 * 1024; // 2 MB
Expand Down Expand Up @@ -169,21 +179,29 @@ void Decompressor::reset() {
#define DECOMPRESSION_ERROR(code__, name__) \
domainErrorCode(ErrorDomain::ZstdDecompressionErrorDomain, code__, name__)

#define IF_DECOMP_ERROR_LOG_AND_RETURN(operation__) \
do { \
zresult = operation__; \
if (ZSTD_isError(zresult)) { \
const char* errorName = ZSTD_getErrorName(zresult); \
XR_LOGE("{} failed: {}, {}", #operation__, zresult, errorName); \
return DECOMPRESSION_ERROR(zresult, errorName); \
} \
#define IF_DECOMP_ERROR_LOG_AND_RETURN(operation__) \
do { \
zresult = operation__; \
if (ZSTD_isError(zresult)) { \
const char* errorName = ZSTD_getErrorName(zresult); \
THROTTLED_LOGE(zstdContext_.get(), "{} failed: {}, {}", #operation__, zresult, errorName); \
return DECOMPRESSION_ERROR(zresult, errorName); \
} \
} while (false)

#define READ_OR_LOG_AND_RETURN(size__) \
do { \
size_t readSize__ = min(min<size_t>(size__, inOutMaxReadSize), kMaxInputBufferSize); \
IF_ERROR_LOG_AND_RETURN(file.read(allocateCompressedDataBuffer(readSize__), readSize__)); \
inOutMaxReadSize -= readSize__; \
#define READ_OR_LOG_AND_RETURN(size__) \
do { \
size_t readSize__ = min(min<size_t>(size__, inOutMaxReadSize), kMaxInputBufferSize); \
int operationError_ = file.read(allocateCompressedDataBuffer(readSize__), readSize__); \
if (operationError_ != 0) { \
THROTTLED_LOGW( \
zstdContext_.get(), \
"file.read() failed: {}, {}", \
operationError_, \
errorCodeToMessage(operationError_)); \
return operationError_; \
} \
inOutMaxReadSize -= readSize__; \
} while (false)

int Decompressor::initFrame(FileHandler& file, size_t& outFrameSize, size_t& inOutMaxReadSize) {
Expand Down Expand Up @@ -215,7 +233,8 @@ int Decompressor::readFrame(
do {
if (getCompressedDataSize() == 0 && zresult > 0) {
if (inOutMaxReadSize == 0) {
XR_LOGW("Decompression error: {} more input bytes needed", zresult);
THROTTLED_LOGW(
zstdContext_.get(), "Decompression error: {} more input bytes needed", zresult);
return NOT_ENOUGH_DATA;
}
READ_OR_LOG_AND_RETURN(zresult);
Expand All @@ -235,7 +254,7 @@ int Decompressor::decompress(void* destination, uint32_t destinationSize, uint32
lastResult_ = lz4Context_->decompress(
destination, &writtenSize, compressedBuffer_.data() + decodedSize_, &sourceSize);
if (LZ4F_isError(lastResult_)) {
XR_LOGE("Decompression error {}", LZ4F_getErrorName(lastResult_));
THROTTLED_LOGW(lz4Context_.get(), "Decompression error {}", LZ4F_getErrorName(lastResult_));
return domainErrorCode(
ErrorDomain::Lz4DecompressionErrorDomain, lastResult_, LZ4F_getErrorName(lastResult_));
}
Expand All @@ -250,7 +269,7 @@ int Decompressor::decompress(void* destination, uint32_t destinationSize, uint32
destinationSize,
outReadSize);
if (ZSTD_isError(lastResult_)) {
XR_LOGE("Decompression error {}", ZSTD_getErrorName(lastResult_));
THROTTLED_LOGW(lz4Context_.get(), "Decompression error {}", ZSTD_getErrorName(lastResult_));
return domainErrorCode(
ErrorDomain::ZstdDecompressionErrorDomain, lastResult_, ZSTD_getErrorName(lastResult_));
}
Expand Down
5 changes: 5 additions & 0 deletions vrs/RecordReaders.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ class RecordReader {
return remainingDiskBytes_ != remainingUncompressedSize_;
}

// for warning/error throttling only
const void* getRef() const {
return file_;
}

protected:
FileHandler* file_;
uint32_t remainingDiskBytes_;
Expand Down
24 changes: 20 additions & 4 deletions vrs/utils/AudioExtractor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,19 @@
#include <vrs/RecordReaders.h>
#include <vrs/helpers/Endian.h>
#include <vrs/helpers/FileMacros.h>
#include <vrs/helpers/Throttler.h>
#include <vrs/os/Utils.h>

using namespace std;
using namespace vrs;

namespace {

utils::Throttler& getThrottler() {
static utils::Throttler sThrottler;
return sThrottler;
}

template <class T>
inline void writeHeader(void* p, const T t) {
memcpy(p, &t, sizeof(T));
Expand Down Expand Up @@ -155,7 +161,8 @@ bool AudioExtractor::onAudioRead(
audio_.resize(audioBlock.getBlockSize());
int readStatus = record.reader->read(audio_);
if (readStatus != 0) {
XR_LOGW(
THROTTLED_LOGW(
record.fileReader,
"{} - {} record @ {}: Failed read audio data ({}).",
record.streamId.getNumericName(),
toString(record.recordType),
Expand Down Expand Up @@ -207,9 +214,14 @@ bool AudioExtractor::onAudioRead(
double expectedTime =
static_cast<double>(segmentSamplesCount_) / currentAudioContentBlockSpec_.getSampleRate();
if (actualTime - expectedTime > kMaxJitter) {
XR_LOGW("Audio block at {:.3f} {:.3f} ms late.", actualTime, actualTime - expectedTime);
THROTTLED_LOGW(
record.fileReader,
"Audio block at {:.3f} {:.3f} ms late.",
actualTime,
actualTime - expectedTime);
} else if (expectedTime - actualTime > kMaxJitter) {
XR_LOGW(
THROTTLED_LOGW(
record.fileReader,
"Audio block at {:.3f} {:.3f} ms, {:.2f}% early.",
expectedTime,
expectedTime - actualTime,
Expand All @@ -229,7 +241,11 @@ bool AudioExtractor::onUnsupportedBlock(
const ContentBlock& cb) {
// the audio was not decoded ... not sure why?
if (cb.getContentType() == ContentType::AUDIO) {
XR_LOGW("Audio block skipped for {}, content: {}", record.streamId.getName(), cb.asString());
THROTTLED_LOGW(
record.fileReader,
"Audio block skipped for {}, content: {}",
record.streamId.getName(),
cb.asString());
}
return false;
}
Expand Down
20 changes: 16 additions & 4 deletions vrs/utils/DataExtractor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include <vrs/helpers/FileMacros.h>
#include <vrs/helpers/Rapidjson.hpp>
#include <vrs/helpers/Throttler.h>
#include <vrs/os/Utils.h>
#include <vrs/utils/PixelFrame.h>
#include <vrs/utils/RecordFileInfo.h>
Expand All @@ -34,11 +35,20 @@ const char* kReadMeContent =
#include "DataExtractorReadMe.hpp"
;

namespace vrs::utils {

using namespace std;
using namespace vrs;

namespace {

utils::Throttler& getThrottler() {
static utils::Throttler sThrottler;
return sThrottler;
}

} // namespace

namespace vrs::utils {

DataExtractor::DataExtractorStreamPlayer::DataExtractorStreamPlayer(
ofstream& output,
string outputFolder)
Expand Down Expand Up @@ -139,7 +149,8 @@ bool DataExtractor::DataExtractorStreamPlayer::onImageRead(
return true;
}
}
XR_LOGW(
THROTTLED_LOGW(
record.fileReader,
"Could not convert image for {}, format: {}",
record.streamId.getName(),
imageBlock.asString());
Expand Down Expand Up @@ -182,7 +193,8 @@ bool DataExtractor::DataExtractorStreamPlayer::onUnsupportedBlock(
const CurrentRecord& record,
size_t,
const ContentBlock& contentBlock) {
XR_LOGW(
THROTTLED_LOGW(
record.fileReader,
"Unsupported block: {} {} @ {:.6f}: {}, {}.",
record.streamId.getNumericName(),
toString(record.recordType),
Expand Down
18 changes: 16 additions & 2 deletions vrs/utils/FilterCopyHelpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,20 @@

#include <vrs/RecordFileReader.h>
#include <vrs/RecordFileWriter.h>
#include <vrs/helpers/Throttler.h>

using namespace std;
using namespace vrs;

namespace {

utils::Throttler& getThrottler() {
static utils::Throttler sThrottler;
return sThrottler;
}

} // namespace

namespace vrs::utils {

const Record* Writer::createStateRecord() {
Expand Down Expand Up @@ -128,7 +138,7 @@ ContentBlockChunk::ContentBlockChunk(const ContentBlock& contentBlock, const Cur
contentBlock_{contentBlock} {
int status = record.reader->read(getBuffer());
if (status != 0) {
XR_LOGW("Failed to read image block: {}", errorCodeToMessage(status));
THROTTLED_LOGW(record.fileReader, "Failed to read image block: {}", errorCodeToMessage(status));
}
}

Expand Down Expand Up @@ -249,7 +259,11 @@ bool RecordFilterCopier::onUnsupportedBlock(
unique_ptr<ContentChunk> bufferSourceChunk = make_unique<ContentChunk>(blockSize);
int status = record.reader->read(bufferSourceChunk->getBuffer());
if (status != 0) {
XR_LOGW("Failed to read {} block: {}", cb.asString(), errorCodeToMessage(status));
THROTTLED_LOGW(
record.fileReader,
"Failed to read {} block: {}",
cb.asString(),
errorCodeToMessage(status));
}
chunks_.emplace_back(std::move(bufferSourceChunk));
return readNext;
Expand Down
32 changes: 19 additions & 13 deletions vrs/utils/ImageExtractor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@
#include <logging/Log.h>
#include <logging/Verify.h>

#include <vrs/helpers/Throttler.h>

using namespace std;
using namespace vrs;
using namespace utils;
using namespace vrs::utils;

namespace vrs {
namespace utils {
namespace vrs::utils {

string ImageNamer::namePngImage(StreamId id, uint32_t imageCounter, double timestamp) {
return fmt::format("{}-{:05}-{:.3f}.png", id.getNumericName(), imageCounter, timestamp);
Expand Down Expand Up @@ -81,11 +82,15 @@ string ImageNamer::getRawImageFormatAsString(const ImageContentBlockSpec& imageS
return fmt::format("{}.{}", filenamePostfix, extension);
}

} // namespace utils
} // namespace vrs
} // namespace vrs::utils

namespace {

Throttler& getThrottler() {
static Throttler sThrottler;
return sThrottler;
}

const bool kSupportGrey16Export = true;

bool writeRawImage(const string& path, const vector<uint8_t>& imageData) {
Expand Down Expand Up @@ -182,8 +187,7 @@ ImageNamer& getDefaultImageNamer() {

} // namespace

namespace vrs {
namespace utils {
namespace vrs::utils {

ImageExtractor::ImageExtractor(const string& folderPath, uint32_t& counter, bool extractImagesRaw)
: ImageExtractor(getDefaultImageNamer(), folderPath, counter, extractImagesRaw) {}
Expand Down Expand Up @@ -233,7 +237,8 @@ bool ImageExtractor::onImageRead(const CurrentRecord& record, size_t, const Cont
imageData.resize(ib.getBlockSize());
int readStatus = record.reader->read(imageData.data(), ib.getBlockSize());
if (readStatus != 0) {
XR_LOGW(
THROTTLED_LOGW(
record.fileReader,
"{} - {} record @ {}: Failed read image data ({}).",
id.getNumericName(),
toString(record.recordType),
Expand All @@ -246,18 +251,19 @@ bool ImageExtractor::onImageRead(const CurrentRecord& record, size_t, const Cont
writeRawImage(filepath, imageData);
return true;
}
XR_LOGW("Could not convert image for {}, format: {}", id.getName(), ib.asString());
THROTTLED_LOGW(
record.fileReader, "Could not convert image for {}, format: {}", id.getName(), ib.asString());
return false;
}

bool ImageExtractor::onUnsupportedBlock(const CurrentRecord& r, size_t, const ContentBlock& cb) {
bool ImageExtractor::onUnsupportedBlock(const CurrentRecord& rec, size_t, const ContentBlock& cb) {
// the image was not decoded, probably because the image spec are incomplete
if (cb.getContentType() == ContentType::IMAGE) {
imageCounter_++;
XR_LOGW("Image skipped for {}, content: {}", r.streamId.getName(), cb.asString());
THROTTLED_LOGW(
rec.fileReader, "Image skipped for {}, content: {}", rec.streamId.getName(), cb.asString());
}
return false;
}

} // namespace utils
} // namespace vrs
} // namespace vrs::utils
Loading

0 comments on commit 154df37

Please sign in to comment.