From 94bedae197475cf3c4d16ec254c343e6d25abcec Mon Sep 17 00:00:00 2001 From: Manikandan Dhamodharan Date: Fri, 26 Nov 2021 22:15:45 -0500 Subject: [PATCH] Enhanced call site info to store probe name, file, function and line number of the probe location --- .gitignore | 1 + include/xpedite/framework/CallSiteInfo.H | 81 ------------ include/xpedite/framework/Persister.H | 86 ++++++++---- include/xpedite/framework/ProbeInfo.H | 153 ++++++++++++++++++++++ include/xpedite/framework/SamplesBuffer.H | 8 +- include/xpedite/framework/SamplesLoader.H | 21 ++- include/xpedite/transport/Socket.H | 2 +- lib/xpedite/framework/Collector.C | 6 +- lib/xpedite/framework/Collector.H | 4 +- lib/xpedite/framework/Persister.C | 47 ++++--- lib/xpedite/transport/Listener.C | 2 +- scripts/lib/ext | 1 + 12 files changed, 271 insertions(+), 141 deletions(-) delete mode 100644 include/xpedite/framework/CallSiteInfo.H create mode 100644 include/xpedite/framework/ProbeInfo.H create mode 120000 scripts/lib/ext diff --git a/.gitignore b/.gitignore index 24a2ad1e..f472e19b 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,4 @@ ko/xpedite.ko.unsigned scripts/bin/xpeditec *.class .gradle +**/.eggs diff --git a/include/xpedite/framework/CallSiteInfo.H b/include/xpedite/framework/CallSiteInfo.H deleted file mode 100644 index 5a5067be..00000000 --- a/include/xpedite/framework/CallSiteInfo.H +++ /dev/null @@ -1,81 +0,0 @@ -/////////////////////////////////////////////////////////////////////////////// -// -// Provides implementation for storing and locating information about call sites -// in a trager application. -// -// CallSiteInfo - Stores the address, attribtues and id of a call site -// -// CallSiteMap - A collection of call sites in a target application -// -// Author: Manikandan Dhamodharan, Morgan Stanley -// -/////////////////////////////////////////////////////////////////////////////// - -#pragma once -#include -#include -#include - -namespace xpedite { namespace framework { - - class CallSiteInfo - { - const void* _callSite; - const probes::CallSiteAttr _attr; - const uint32_t _id; - - public: - - CallSiteInfo(const void* callSite_, probes::CallSiteAttr attr_, uint32_t id_) - : _callSite {callSite_}, _attr {attr_}, _id {id_} { - } - - const void* callSite() const noexcept { - return _callSite; - } - - uint32_t id() const noexcept { return _id; } - bool canStoreData() const noexcept { return _attr.canStoreData(); } - bool canBeginTxn() const noexcept { return _attr.canBeginTxn(); } - bool canSuspendTxn() const noexcept { return _attr.canSuspendTxn(); } - bool canResumeTxn() const noexcept { return _attr.canResumeTxn(); } - bool canEndTxn() const noexcept { return _attr.canEndTxn(); } - - std::string toString() const { - std::ostringstream os; - os << "CallSite - " << callSite() << " | id - " << _id << " | " << _attr.toString(); - return os.str(); - } - - } __attribute__((packed)); - - - class CallSiteMap - { - std::unordered_map _map; - - public: - - void add(const CallSiteInfo callSiteInfo_) { - _map.emplace(callSiteInfo_.callSite(), callSiteInfo_); - } - - const CallSiteInfo* locateInfo(const void* callSite_) const noexcept { - auto it = _map.find(callSite_); - if(it != _map.end()) { - return &(it->second); - } - return {}; - } - - std::string toString() const { - std::ostringstream os; - for(auto& kvp : _map) { - os << kvp.second.toString() << std::endl; - } - return os.str(); - } - - }; - -}} diff --git a/include/xpedite/framework/Persister.H b/include/xpedite/framework/Persister.H index 0fc31fc3..bd31f2b6 100644 --- a/include/xpedite/framework/Persister.H +++ b/include/xpedite/framework/Persister.H @@ -8,7 +8,7 @@ #pragma once #include -#include +#include #include #include @@ -46,46 +46,88 @@ namespace xpedite { namespace framework { timeval _time; uint64_t _tscHz; uint32_t _pmcCount; - uint32_t _callSiteCount; - CallSiteInfo _callSites[0]; + uint32_t _probeCount; + uint32_t _probeInfoBufSize; + ProbeInfo _probes[0]; public: - static constexpr uint64_t XPEDITE_VERSION {0x0200}; + static constexpr uint64_t XPEDITE_MIN_COMPATIBLE_VERSION {0x0200}; + static constexpr uint64_t XPEDITE_VERSION {0x0210}; static constexpr uint64_t XPEDITE_FILE_HDR_SIG {0xC01DC01DC0FFEEEE}; - static size_t callSiteSize(uint64_t callSiteCount_) { - return sizeof(CallSiteInfo) * callSiteCount_; + size_t size() const noexcept { + return sizeof(FileHeader) + _probeInfoBufSize; } - static size_t capacity(uint64_t callSiteCount_) { - return sizeof(FileHeader) + callSiteSize(callSiteCount_); - } - - FileHeader(const std::vector& callSites_, timeval time_, uint64_t tscHz_, uint32_t pmcCount_) + FileHeader(timeval time_, uint64_t tscHz_, uint32_t pmcCount_) : _signature {XPEDITE_FILE_HDR_SIG}, _version {XPEDITE_VERSION}, _time (time_), - _tscHz {tscHz_}, _pmcCount {pmcCount_}, _callSiteCount {static_cast(callSites_.size())} { - memcpy(reinterpret_cast(_callSites), callSites_.data(), callSiteSize(callSites_.size())); + _tscHz {tscHz_}, _pmcCount {pmcCount_}, _probeCount {}, _probeInfoBufSize {} { } + FileHeader(const FileHeader&) = delete; + FileHeader& operator=(const FileHeader&) = delete; + FileHeader(FileHeader&&) = delete; + FileHeader& operator=(FileHeader&&) = delete; + bool isValid() const noexcept { - return _signature == XPEDITE_FILE_HDR_SIG && _version == XPEDITE_VERSION; + return _signature == XPEDITE_FILE_HDR_SIG && _version >= XPEDITE_MIN_COMPATIBLE_VERSION && _version <= XPEDITE_VERSION; } - timeval time() const noexcept { return _time; } - uint64_t tscHz() const noexcept { return _tscHz; } - uint32_t pmcCount() const noexcept { return _pmcCount; } + timeval time() const noexcept { return _time; } + uint64_t tscHz() const noexcept { return _tscHz; } + uint32_t pmcCount() const noexcept { return _pmcCount; } + uint32_t probeCount() const noexcept { return _probeCount; } const SegmentHeader* segmentHeader() const noexcept { - return reinterpret_cast(reinterpret_cast(this + 1) + callSiteSize(_callSiteCount)); + if(_version < XPEDITE_VERSION) { + auto bufSize = sizeof(FileHeader) - 4 + _probeCount * 16; + return reinterpret_cast(reinterpret_cast(this) + bufSize); + } + return reinterpret_cast(reinterpret_cast(this + 1) + _probeInfoBufSize); } - std::tuple callSites() const noexcept { - return std::make_tuple(&_callSites[0], _callSiteCount); + void add(const void* callSite_, probes::CallSiteAttr attr_, uint32_t id_, Name probeName_, + Name fileName_, Name function_, uint32_t lineNo_) { + ++_probeCount; + auto* probeInfo = new (reinterpret_cast(this + 1) + _probeInfoBufSize) ProbeInfo { + callSite_, attr_, id_, probeName_, fileName_, function_, lineNo_ + }; + _probeInfoBufSize += probeInfo->size(); } + + template + void forEachCallSiteInfo(T t) const { + if(_version < XPEDITE_VERSION) { + return; + } + const ProbeInfo* probeInfo {_probes}; + for(uint32_t i=0; i<_probeCount; ++i) { + t(probeInfo); + probeInfo = reinterpret_cast(reinterpret_cast(probeInfo) + probeInfo->size()); + } + assert(reinterpret_cast(this)+size() == reinterpret_cast(probeInfo)); + } + } __attribute__((packed)); - void persistHeader(int fd_); - void persistData(int fd_, const probes::Sample* begin_, const probes::Sample* end_); + class Persister + { + FileHeader* _hdr; + std::vector _buffer; + + void resizeBuffer(size_t objSize_); + + size_t freeSize() const noexcept { + return _buffer.size() - _hdr->size(); + } + + public: + + Persister(); + + void persistHeader(int fd_); + void persistData(int fd_, const probes::Sample* begin_, const probes::Sample* end_); + }; }} diff --git a/include/xpedite/framework/ProbeInfo.H b/include/xpedite/framework/ProbeInfo.H new file mode 100644 index 00000000..fe5f7291 --- /dev/null +++ b/include/xpedite/framework/ProbeInfo.H @@ -0,0 +1,153 @@ +/////////////////////////////////////////////////////////////////////////////// +// +// Provides implementation for storing and locating information about call sites +// in a trager application. +// +// ProbeInfo - Stores the address, attribtues and id of a call site +// +// CallSiteMap - A collection of call sites in a target application +// +// Author: Manikandan Dhamodharan, Morgan Stanley +// +/////////////////////////////////////////////////////////////////////////////// + +#pragma once +#include +#include +#include +#include +#include + +namespace xpedite { namespace framework { + + struct Name { + // null terminated c-string + const char* _data; + + // size of the string including the null-char + uint32_t _size; + }; + + class FileHeader; + + enum class ProbeType { + Invalid = 0, + TxnBeginProbe, + TxnSuspendProbe, + TxnResumeProbe, + TxnEndProbe + }; + + class ProbeInfo + { + friend class FileHeader; + const void* _callSite; + probes::CallSiteAttr _attr; + uint32_t _id; + uint32_t _probeNameOffset; + uint32_t _fileNameOffset; + uint32_t _functionNameOffset; + uint32_t _lineNo; + uint32_t _size; + char _data[0]; + + ProbeInfo(const void* callSite_, probes::CallSiteAttr attr_, uint32_t id_, Name probeName_, + Name fileName_, Name functionName_, uint32_t lineNo_) + : _callSite {callSite_}, _attr {attr_}, _id {id_}, _lineNo {lineNo_}, _size {} { + + _probeNameOffset = _size; + memcpy(_data+_size, probeName_._data, probeName_._size); + _size += probeName_._size; + + _fileNameOffset = _size; + memcpy(_data+_size, fileName_._data, fileName_._size); + _size += fileName_._size; + + _functionNameOffset = _size; + memcpy(_data+_size, functionName_._data, functionName_._size); + _size += functionName_._size; + } + + ProbeInfo (const ProbeInfo&) = delete; + ProbeInfo& operator=(const ProbeInfo&) = delete; + ProbeInfo (ProbeInfo&&) = delete; + ProbeInfo& operator=(ProbeInfo&&) = delete; + + public: + + const void* callSite() const noexcept { + return _callSite; + } + + uint32_t id() const noexcept { return _id; } + bool isActive() const noexcept { return _attr.isActive(); } + bool canStoreData() const noexcept { return _attr.canStoreData(); } + bool canBeginTxn() const noexcept { return _attr.canBeginTxn(); } + bool canSuspendTxn() const noexcept { return _attr.canSuspendTxn(); } + bool canResumeTxn() const noexcept { return _attr.canResumeTxn(); } + bool canEndTxn() const noexcept { return _attr.canEndTxn(); } + const char* probeName() const noexcept { return _data + _probeNameOffset; } + const char* fileName() const noexcept { return _data + _fileNameOffset; } + const char* functionName() const noexcept { return _data + _functionNameOffset; } + uint32_t lineNo() const noexcept { return _lineNo; } + size_t size() const noexcept { return sizeof(ProbeInfo) + _size; } + + ProbeType type() const noexcept { + if(canBeginTxn()) { + return ProbeType::TxnBeginProbe; + } else if(canSuspendTxn()) { + return ProbeType::TxnSuspendProbe; + } else if(canResumeTxn()) { + return ProbeType::TxnResumeProbe; + } + assert(canEndTxn()); + return ProbeType::TxnEndProbe; + } + + std::string toString() const { + std::ostringstream os; + os << "CallSite - " << callSite() << " | id - " << _id << " | " << _attr.toString() + << " | probe name - " << probeName() << " | file name - " << fileName() + << " | line no - " << _lineNo << " | function - " << functionName(); + return os.str(); + } + + } __attribute__((packed)); + + + class ProbeInfoMap + { + using Map = std::unordered_map; + Map _map; + + public: + + using value_type = Map::value_type; + + void add(const ProbeInfo* probeInfo_) { + _map.emplace(probeInfo_->callSite(), probeInfo_); + } + + const ProbeInfo* locateInfo(const void* callSite_) const noexcept { + auto it = _map.find(callSite_); + if(it != _map.end()) { + return it->second; + } + return {}; + } + + const Map& data() const noexcept { + return _map; + } + + std::string toString() const { + std::ostringstream os; + for(auto& kvp : _map) { + os << kvp.second->toString() << std::endl; + } + return os.str(); + } + + }; + +}} diff --git a/include/xpedite/framework/SamplesBuffer.H b/include/xpedite/framework/SamplesBuffer.H index e07efda4..27597006 100644 --- a/include/xpedite/framework/SamplesBuffer.H +++ b/include/xpedite/framework/SamplesBuffer.H @@ -58,11 +58,11 @@ namespace xpedite { namespace framework { return _head.load(std::memory_order_relaxed); } - static bool attachAll(const std::string& fileNamePattern_) noexcept { + static bool attachAll(Persister& persister_, const std::string& fileNamePattern_) noexcept { auto begin = SamplesBuffer::head(); auto buffer = begin; while(buffer) { - if(!buffer->attachReader(fileNamePattern_)) { + if(!buffer->attachReader(persister_, fileNamePattern_)) { break; } buffer = buffer->next(); @@ -97,7 +97,7 @@ namespace xpedite { namespace framework { return _fd >= 0; } - bool attachReader(const std::string& fileNamePattern_) noexcept { + bool attachReader(Persister& persister_, const std::string& fileNamePattern_) noexcept { if(isReaderAttached()) { XpediteLogError << "xpedite - failed to attach reader to thread " << tid() << " - reader already attached. attaching multiple readers not permitted" << XpediteLogEnd; @@ -112,7 +112,7 @@ namespace xpedite { namespace framework { return false; } - persistHeader(_fd); + persister_.persistHeader(_fd); uint64_t rindex, windex; std::tie(rindex, windex) = _bufferPool.attachReader(); XpediteLogInfo << "xpedite - attached reader to thread - " << tid() << " | buffer index state - [readIndex - " diff --git a/include/xpedite/framework/SamplesLoader.H b/include/xpedite/framework/SamplesLoader.H index be215a48..e8d57dfc 100644 --- a/include/xpedite/framework/SamplesLoader.H +++ b/include/xpedite/framework/SamplesLoader.H @@ -33,7 +33,7 @@ namespace xpedite { namespace framework { { int _fd; const FileHeader* _fileHeader; - CallSiteMap _callSiteMap; + ProbeInfoMap _probeInfoMap; const SegmentHeader* _segmentHeader; unsigned _size; @@ -99,7 +99,7 @@ namespace xpedite { namespace framework { }; explicit SamplesLoader(const char* path_) - : _fd {}, _fileHeader {}, _callSiteMap {}, _segmentHeader {}, _size {} { + : _fd {}, _fileHeader {}, _probeInfoMap {}, _segmentHeader {}, _size {} { load(path_); } @@ -141,21 +141,18 @@ namespace xpedite { namespace framework { throw std::runtime_error {errorMsg("detected data corruption - mismatch in header signature")}; } - const CallSiteInfo* callSites; - uint64_t callSiteCount; - std::tie(callSites, callSiteCount) = _fileHeader->callSites(); - for(unsigned i=0; iforEachCallSiteInfo([this](const ProbeInfo* callSiteInfo_) { + _probeInfoMap.add(callSiteInfo_); + }); _segmentHeader = _fileHeader->segmentHeader(); } - const CallSiteInfo* locateCallSite(const void* callSite_) const noexcept { - return _callSiteMap.locateInfo(callSite_); + const ProbeInfo* locateCallSite(const void* callSite_) const noexcept { + return _probeInfoMap.locateInfo(callSite_); } - uint32_t pmcCount() const noexcept { return _fileHeader->pmcCount(); } - const CallSiteMap callSiteMap() const noexcept { return _callSiteMap; } + uint32_t pmcCount() const noexcept { return _fileHeader->pmcCount(); } + const ProbeInfoMap probeInfoMap() const noexcept { return _probeInfoMap; } Iterator begin() const { return Iterator {_segmentHeader, samplesEnd()}; } Iterator end() const { return Iterator {samplesEnd(), samplesEnd()}; } diff --git a/include/xpedite/transport/Socket.H b/include/xpedite/transport/Socket.H index 025b17cf..6e0f1f19 100644 --- a/include/xpedite/transport/Socket.H +++ b/include/xpedite/transport/Socket.H @@ -166,7 +166,7 @@ namespace xpedite { namespace transport { namespace tcp { } std::string toString() noexcept { - std::array buffer; + std::array buffer {}; if(!inet_ntop(AF_INET, &_addr.sin_addr, buffer.data(), buffer.size())) { // error converting ip to string strcpy(buffer.data(), "unknown"); diff --git a/lib/xpedite/framework/Collector.C b/lib/xpedite/framework/Collector.C index 71674e2b..e7cf38b0 100644 --- a/lib/xpedite/framework/Collector.C +++ b/lib/xpedite/framework/Collector.C @@ -22,7 +22,7 @@ namespace xpedite { namespace framework { bool Collector::beginSamplesCollection() { XpediteLogInfo << "xpedite - begin out of band samples collection" << XpediteLogEnd; - _isCollecting = SamplesBuffer::attachAll(_fileNamePattern); + _isCollecting = SamplesBuffer::attachAll(_persister, _fileNamePattern); return _isCollecting; } @@ -39,7 +39,7 @@ namespace xpedite { namespace framework { void Collector::persistSamples(int fd_, const probes::Sample* begin_, const probes::Sample* end_) { auto size = reinterpret_cast(end_) - reinterpret_cast(begin_); if(_storageMgr.consume(size)) { - persistData(fd_, begin_, end_); + _persister.persistData(fd_, begin_, end_); } else if(!_capacityBreached) { // capacity breached - dropping all samples from now on _capacityBreached = true; @@ -141,7 +141,7 @@ namespace xpedite { namespace framework { while(buffer) { if(!buffer->isReaderAttached()) { //TODO, have to limit the number of attach operations attempted - buffer->attachReader(_fileNamePattern); + buffer->attachReader(_persister, _fileNamePattern); } if(buffer->isReaderAttached()) { diff --git a/lib/xpedite/framework/Collector.H b/lib/xpedite/framework/Collector.H index 74e4e852..95d7cb93 100644 --- a/lib/xpedite/framework/Collector.H +++ b/lib/xpedite/framework/Collector.H @@ -13,6 +13,7 @@ #pragma once #include "StorageMgr.H" +#include #include #include @@ -29,7 +30,7 @@ namespace xpedite { namespace framework { public: Collector(std::string fileNamePattern_, uint64_t samplesDataCapacity_) - : _storageMgr {samplesDataCapacity_}, _fileNamePattern {std::move(fileNamePattern_)}, + : _storageMgr {samplesDataCapacity_}, _persister {}, _fileNamePattern {std::move(fileNamePattern_)}, _isCollecting {}, _capacityBreached {} { } @@ -54,6 +55,7 @@ namespace xpedite { namespace framework { std::tuple flush(SamplesBuffer* buffer_); StorageMgr _storageMgr; + Persister _persister; std::string _fileNamePattern; bool _isCollecting; bool _capacityBreached; diff --git a/lib/xpedite/framework/Persister.C b/lib/xpedite/framework/Persister.C index ae77ab04..8d71ac65 100644 --- a/lib/xpedite/framework/Persister.C +++ b/lib/xpedite/framework/Persister.C @@ -17,35 +17,50 @@ #include #include #include +#include #include namespace xpedite { namespace framework { static unsigned batchCount; - std::vector buildCallSiteList() { - std::vector callSites; - for(auto& probe : probes::probeList()) { - callSites.emplace_back(probe.rawRecorderCallSite(), probe.attr(), probe.id()); + void Persister::resizeBuffer(size_t objSize_) { + auto size = _buffer.size(); + while(size < _hdr->size() + objSize_) { + size *= 2; } - return callSites; + _buffer.resize(size); + _hdr = reinterpret_cast(_buffer.data()); } - void persistHeader(int fd_) { - static auto tscHz = util::estimateTscHz(); - auto callSites = buildCallSiteList(); + Persister::Persister() + : _hdr {}, _buffer {} { + auto tscHz = util::estimateTscHz(); timeval time; gettimeofday(&time, nullptr); - auto capacity = FileHeader::capacity(callSites.size()); - std::unique_ptr buffer {new char[capacity]}; - new (buffer.get()) FileHeader {callSites, time, tscHz, pmu::pmuCtl().pmcCount()}; - write(fd_, buffer.get(), capacity); - XpediteLogInfo << "persisted file header with " << callSites.size() << " call sites | capacity " - << sizeof(FileHeader) << " + " << FileHeader::callSiteSize(callSites.size()) << " = " - << capacity << " bytes" << XpediteLogEnd; + _buffer.resize(2 * 1024 * 1024); + _hdr = new (_buffer.data()) FileHeader {time, tscHz, pmu::pmuCtl().pmcCount()}; + for(auto& probe : probes::probeList()) { + Name probeName {probe.name(), static_cast(strlen(probe.name()))+1}; + Name fileName {probe.file(), static_cast(strlen(probe.file()))+1}; + Name functionName {probe.func(), static_cast(strlen(probe.func()))+1}; + auto objSize = sizeof(ProbeInfo) + probeName._size + fileName._size + functionName._size; + if(freeSize() < objSize) { + resizeBuffer(objSize); + } + _hdr->add( + probe.rawRecorderCallSite(), probe.attr(), probe.id(), probeName, fileName, functionName, probe.line() + ); + } + } + + void Persister::persistHeader(int fd_) { + write(fd_, _buffer.data(), _hdr->size()); + XpediteLogInfo << "persisted file header with " << _hdr->probeCount() << " call sites | capacity " + << _hdr->size() << " bytes" << XpediteLogEnd; } - void persistData(int fd_, const probes::Sample* begin_, const probes::Sample* end_) { + void Persister::persistData(int fd_, const probes::Sample* begin_, const probes::Sample* end_) { if(!begin_ || begin_ == end_) { return; diff --git a/lib/xpedite/transport/Listener.C b/lib/xpedite/transport/Listener.C index c92f445b..f623e87b 100644 --- a/lib/xpedite/transport/Listener.C +++ b/lib/xpedite/transport/Listener.C @@ -134,7 +134,7 @@ namespace xpedite { namespace transport { namespace tcp { // should not be called in crit path std::string Listener::toString() const { - std::array buffer; + std::array buffer {}; if(!inet_ntop(AF_INET, &_addr.sin_addr, buffer.data(), buffer.size())) { // error converting ip to string strcpy(buffer.data(), "unknown"); diff --git a/scripts/lib/ext b/scripts/lib/ext new file mode 120000 index 00000000..6581736d --- /dev/null +++ b/scripts/lib/ext @@ -0,0 +1 @@ +../../ \ No newline at end of file