Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report queued outgoing cluster messages as perfdata #7897

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion lib/remote/apilistener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1689,6 +1689,7 @@ std::pair<Dictionary::Ptr, Dictionary::Ptr> ApiListener::GetStatus()
double workQueueItemRate = JsonRpcConnection::GetWorkQueueRate();
double syncQueueItemRate = m_SyncQueue.GetTaskCount(60) / 60.0;
double relayQueueItemRate = m_RelayQueue.GetTaskCount(60) / 60.0;
auto queuedMessages (JsonRpcConnection::GetQueuedMessages());

Dictionary::Ptr status = new Dictionary({
{ "identity", GetIdentity() },
Expand All @@ -1706,7 +1707,8 @@ std::pair<Dictionary::Ptr, Dictionary::Ptr> ApiListener::GetStatus()
{ "relay_queue_items", relayQueueItems },
{ "work_queue_item_rate", workQueueItemRate },
{ "sync_queue_item_rate", syncQueueItemRate },
{ "relay_queue_item_rate", relayQueueItemRate }
{ "relay_queue_item_rate", relayQueueItemRate },
{ "queued_items", queuedMessages }
}) },

{ "http", new Dictionary({
Expand All @@ -1727,6 +1729,7 @@ std::pair<Dictionary::Ptr, Dictionary::Ptr> ApiListener::GetStatus()
perfdata->Set("num_json_rpc_work_queue_item_rate", workQueueItemRate);
perfdata->Set("num_json_rpc_sync_queue_item_rate", syncQueueItemRate);
perfdata->Set("num_json_rpc_relay_queue_item_rate", relayQueueItemRate);
perfdata->Set("num_json_rpc_queued_items", queuedMessages);

return std::make_pair(status, perfdata);
}
Expand Down
17 changes: 16 additions & 1 deletion lib/remote/jsonrpcconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ REGISTER_APIFUNCTION(SetLogPosition, log, &SetLogPositionHandler);

static RingBuffer l_TaskStats (15 * 60);

Atomic<uintmax_t> JsonRpcConnection::m_QueuedMessages (0);

JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
const Shared<AsioTlsStream>::Ptr& stream, ConnectionRole role)
: JsonRpcConnection(identity, authenticated, stream, role, IoEngine::Get().GetIoContext())
Expand All @@ -46,6 +48,11 @@ JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
m_Endpoint = Endpoint::GetByName(identity);
}

JsonRpcConnection::~JsonRpcConnection()
{
m_QueuedMessages.fetch_sub(m_OutgoingMessagesQueue.size());
}

void JsonRpcConnection::Start()
{
namespace asio = boost::asio;
Expand Down Expand Up @@ -168,6 +175,8 @@ void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc)

break;
}

m_QueuedMessages.fetch_sub(queue.size());
}
} while (!m_ShuttingDown);

Expand Down Expand Up @@ -211,8 +220,12 @@ void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message)
}

Ptr keepAlive (this);
m_QueuedMessages.fetch_add(1);

m_IoStrand.post([this, keepAlive, message]() { SendMessageInternal(message); });
m_IoStrand.post([this, keepAlive, message]() {
m_OutgoingMessagesQueue.emplace_back(JsonEncode(message));
m_OutgoingMessagesQueued.Set();
});
}

void JsonRpcConnection::SendRawMessage(const String& message)
Expand All @@ -222,6 +235,7 @@ void JsonRpcConnection::SendRawMessage(const String& message)
}

Ptr keepAlive (this);
m_QueuedMessages.fetch_add(1);

m_IoStrand.post([this, keepAlive, message]() {
if (m_ShuttingDown) {
Expand All @@ -239,6 +253,7 @@ void JsonRpcConnection::SendMessageInternal(const Dictionary::Ptr& message)
return;
}

m_QueuedMessages.fetch_add(1);
m_OutgoingMessagesQueue.emplace_back(JsonEncode(message));
m_OutgoingMessagesQueued.Set();
}
Expand Down
10 changes: 10 additions & 0 deletions lib/remote/jsonrpcconnection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "base/tlsstream.hpp"
#include "base/timer.hpp"
#include "base/workqueue.hpp"
#include <cstdint>
#include <memory>
#include <vector>
#include <boost/asio/io_context.hpp>
Expand Down Expand Up @@ -44,6 +45,7 @@ class JsonRpcConnection final : public Object
DECLARE_PTR_TYPEDEFS(JsonRpcConnection);

JsonRpcConnection(const String& identity, bool authenticated, const Shared<AsioTlsStream>::Ptr& stream, ConnectionRole role);
~JsonRpcConnection() override;

void Start();

Expand All @@ -65,7 +67,15 @@ class JsonRpcConnection final : public Object

static void SendCertificateRequest(const JsonRpcConnection::Ptr& aclient, const intrusive_ptr<MessageOrigin>& origin, const String& path);

static inline
uintmax_t GetQueuedMessages()
{
return m_QueuedMessages.load();
}

private:
static Atomic<uintmax_t> m_QueuedMessages;

String m_Identity;
bool m_Authenticated;
Endpoint::Ptr m_Endpoint;
Expand Down
Loading