Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added functions to get address information from readers/writers and p… #25

Merged
merged 2 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions src/dds_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
#include <dds/DCPS/Service_Participant.h>
#include <dds/DCPS/RTPS/RtpsDiscovery.h>
#include <dds/DCPS/ServiceEventDispatcher.h>
#include <dds/DCPS/DataWriterImpl.h>
#include <dds/DCPS/DataReaderImpl.h>
#include <dds/DCPS/LogAddr.h>

#ifdef WIN32
#pragma warning(pop)
Expand All @@ -35,6 +38,26 @@
#include <dds/DCPS/transport/rtps_udp/RtpsUdp.h>
#endif

//Helper function to get the address list for a sequence
std::string GetAddressInfo(const OpenDDS::DCPS::TransportLocatorSeq& info)
{
std::string strAddress;
for (unsigned int idx = 0; idx != info.length(); ++idx) {
const auto locators = OpenDDS::RTPS::transport_locator_to_locator_seq(info[idx]);
for (unsigned int idx2 = 0; idx2 != locators.length(); ++idx2) {
ACE_INET_Addr addr;
if (locator_to_address(addr, locators[idx2], false) == 0) {
if (!strAddress.empty()) {
strAddress += ",";
}
strAddress += OpenDDS::DCPS::LogAddr(addr).c_str();
}
}
}

return strAddress;
}

std::map<int, int> g_transportInstances;

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -1149,6 +1172,28 @@ DDS::DataReader_var DDSManager::getReader(const std::string& topicName,
}


//------------------------------------------------------------------------------
std::string DDSManager::getWriterAddress(const std::string& topicName) const
{
DDS::DataWriter_var writer = getWriter(topicName);
auto dwi = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(writer.in());
if (dwi == nullptr) {
return "Invalid Writer";
}
return GetAddressInfo(dwi->connection_info());
}

//------------------------------------------------------------------------------
std::string DDSManager::getReaderAddress(const std::string& topicName, const std::string& readerName) const
{
DDS::DataReader_var reader = getReader(topicName, readerName);
auto dri = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(reader.in());
if (dri == nullptr) {
return "Invalid Reader";
}
return GetAddressInfo(dri->connection_info());
}

//------------------------------------------------------------------------------
DDS::DataWriter_var DDSManager::getWriter(const std::string& topicName) const
{
Expand Down
15 changes: 15 additions & 0 deletions src/dds_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,21 @@ class DDSManager
DDS::DataReader_var getReader(const std::string& topicName,
const std::string& readerName) const;

/**
* @brief Get the addresses associated with a topic.
* @param[in] topicName The name of the topic.
* @return String containing the addresses used by the writer.
*/
std::string getWriterAddress(const std::string& topicName) const;

/**
* @brief Get the addresses associated with a topic.
* @param[in] topicName The name of the topic.
* @param[in] readerName The name of the reader.
* @return String containing the addresses used by the reader.
*/
std::string getReaderAddress(const std::string& topicName, const std::string& readerName) const;

/**
* @brief Get the data writer associated with a topic.
* @param[in] topicName The name of the topic.
Expand Down
64 changes: 57 additions & 7 deletions src/dds_simple.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,22 +242,22 @@ class DDSSimpleManager : public DDSManager
// This function waits until it finds one (or more) Subscriber of topic T OR secondsToWait seconds expires.
// Useful when making sure your messages are being Published on a certain topic.
template <class T>
bool WaitForSubscriber(std::chrono::milliseconds timeToWait = std::chrono::seconds(2))
bool WaitForSubscriber(std::chrono::milliseconds timeToWait = std::chrono::seconds(15))
{
return GetNumberOfSubscribers<T>(1, timeToWait) > 0;
}

//Call WaitForPublisher(0) if you have already been discovered and want to see if you've lost all connection to publishers.
// readerName is not required unless user specifies a readerName when creating their Subscriber / Callback
template <class T>
bool WaitForPublisher(std::chrono::milliseconds timeToWait = std::chrono::seconds(2), std::string readerName = "")
bool WaitForPublisher(std::chrono::milliseconds timeToWait = std::chrono::seconds(15), std::string readerName = "")
{
return GetNumberOfPublishers<T>(1, timeToWait, readerName) > 0;
}

// Function that will wait until [max_wait] passes or until we find [min_count] number of Subscribers, whichever is faster
template<class T>
int GetNumberOfSubscribers(int min_count, std::chrono::milliseconds max_wait = std::chrono::seconds(2))
int GetNumberOfSubscribers(int min_count, std::chrono::milliseconds max_wait = std::chrono::seconds(15))
{
const std::chrono::milliseconds waitIncriment(100);
std::chrono::milliseconds timeWaited(0);
Expand Down Expand Up @@ -309,7 +309,8 @@ class DDSSimpleManager : public DDSManager
}
}

sstr << "Failed to find " << min_count << " Subscribers(s)... Only found " << pubStatus.current_count;
std::string addressInfo = getWriterAddress(temp);
sstr << "Failed to find " << min_count << " on " << addressInfo << ". Subscribers(s)... Only found " << pubStatus.current_count;
m_messageHandler(LogMessageType::DDS_INFO, sstr.str());

return pubStatus.current_count;
Expand All @@ -322,9 +323,31 @@ class DDSSimpleManager : public DDSManager
return 0;
}

template <class T>
std::string GetSubscriberAddress()
{
std::string topic_name = typeid(T).name();
try {
std::string temp;
{
decltype(m_sharedLock) lck(mutex_shr);
auto iter = m_pubMap.find(topic_name);
if (iter == m_pubMap.end()) {
return std::string("Invalid Publisher for ") + topic_name;
}

temp = iter->second;
}
return getWriterAddress(temp);
}
catch (...) {
}
return std::string("Invalid Publisher for ") + topic_name;
}

// Function that will wait until [max_wait] passes or until we find [min_count] number of Publishers, whichever is faster
template <class T>
int GetNumberOfPublishers(int min_count, std::chrono::milliseconds max_wait = std::chrono::seconds(2), std::string reader_name = "")
int GetNumberOfPublishers(int min_count, std::chrono::milliseconds max_wait = std::chrono::seconds(15), std::string reader_name = "")
{
const std::chrono::milliseconds waitIncriment(100);
std::chrono::milliseconds timeWaited(0);
Expand All @@ -349,7 +372,9 @@ class DDSSimpleManager : public DDSManager
std::string temp = iter->second;
lck.unlock();

auto dr = getReader(temp, GenerateReaderName(temp, reader_name)); // Reader name == Topic name + "Reader", unless user-specified
auto genReaderName = GenerateReaderName(temp, reader_name);

auto dr = getReader(temp, genReaderName); // Reader name == Topic name + "Reader", unless user-specified

if (dr == nullptr) {
sstr << "No reader found for: " << topic_name << ".";
Expand All @@ -375,7 +400,9 @@ class DDSSimpleManager : public DDSManager
return subStatus.current_count;
}
}
sstr << "Failed to find " << min_count << " Publisher(s)... Only found " << subStatus.current_count;

std::string addressInfo = getReaderAddress(temp, genReaderName);
sstr << "Failed to find " << min_count << " on " << addressInfo << ". Publisher(s)... Only found " << subStatus.current_count;
m_messageHandler(LogMessageType::DDS_INFO, sstr.str());

return subStatus.current_count;
Expand All @@ -389,6 +416,29 @@ class DDSSimpleManager : public DDSManager
return 0;
}

template <class T>
std::string GetPublisherAddress(std::string reader_name = "")
{
std::string topic_name = typeid(T).name();
try {
std::string temp;
{
decltype(m_sharedLock) lck(mutex_shr);
auto iter = m_subMap.find(topic_name);
if (iter == m_subMap.end()) {
return std::string("Invalid Subscirber for ") + topic_name;
doug1234 marked this conversation as resolved.
Show resolved Hide resolved
}
temp = iter->second;
}

return getReaderAddress(temp, GenerateReaderName(temp, reader_name)); // Reader name == Topic name + "Reader", unless user-specified
}
catch (...) {
}

return std::string("Invalid subscriber for ") + topic_name;
}

void EventID(int id) { m_eventID = id; }

int EventID() { return m_eventID; }
Expand Down