From 911e0db3fa6996d54f0be8aa77e613d3e66b462e Mon Sep 17 00:00:00 2001 From: Qi Ma Date: Wed, 20 Aug 2014 13:23:19 +0100 Subject: [PATCH 01/15] log messages for tracking down the bad_alloc issue --- src/maidsafe/rudp/connection.cc | 3 ++- src/maidsafe/rudp/connection_manager.cc | 13 ++++++++++--- src/maidsafe/rudp/core/dispatcher.cc | 9 +++++++++ src/maidsafe/rudp/managed_connections.cc | 8 +++++++- 4 files changed, 28 insertions(+), 5 deletions(-) diff --git a/src/maidsafe/rudp/connection.cc b/src/maidsafe/rudp/connection.cc index 19d6471a..14851cbf 100644 --- a/src/maidsafe/rudp/connection.cc +++ b/src/maidsafe/rudp/connection.cc @@ -98,7 +98,8 @@ void Connection::Close() { } void Connection::DoClose(bool timed_out) { - LOG(kVerbose) << "RUDP Connection::DoClose"; + LOG(kVerbose) << "RUDP Connection::DoClose " << DebugId(socket_.PeerNodeId()) + << " " << socket_.PeerEndpoint(); probe_interval_timer_.cancel(); lifespan_timer_.cancel(); if (std::shared_ptr transport = transport_.lock()) { diff --git a/src/maidsafe/rudp/connection_manager.cc b/src/maidsafe/rudp/connection_manager.cc index ae46436b..08686168 100644 --- a/src/maidsafe/rudp/connection_manager.cc +++ b/src/maidsafe/rudp/connection_manager.cc @@ -113,7 +113,10 @@ bool ConnectionManager::CloseConnection(const NodeId& peer_id) { ConnectionPtr connection(*itr); lock.unlock(); - strand_.dispatch([=] { connection->Close(); }); // NOLINT (Fraser) + strand_.dispatch([=] { + LOG(kVerbose) << "closing connection to " << DebugId(peer_id); + connection->Close(); + }); // NOLINT (Fraser) return true; } @@ -163,7 +166,7 @@ Socket* ConnectionManager::GetSocket(const asio::const_buffer& data, const Endpo LOG(kError) << DebugId(kThisNodeId_) << " Received a non-RUDP packet from " << endpoint; return nullptr; } - +// std::unique_lock lock(mutex_); SocketMap::const_iterator socket_iter(sockets_.end()); if (socket_id == 0) { HandshakePacket handshake_packet; @@ -175,7 +178,8 @@ Socket* ConnectionManager::GetSocket(const asio::const_buffer& data, const Endpo if (handshake_packet.ConnectionReason() == Session::kNormal) { // This is a handshake packet on a newly-added socket LOG(kVerbose) << DebugId(kThisNodeId_) - << " This is a handshake packet on a newly-added socket from " << endpoint; + << " This is a handshake packet on a newly-added socket from " << endpoint + << " with socket id " << socket_id; socket_iter = std::find_if(sockets_.begin(), sockets_.end(), [endpoint](const SocketMap::value_type & socket_pair) { return socket_pair.second->PeerEndpoint() == endpoint && !socket_pair.second->IsConnected(); @@ -225,6 +229,7 @@ Socket* ConnectionManager::GetSocket(const asio::const_buffer& data, const Endpo } if (socket_iter != sockets_.end()) { + LOG(kVerbose) << DebugId(kThisNodeId_) << " find socket for endpoint " << endpoint; return socket_iter->second; } else { const unsigned char* p = asio::buffer_cast(data); @@ -315,6 +320,8 @@ uint32_t ConnectionManager::AddSocket(Socket* socket) { } void ConnectionManager::RemoveSocket(uint32_t id) { +// std::unique_lock lock(mutex_); + LOG(kVerbose) << "removing socket " << id; if (id) sockets_.erase(id); } diff --git a/src/maidsafe/rudp/core/dispatcher.cc b/src/maidsafe/rudp/core/dispatcher.cc index 3f5e6405..db3a34af 100644 --- a/src/maidsafe/rudp/core/dispatcher.cc +++ b/src/maidsafe/rudp/core/dispatcher.cc @@ -69,6 +69,7 @@ void Dispatcher::RemoveSocket(uint32_t id) { void Dispatcher::HandleReceiveFrom(const asio::const_buffer& data, const ip::udp::endpoint& endpoint) { + LOG(kVerbose) << "HandleReceiveFrom " << endpoint; auto in_use(use_count_); ConnectionManager* connection_manager; { @@ -76,11 +77,19 @@ void Dispatcher::HandleReceiveFrom(const asio::const_buffer& data, connection_manager = connection_manager_; } if (connection_manager) { + LOG(kVerbose) << "trying to fetch socket"; Socket* socket(connection_manager->GetSocket(data, endpoint)); if (socket) { + try { + LOG(kVerbose) << "fetched socket : " << socket->PeerEndpoint() + << " , " << DebugId(socket->PeerNodeId()); + } catch (const std::exception& e) { + LOG(kError) << boost::diagnostic_information(e); + } socket->HandleReceiveFrom(data, endpoint); } } + LOG(kVerbose) << "returning from HandleReceiveFrom"; } } // namespace detail diff --git a/src/maidsafe/rudp/managed_connections.cc b/src/maidsafe/rudp/managed_connections.cc index 4fc287b4..a6c5faac 100644 --- a/src/maidsafe/rudp/managed_connections.cc +++ b/src/maidsafe/rudp/managed_connections.cc @@ -725,6 +725,8 @@ unsigned ManagedConnections::GetActiveConnectionCount() const { void ManagedConnections::OnConnectionLostSlot(const NodeId& peer_id, TransportPtr transport, bool temporary_connection) { + LOG(kVerbose) << "ManagedConnections::OnConnectionLostSlot " << DebugId(peer_id) + << " is_temporary " << std::boolalpha << temporary_connection; std::lock_guard lock(mutex_); if (transport->IsIdle()) { assert(transport->IsAvailable()); @@ -748,6 +750,7 @@ void ManagedConnections::OnConnectionLostSlot(const NodeId& peer_id, TransportPt << (*itr).second->local_endpoint() << " not " << transport->local_endpoint(); BOOST_ASSERT(false); } + LOG(kVerbose) << "Erasing connection of " << (*itr).second->local_endpoint(); connections_.erase(itr); if (peer_id == chosen_bootstrap_node_id_) chosen_bootstrap_node_id_ = NodeId(); @@ -759,7 +762,10 @@ void ManagedConnections::OnConnectionLostSlot(const NodeId& peer_id, TransportPt if (local_callback) { LOG(kVerbose) << "Firing connection_lost_functor_ for " << DebugId(peer_id); - asio_service_.service().post([=] { local_callback(peer_id); }); + asio_service_.service().post([=] { + LOG(kVerbose) << "calling back for " << DebugId(peer_id); + local_callback(peer_id); + }); } } } From db43b5f2235fa958bd29dc54c2c090ca2e853c90 Mon Sep 17 00:00:00 2001 From: Qi Ma Date: Wed, 20 Aug 2014 17:31:12 +0100 Subject: [PATCH 02/15] avoiding mis-handling as symmetric NAT when running multiple vaults on same machine --- src/maidsafe/rudp/connection_manager.cc | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/maidsafe/rudp/connection_manager.cc b/src/maidsafe/rudp/connection_manager.cc index 08686168..c5b5825b 100644 --- a/src/maidsafe/rudp/connection_manager.cc +++ b/src/maidsafe/rudp/connection_manager.cc @@ -178,8 +178,7 @@ Socket* ConnectionManager::GetSocket(const asio::const_buffer& data, const Endpo if (handshake_packet.ConnectionReason() == Session::kNormal) { // This is a handshake packet on a newly-added socket LOG(kVerbose) << DebugId(kThisNodeId_) - << " This is a handshake packet on a newly-added socket from " << endpoint - << " with socket id " << socket_id; + << " This is a handshake packet on a newly-added socket from " << endpoint; socket_iter = std::find_if(sockets_.begin(), sockets_.end(), [endpoint](const SocketMap::value_type & socket_pair) { return socket_pair.second->PeerEndpoint() == endpoint && !socket_pair.second->IsConnected(); @@ -187,6 +186,16 @@ Socket* ConnectionManager::GetSocket(const asio::const_buffer& data, const Endpo // If the socket wasn't found, this could be a connect attempt from a peer using symmetric // NAT, so the peer's port may be different to what this node was told to expect. if (socket_iter == sockets_.end()) { + auto count(std::count_if(sockets_.begin(), sockets_.end(), + [endpoint](const SocketMap::value_type & socket_pair) { + return socket_pair.second->PeerEndpoint().address() == endpoint.address(); + })); + if (count > 1) { + LOG(kWarning) << "multiple vaults running on same machine " << count; + // if running multiple vaults on same machine, shall not consider symmetric NAT situation + return nullptr; + } + LOG(kVerbose) << "updating for symmetric"; socket_iter = std::find_if(sockets_.begin(), sockets_.end(), [endpoint](const SocketMap::value_type & socket_pair) { return socket_pair.second->PeerEndpoint().address() == endpoint.address() && @@ -197,9 +206,6 @@ Socket* ConnectionManager::GetSocket(const asio::const_buffer& data, const Endpo LOG(kVerbose) << DebugId(kThisNodeId_) << " Updating peer's endpoint from " << socket_iter->second->PeerEndpoint() << " to " << endpoint; socket_iter->second->UpdatePeerEndpoint(endpoint); - LOG(kVerbose) << DebugId(kThisNodeId_) - << " Peer's endpoint now: " << socket_iter->second->PeerEndpoint() - << " and guessed port = " << socket_iter->second->PeerGuessedPort(); } } } else { // Session::mode_ != kNormal From 655cece2c1003a05f0e54ef88545cb8ce501c3e3 Mon Sep 17 00:00:00 2001 From: Qi Ma Date: Thu, 21 Aug 2014 14:15:25 +0100 Subject: [PATCH 03/15] log around remove socket --- src/maidsafe/rudp/connection_manager.cc | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/maidsafe/rudp/connection_manager.cc b/src/maidsafe/rudp/connection_manager.cc index c5b5825b..cdce7d1b 100644 --- a/src/maidsafe/rudp/connection_manager.cc +++ b/src/maidsafe/rudp/connection_manager.cc @@ -327,9 +327,12 @@ uint32_t ConnectionManager::AddSocket(Socket* socket) { void ConnectionManager::RemoveSocket(uint32_t id) { // std::unique_lock lock(mutex_); - LOG(kVerbose) << "removing socket " << id; - if (id) + LOG(kVerbose) << "removing socket " << sockets_[id]->PeerEndpoint() + << " of peer " << sockets_[id]->PeerNodeId() << " of id " << id; + if (id) { sockets_.erase(id); + LOG(kVerbose) << "removed socket " << id; + } } size_t ConnectionManager::NormalConnectionsCount() const { From db7750cf1f8ce0fcb7a205422adf6fca127dc6a1 Mon Sep 17 00:00:00 2001 From: Qi Ma Date: Fri, 22 Aug 2014 13:17:15 +0100 Subject: [PATCH 04/15] temp fix to the socket thread safe issue --- src/maidsafe/rudp/core/dispatcher.cc | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/maidsafe/rudp/core/dispatcher.cc b/src/maidsafe/rudp/core/dispatcher.cc index db3a34af..9bf1a541 100644 --- a/src/maidsafe/rudp/core/dispatcher.cc +++ b/src/maidsafe/rudp/core/dispatcher.cc @@ -81,12 +81,14 @@ void Dispatcher::HandleReceiveFrom(const asio::const_buffer& data, Socket* socket(connection_manager->GetSocket(data, endpoint)); if (socket) { try { - LOG(kVerbose) << "fetched socket : " << socket->PeerEndpoint() - << " , " << DebugId(socket->PeerNodeId()); + LOG(kVerbose) << "fetched socket : " << socket->PeerEndpoint() + << " , " << DebugId(socket->PeerNodeId()); + socket->HandleReceiveFrom(data, endpoint); } catch (const std::exception& e) { + // TODO - This is only a temp fix. The socket shall be held as shared_ptr across + // owners to be thread safe avoid causing bad_alloc issue LOG(kError) << boost::diagnostic_information(e); } - socket->HandleReceiveFrom(data, endpoint); } } LOG(kVerbose) << "returning from HandleReceiveFrom"; From c6ff3c018b14c4156ea8e4f1130f26c41ac39f9b Mon Sep 17 00:00:00 2001 From: Qi Ma Date: Fri, 22 Aug 2014 14:35:04 +0100 Subject: [PATCH 05/15] minor style correction --- src/maidsafe/rudp/core/dispatcher.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/maidsafe/rudp/core/dispatcher.cc b/src/maidsafe/rudp/core/dispatcher.cc index 9bf1a541..1bfbc27e 100644 --- a/src/maidsafe/rudp/core/dispatcher.cc +++ b/src/maidsafe/rudp/core/dispatcher.cc @@ -85,8 +85,8 @@ void Dispatcher::HandleReceiveFrom(const asio::const_buffer& data, << " , " << DebugId(socket->PeerNodeId()); socket->HandleReceiveFrom(data, endpoint); } catch (const std::exception& e) { - // TODO - This is only a temp fix. The socket shall be held as shared_ptr across - // owners to be thread safe avoid causing bad_alloc issue + // TODO(Team) - This is only a temp fix. The socket shall be held as shared_ptr + // among owners to be thread safe avoid causing bad_alloc issue LOG(kError) << boost::diagnostic_information(e); } } From e2a1b083c3188918ebbaef97c60c06cd3ff31640 Mon Sep 17 00:00:00 2001 From: Qi Ma Date: Fri, 22 Aug 2014 15:31:44 +0100 Subject: [PATCH 06/15] reduce logs --- src/maidsafe/rudp/connection_manager.cc | 8 ++++---- src/maidsafe/rudp/core/dispatcher.cc | 10 +++++----- src/maidsafe/rudp/managed_connections.cc | 2 +- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/maidsafe/rudp/connection_manager.cc b/src/maidsafe/rudp/connection_manager.cc index cdce7d1b..7f0964c8 100644 --- a/src/maidsafe/rudp/connection_manager.cc +++ b/src/maidsafe/rudp/connection_manager.cc @@ -114,7 +114,7 @@ bool ConnectionManager::CloseConnection(const NodeId& peer_id) { ConnectionPtr connection(*itr); lock.unlock(); strand_.dispatch([=] { - LOG(kVerbose) << "closing connection to " << DebugId(peer_id); +// LOG(kVerbose) << "closing connection to " << DebugId(peer_id); connection->Close(); }); // NOLINT (Fraser) return true; @@ -195,7 +195,7 @@ Socket* ConnectionManager::GetSocket(const asio::const_buffer& data, const Endpo // if running multiple vaults on same machine, shall not consider symmetric NAT situation return nullptr; } - LOG(kVerbose) << "updating for symmetric"; +// LOG(kVerbose) << "updating for symmetric"; socket_iter = std::find_if(sockets_.begin(), sockets_.end(), [endpoint](const SocketMap::value_type & socket_pair) { return socket_pair.second->PeerEndpoint().address() == endpoint.address() && @@ -235,7 +235,7 @@ Socket* ConnectionManager::GetSocket(const asio::const_buffer& data, const Endpo } if (socket_iter != sockets_.end()) { - LOG(kVerbose) << DebugId(kThisNodeId_) << " find socket for endpoint " << endpoint; +// LOG(kVerbose) << DebugId(kThisNodeId_) << " find socket for endpoint " << endpoint; return socket_iter->second; } else { const unsigned char* p = asio::buffer_cast(data); @@ -331,7 +331,7 @@ void ConnectionManager::RemoveSocket(uint32_t id) { << " of peer " << sockets_[id]->PeerNodeId() << " of id " << id; if (id) { sockets_.erase(id); - LOG(kVerbose) << "removed socket " << id; +// LOG(kVerbose) << "removed socket " << id; } } diff --git a/src/maidsafe/rudp/core/dispatcher.cc b/src/maidsafe/rudp/core/dispatcher.cc index 1bfbc27e..7968703c 100644 --- a/src/maidsafe/rudp/core/dispatcher.cc +++ b/src/maidsafe/rudp/core/dispatcher.cc @@ -69,7 +69,7 @@ void Dispatcher::RemoveSocket(uint32_t id) { void Dispatcher::HandleReceiveFrom(const asio::const_buffer& data, const ip::udp::endpoint& endpoint) { - LOG(kVerbose) << "HandleReceiveFrom " << endpoint; +// LOG(kVerbose) << "HandleReceiveFrom " << endpoint; auto in_use(use_count_); ConnectionManager* connection_manager; { @@ -77,12 +77,12 @@ void Dispatcher::HandleReceiveFrom(const asio::const_buffer& data, connection_manager = connection_manager_; } if (connection_manager) { - LOG(kVerbose) << "trying to fetch socket"; +// LOG(kVerbose) << "trying to fetch socket"; Socket* socket(connection_manager->GetSocket(data, endpoint)); if (socket) { try { - LOG(kVerbose) << "fetched socket : " << socket->PeerEndpoint() - << " , " << DebugId(socket->PeerNodeId()); +// LOG(kVerbose) << "fetched socket : " << socket->PeerEndpoint() +// << " , " << DebugId(socket->PeerNodeId()); socket->HandleReceiveFrom(data, endpoint); } catch (const std::exception& e) { // TODO(Team) - This is only a temp fix. The socket shall be held as shared_ptr @@ -91,7 +91,7 @@ void Dispatcher::HandleReceiveFrom(const asio::const_buffer& data, } } } - LOG(kVerbose) << "returning from HandleReceiveFrom"; +// LOG(kVerbose) << "returning from HandleReceiveFrom"; } } // namespace detail diff --git a/src/maidsafe/rudp/managed_connections.cc b/src/maidsafe/rudp/managed_connections.cc index a6c5faac..8fcc87e6 100644 --- a/src/maidsafe/rudp/managed_connections.cc +++ b/src/maidsafe/rudp/managed_connections.cc @@ -763,7 +763,7 @@ void ManagedConnections::OnConnectionLostSlot(const NodeId& peer_id, TransportPt if (local_callback) { LOG(kVerbose) << "Firing connection_lost_functor_ for " << DebugId(peer_id); asio_service_.service().post([=] { - LOG(kVerbose) << "calling back for " << DebugId(peer_id); +// LOG(kVerbose) << "calling back for " << DebugId(peer_id); local_callback(peer_id); }); } From 10d96e7a671a3a2b5a03e61cfec5ad97183393db Mon Sep 17 00:00:00 2001 From: Qi Ma Date: Tue, 26 Aug 2014 13:02:30 +0100 Subject: [PATCH 07/15] try to catch certain serious system exception as well --- src/maidsafe/rudp/core/dispatcher.cc | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/maidsafe/rudp/core/dispatcher.cc b/src/maidsafe/rudp/core/dispatcher.cc index 7968703c..1a984eab 100644 --- a/src/maidsafe/rudp/core/dispatcher.cc +++ b/src/maidsafe/rudp/core/dispatcher.cc @@ -80,14 +80,17 @@ void Dispatcher::HandleReceiveFrom(const asio::const_buffer& data, // LOG(kVerbose) << "trying to fetch socket"; Socket* socket(connection_manager->GetSocket(data, endpoint)); if (socket) { + // TODO(Team) - This is only a temp fix. The socket shall be held as shared_ptr + // among owners and connection_manager needs to be thread safe + // to avoid causing bad_alloc or even certain serious system exception try { // LOG(kVerbose) << "fetched socket : " << socket->PeerEndpoint() // << " , " << DebugId(socket->PeerNodeId()); socket->HandleReceiveFrom(data, endpoint); } catch (const std::exception& e) { - // TODO(Team) - This is only a temp fix. The socket shall be held as shared_ptr - // among owners to be thread safe avoid causing bad_alloc issue - LOG(kError) << boost::diagnostic_information(e); + LOG(kError) << "caught library exception : " << boost::diagnostic_information(e); + } catch (...) { + LOG(kError) << "caught system level exceptions"; } } } From 45728ee8bc5de4dd44070387149fa7f865a2f700 Mon Sep 17 00:00:00 2001 From: "Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com)" Date: Wed, 27 Aug 2014 16:58:02 +0100 Subject: [PATCH 08/15] Revert "try to catch certain serious system exception as well" This reverts commit 10d96e7a671a3a2b5a03e61cfec5ad97183393db. --- src/maidsafe/rudp/core/dispatcher.cc | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/maidsafe/rudp/core/dispatcher.cc b/src/maidsafe/rudp/core/dispatcher.cc index 1a984eab..7968703c 100644 --- a/src/maidsafe/rudp/core/dispatcher.cc +++ b/src/maidsafe/rudp/core/dispatcher.cc @@ -80,17 +80,14 @@ void Dispatcher::HandleReceiveFrom(const asio::const_buffer& data, // LOG(kVerbose) << "trying to fetch socket"; Socket* socket(connection_manager->GetSocket(data, endpoint)); if (socket) { - // TODO(Team) - This is only a temp fix. The socket shall be held as shared_ptr - // among owners and connection_manager needs to be thread safe - // to avoid causing bad_alloc or even certain serious system exception try { // LOG(kVerbose) << "fetched socket : " << socket->PeerEndpoint() // << " , " << DebugId(socket->PeerNodeId()); socket->HandleReceiveFrom(data, endpoint); } catch (const std::exception& e) { - LOG(kError) << "caught library exception : " << boost::diagnostic_information(e); - } catch (...) { - LOG(kError) << "caught system level exceptions"; + // TODO(Team) - This is only a temp fix. The socket shall be held as shared_ptr + // among owners to be thread safe avoid causing bad_alloc issue + LOG(kError) << boost::diagnostic_information(e); } } } From 6a117bca154c53f7ffabfb77b1447b338b134d87 Mon Sep 17 00:00:00 2001 From: "Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com)" Date: Wed, 27 Aug 2014 16:58:15 +0100 Subject: [PATCH 09/15] Revert "reduce logs" This reverts commit e2a1b083c3188918ebbaef97c60c06cd3ff31640. --- src/maidsafe/rudp/connection_manager.cc | 8 ++++---- src/maidsafe/rudp/core/dispatcher.cc | 10 +++++----- src/maidsafe/rudp/managed_connections.cc | 2 +- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/maidsafe/rudp/connection_manager.cc b/src/maidsafe/rudp/connection_manager.cc index 7f0964c8..cdce7d1b 100644 --- a/src/maidsafe/rudp/connection_manager.cc +++ b/src/maidsafe/rudp/connection_manager.cc @@ -114,7 +114,7 @@ bool ConnectionManager::CloseConnection(const NodeId& peer_id) { ConnectionPtr connection(*itr); lock.unlock(); strand_.dispatch([=] { -// LOG(kVerbose) << "closing connection to " << DebugId(peer_id); + LOG(kVerbose) << "closing connection to " << DebugId(peer_id); connection->Close(); }); // NOLINT (Fraser) return true; @@ -195,7 +195,7 @@ Socket* ConnectionManager::GetSocket(const asio::const_buffer& data, const Endpo // if running multiple vaults on same machine, shall not consider symmetric NAT situation return nullptr; } -// LOG(kVerbose) << "updating for symmetric"; + LOG(kVerbose) << "updating for symmetric"; socket_iter = std::find_if(sockets_.begin(), sockets_.end(), [endpoint](const SocketMap::value_type & socket_pair) { return socket_pair.second->PeerEndpoint().address() == endpoint.address() && @@ -235,7 +235,7 @@ Socket* ConnectionManager::GetSocket(const asio::const_buffer& data, const Endpo } if (socket_iter != sockets_.end()) { -// LOG(kVerbose) << DebugId(kThisNodeId_) << " find socket for endpoint " << endpoint; + LOG(kVerbose) << DebugId(kThisNodeId_) << " find socket for endpoint " << endpoint; return socket_iter->second; } else { const unsigned char* p = asio::buffer_cast(data); @@ -331,7 +331,7 @@ void ConnectionManager::RemoveSocket(uint32_t id) { << " of peer " << sockets_[id]->PeerNodeId() << " of id " << id; if (id) { sockets_.erase(id); -// LOG(kVerbose) << "removed socket " << id; + LOG(kVerbose) << "removed socket " << id; } } diff --git a/src/maidsafe/rudp/core/dispatcher.cc b/src/maidsafe/rudp/core/dispatcher.cc index 7968703c..1bfbc27e 100644 --- a/src/maidsafe/rudp/core/dispatcher.cc +++ b/src/maidsafe/rudp/core/dispatcher.cc @@ -69,7 +69,7 @@ void Dispatcher::RemoveSocket(uint32_t id) { void Dispatcher::HandleReceiveFrom(const asio::const_buffer& data, const ip::udp::endpoint& endpoint) { -// LOG(kVerbose) << "HandleReceiveFrom " << endpoint; + LOG(kVerbose) << "HandleReceiveFrom " << endpoint; auto in_use(use_count_); ConnectionManager* connection_manager; { @@ -77,12 +77,12 @@ void Dispatcher::HandleReceiveFrom(const asio::const_buffer& data, connection_manager = connection_manager_; } if (connection_manager) { -// LOG(kVerbose) << "trying to fetch socket"; + LOG(kVerbose) << "trying to fetch socket"; Socket* socket(connection_manager->GetSocket(data, endpoint)); if (socket) { try { -// LOG(kVerbose) << "fetched socket : " << socket->PeerEndpoint() -// << " , " << DebugId(socket->PeerNodeId()); + LOG(kVerbose) << "fetched socket : " << socket->PeerEndpoint() + << " , " << DebugId(socket->PeerNodeId()); socket->HandleReceiveFrom(data, endpoint); } catch (const std::exception& e) { // TODO(Team) - This is only a temp fix. The socket shall be held as shared_ptr @@ -91,7 +91,7 @@ void Dispatcher::HandleReceiveFrom(const asio::const_buffer& data, } } } -// LOG(kVerbose) << "returning from HandleReceiveFrom"; + LOG(kVerbose) << "returning from HandleReceiveFrom"; } } // namespace detail diff --git a/src/maidsafe/rudp/managed_connections.cc b/src/maidsafe/rudp/managed_connections.cc index 8fcc87e6..a6c5faac 100644 --- a/src/maidsafe/rudp/managed_connections.cc +++ b/src/maidsafe/rudp/managed_connections.cc @@ -763,7 +763,7 @@ void ManagedConnections::OnConnectionLostSlot(const NodeId& peer_id, TransportPt if (local_callback) { LOG(kVerbose) << "Firing connection_lost_functor_ for " << DebugId(peer_id); asio_service_.service().post([=] { -// LOG(kVerbose) << "calling back for " << DebugId(peer_id); + LOG(kVerbose) << "calling back for " << DebugId(peer_id); local_callback(peer_id); }); } From afe631dc5f4a42e4574489facda7765ec578f9c5 Mon Sep 17 00:00:00 2001 From: "Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com)" Date: Wed, 27 Aug 2014 16:58:17 +0100 Subject: [PATCH 10/15] Revert "minor style correction" This reverts commit c6ff3c018b14c4156ea8e4f1130f26c41ac39f9b. --- src/maidsafe/rudp/core/dispatcher.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/maidsafe/rudp/core/dispatcher.cc b/src/maidsafe/rudp/core/dispatcher.cc index 1bfbc27e..9bf1a541 100644 --- a/src/maidsafe/rudp/core/dispatcher.cc +++ b/src/maidsafe/rudp/core/dispatcher.cc @@ -85,8 +85,8 @@ void Dispatcher::HandleReceiveFrom(const asio::const_buffer& data, << " , " << DebugId(socket->PeerNodeId()); socket->HandleReceiveFrom(data, endpoint); } catch (const std::exception& e) { - // TODO(Team) - This is only a temp fix. The socket shall be held as shared_ptr - // among owners to be thread safe avoid causing bad_alloc issue + // TODO - This is only a temp fix. The socket shall be held as shared_ptr across + // owners to be thread safe avoid causing bad_alloc issue LOG(kError) << boost::diagnostic_information(e); } } From c1a2298d39e64fb1d10987b616852fc532a5c476 Mon Sep 17 00:00:00 2001 From: "Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com)" Date: Wed, 27 Aug 2014 16:58:19 +0100 Subject: [PATCH 11/15] Revert "temp fix to the socket thread safe issue" This reverts commit db7750cf1f8ce0fcb7a205422adf6fca127dc6a1. --- src/maidsafe/rudp/core/dispatcher.cc | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/maidsafe/rudp/core/dispatcher.cc b/src/maidsafe/rudp/core/dispatcher.cc index 9bf1a541..db3a34af 100644 --- a/src/maidsafe/rudp/core/dispatcher.cc +++ b/src/maidsafe/rudp/core/dispatcher.cc @@ -81,14 +81,12 @@ void Dispatcher::HandleReceiveFrom(const asio::const_buffer& data, Socket* socket(connection_manager->GetSocket(data, endpoint)); if (socket) { try { - LOG(kVerbose) << "fetched socket : " << socket->PeerEndpoint() - << " , " << DebugId(socket->PeerNodeId()); - socket->HandleReceiveFrom(data, endpoint); + LOG(kVerbose) << "fetched socket : " << socket->PeerEndpoint() + << " , " << DebugId(socket->PeerNodeId()); } catch (const std::exception& e) { - // TODO - This is only a temp fix. The socket shall be held as shared_ptr across - // owners to be thread safe avoid causing bad_alloc issue LOG(kError) << boost::diagnostic_information(e); } + socket->HandleReceiveFrom(data, endpoint); } } LOG(kVerbose) << "returning from HandleReceiveFrom"; From 36c818fbddcdb45e77eff2669554e9dd0d90016f Mon Sep 17 00:00:00 2001 From: "Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com)" Date: Wed, 27 Aug 2014 16:58:20 +0100 Subject: [PATCH 12/15] Revert "log around remove socket" This reverts commit 655cece2c1003a05f0e54ef88545cb8ce501c3e3. --- src/maidsafe/rudp/connection_manager.cc | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/maidsafe/rudp/connection_manager.cc b/src/maidsafe/rudp/connection_manager.cc index cdce7d1b..c5b5825b 100644 --- a/src/maidsafe/rudp/connection_manager.cc +++ b/src/maidsafe/rudp/connection_manager.cc @@ -327,12 +327,9 @@ uint32_t ConnectionManager::AddSocket(Socket* socket) { void ConnectionManager::RemoveSocket(uint32_t id) { // std::unique_lock lock(mutex_); - LOG(kVerbose) << "removing socket " << sockets_[id]->PeerEndpoint() - << " of peer " << sockets_[id]->PeerNodeId() << " of id " << id; - if (id) { + LOG(kVerbose) << "removing socket " << id; + if (id) sockets_.erase(id); - LOG(kVerbose) << "removed socket " << id; - } } size_t ConnectionManager::NormalConnectionsCount() const { From be612465f9cd9a563bb3d4f8876502be9d9d2752 Mon Sep 17 00:00:00 2001 From: "Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com)" Date: Wed, 27 Aug 2014 16:58:21 +0100 Subject: [PATCH 13/15] Revert "avoiding mis-handling as symmetric NAT when running multiple vaults on same machine" This reverts commit db43b5f2235fa958bd29dc54c2c090ca2e853c90. --- src/maidsafe/rudp/connection_manager.cc | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/src/maidsafe/rudp/connection_manager.cc b/src/maidsafe/rudp/connection_manager.cc index c5b5825b..08686168 100644 --- a/src/maidsafe/rudp/connection_manager.cc +++ b/src/maidsafe/rudp/connection_manager.cc @@ -178,7 +178,8 @@ Socket* ConnectionManager::GetSocket(const asio::const_buffer& data, const Endpo if (handshake_packet.ConnectionReason() == Session::kNormal) { // This is a handshake packet on a newly-added socket LOG(kVerbose) << DebugId(kThisNodeId_) - << " This is a handshake packet on a newly-added socket from " << endpoint; + << " This is a handshake packet on a newly-added socket from " << endpoint + << " with socket id " << socket_id; socket_iter = std::find_if(sockets_.begin(), sockets_.end(), [endpoint](const SocketMap::value_type & socket_pair) { return socket_pair.second->PeerEndpoint() == endpoint && !socket_pair.second->IsConnected(); @@ -186,16 +187,6 @@ Socket* ConnectionManager::GetSocket(const asio::const_buffer& data, const Endpo // If the socket wasn't found, this could be a connect attempt from a peer using symmetric // NAT, so the peer's port may be different to what this node was told to expect. if (socket_iter == sockets_.end()) { - auto count(std::count_if(sockets_.begin(), sockets_.end(), - [endpoint](const SocketMap::value_type & socket_pair) { - return socket_pair.second->PeerEndpoint().address() == endpoint.address(); - })); - if (count > 1) { - LOG(kWarning) << "multiple vaults running on same machine " << count; - // if running multiple vaults on same machine, shall not consider symmetric NAT situation - return nullptr; - } - LOG(kVerbose) << "updating for symmetric"; socket_iter = std::find_if(sockets_.begin(), sockets_.end(), [endpoint](const SocketMap::value_type & socket_pair) { return socket_pair.second->PeerEndpoint().address() == endpoint.address() && @@ -206,6 +197,9 @@ Socket* ConnectionManager::GetSocket(const asio::const_buffer& data, const Endpo LOG(kVerbose) << DebugId(kThisNodeId_) << " Updating peer's endpoint from " << socket_iter->second->PeerEndpoint() << " to " << endpoint; socket_iter->second->UpdatePeerEndpoint(endpoint); + LOG(kVerbose) << DebugId(kThisNodeId_) + << " Peer's endpoint now: " << socket_iter->second->PeerEndpoint() + << " and guessed port = " << socket_iter->second->PeerGuessedPort(); } } } else { // Session::mode_ != kNormal From 9ebbbf1d28cb7e8d9ad050ef62bd2df18d2177c4 Mon Sep 17 00:00:00 2001 From: "Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com)" Date: Wed, 27 Aug 2014 16:58:23 +0100 Subject: [PATCH 14/15] Revert "log messages for tracking down the bad_alloc issue" This reverts commit 911e0db3fa6996d54f0be8aa77e613d3e66b462e. --- src/maidsafe/rudp/connection.cc | 3 +-- src/maidsafe/rudp/connection_manager.cc | 13 +++---------- src/maidsafe/rudp/core/dispatcher.cc | 9 --------- src/maidsafe/rudp/managed_connections.cc | 8 +------- 4 files changed, 5 insertions(+), 28 deletions(-) diff --git a/src/maidsafe/rudp/connection.cc b/src/maidsafe/rudp/connection.cc index 14851cbf..19d6471a 100644 --- a/src/maidsafe/rudp/connection.cc +++ b/src/maidsafe/rudp/connection.cc @@ -98,8 +98,7 @@ void Connection::Close() { } void Connection::DoClose(bool timed_out) { - LOG(kVerbose) << "RUDP Connection::DoClose " << DebugId(socket_.PeerNodeId()) - << " " << socket_.PeerEndpoint(); + LOG(kVerbose) << "RUDP Connection::DoClose"; probe_interval_timer_.cancel(); lifespan_timer_.cancel(); if (std::shared_ptr transport = transport_.lock()) { diff --git a/src/maidsafe/rudp/connection_manager.cc b/src/maidsafe/rudp/connection_manager.cc index 08686168..ae46436b 100644 --- a/src/maidsafe/rudp/connection_manager.cc +++ b/src/maidsafe/rudp/connection_manager.cc @@ -113,10 +113,7 @@ bool ConnectionManager::CloseConnection(const NodeId& peer_id) { ConnectionPtr connection(*itr); lock.unlock(); - strand_.dispatch([=] { - LOG(kVerbose) << "closing connection to " << DebugId(peer_id); - connection->Close(); - }); // NOLINT (Fraser) + strand_.dispatch([=] { connection->Close(); }); // NOLINT (Fraser) return true; } @@ -166,7 +163,7 @@ Socket* ConnectionManager::GetSocket(const asio::const_buffer& data, const Endpo LOG(kError) << DebugId(kThisNodeId_) << " Received a non-RUDP packet from " << endpoint; return nullptr; } -// std::unique_lock lock(mutex_); + SocketMap::const_iterator socket_iter(sockets_.end()); if (socket_id == 0) { HandshakePacket handshake_packet; @@ -178,8 +175,7 @@ Socket* ConnectionManager::GetSocket(const asio::const_buffer& data, const Endpo if (handshake_packet.ConnectionReason() == Session::kNormal) { // This is a handshake packet on a newly-added socket LOG(kVerbose) << DebugId(kThisNodeId_) - << " This is a handshake packet on a newly-added socket from " << endpoint - << " with socket id " << socket_id; + << " This is a handshake packet on a newly-added socket from " << endpoint; socket_iter = std::find_if(sockets_.begin(), sockets_.end(), [endpoint](const SocketMap::value_type & socket_pair) { return socket_pair.second->PeerEndpoint() == endpoint && !socket_pair.second->IsConnected(); @@ -229,7 +225,6 @@ Socket* ConnectionManager::GetSocket(const asio::const_buffer& data, const Endpo } if (socket_iter != sockets_.end()) { - LOG(kVerbose) << DebugId(kThisNodeId_) << " find socket for endpoint " << endpoint; return socket_iter->second; } else { const unsigned char* p = asio::buffer_cast(data); @@ -320,8 +315,6 @@ uint32_t ConnectionManager::AddSocket(Socket* socket) { } void ConnectionManager::RemoveSocket(uint32_t id) { -// std::unique_lock lock(mutex_); - LOG(kVerbose) << "removing socket " << id; if (id) sockets_.erase(id); } diff --git a/src/maidsafe/rudp/core/dispatcher.cc b/src/maidsafe/rudp/core/dispatcher.cc index db3a34af..3f5e6405 100644 --- a/src/maidsafe/rudp/core/dispatcher.cc +++ b/src/maidsafe/rudp/core/dispatcher.cc @@ -69,7 +69,6 @@ void Dispatcher::RemoveSocket(uint32_t id) { void Dispatcher::HandleReceiveFrom(const asio::const_buffer& data, const ip::udp::endpoint& endpoint) { - LOG(kVerbose) << "HandleReceiveFrom " << endpoint; auto in_use(use_count_); ConnectionManager* connection_manager; { @@ -77,19 +76,11 @@ void Dispatcher::HandleReceiveFrom(const asio::const_buffer& data, connection_manager = connection_manager_; } if (connection_manager) { - LOG(kVerbose) << "trying to fetch socket"; Socket* socket(connection_manager->GetSocket(data, endpoint)); if (socket) { - try { - LOG(kVerbose) << "fetched socket : " << socket->PeerEndpoint() - << " , " << DebugId(socket->PeerNodeId()); - } catch (const std::exception& e) { - LOG(kError) << boost::diagnostic_information(e); - } socket->HandleReceiveFrom(data, endpoint); } } - LOG(kVerbose) << "returning from HandleReceiveFrom"; } } // namespace detail diff --git a/src/maidsafe/rudp/managed_connections.cc b/src/maidsafe/rudp/managed_connections.cc index a6c5faac..4fc287b4 100644 --- a/src/maidsafe/rudp/managed_connections.cc +++ b/src/maidsafe/rudp/managed_connections.cc @@ -725,8 +725,6 @@ unsigned ManagedConnections::GetActiveConnectionCount() const { void ManagedConnections::OnConnectionLostSlot(const NodeId& peer_id, TransportPtr transport, bool temporary_connection) { - LOG(kVerbose) << "ManagedConnections::OnConnectionLostSlot " << DebugId(peer_id) - << " is_temporary " << std::boolalpha << temporary_connection; std::lock_guard lock(mutex_); if (transport->IsIdle()) { assert(transport->IsAvailable()); @@ -750,7 +748,6 @@ void ManagedConnections::OnConnectionLostSlot(const NodeId& peer_id, TransportPt << (*itr).second->local_endpoint() << " not " << transport->local_endpoint(); BOOST_ASSERT(false); } - LOG(kVerbose) << "Erasing connection of " << (*itr).second->local_endpoint(); connections_.erase(itr); if (peer_id == chosen_bootstrap_node_id_) chosen_bootstrap_node_id_ = NodeId(); @@ -762,10 +759,7 @@ void ManagedConnections::OnConnectionLostSlot(const NodeId& peer_id, TransportPt if (local_callback) { LOG(kVerbose) << "Firing connection_lost_functor_ for " << DebugId(peer_id); - asio_service_.service().post([=] { - LOG(kVerbose) << "calling back for " << DebugId(peer_id); - local_callback(peer_id); - }); + asio_service_.service().post([=] { local_callback(peer_id); }); } } } From cd7f09ffc09043f46ef10d2742e0177888c5b88e Mon Sep 17 00:00:00 2001 From: "Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com)" Date: Wed, 27 Aug 2014 17:52:40 +0100 Subject: [PATCH 15/15] Backed out my original change to make Dispatcher not get deleted while it is being used in favour of Transport::Close() dispatching the connection teardown onto the appropriate ASIO thread instead. Unfortunately this creates occasional deadlocks, so I disabled all ASIO multithreading, thus solving mutex brittleness in one fell swoop. If the performance hit isn't terrible I'm keeping ASIO single threaded. --- src/maidsafe/rudp/connection_manager.cc | 9 ++----- src/maidsafe/rudp/core/dispatcher.cc | 32 +++++-------------------- src/maidsafe/rudp/core/dispatcher.h | 4 ---- src/maidsafe/rudp/parameters.cc | 2 +- src/maidsafe/rudp/transport.cc | 13 ++++++++-- src/maidsafe/rudp/transport.h | 2 ++ 6 files changed, 22 insertions(+), 40 deletions(-) diff --git a/src/maidsafe/rudp/connection_manager.cc b/src/maidsafe/rudp/connection_manager.cc index ae46436b..2a20209c 100644 --- a/src/maidsafe/rudp/connection_manager.cc +++ b/src/maidsafe/rudp/connection_manager.cc @@ -72,14 +72,9 @@ ConnectionManager::ConnectionManager(std::shared_ptr transport, ConnectionManager::~ConnectionManager() { Close(); } void ConnectionManager::Close() { - { - std::lock_guard lock(mutex_); - for (auto connection : connections_) - strand_.post(std::bind(&Connection::Close, connection)); - } - // Ugly, but we must not reset dispatcher until he's done - while (multiplexer_->dispatcher_.use_count()) std::this_thread::yield(); multiplexer_->dispatcher_.SetConnectionManager(nullptr); + for (auto& connection : connections_) + strand_.post(std::bind(&Connection::Close, connection)); } void ConnectionManager::Connect(const NodeId& peer_id, const Endpoint& peer_endpoint, diff --git a/src/maidsafe/rudp/core/dispatcher.cc b/src/maidsafe/rudp/core/dispatcher.cc index 3f5e6405..b8a71114 100644 --- a/src/maidsafe/rudp/core/dispatcher.cc +++ b/src/maidsafe/rudp/core/dispatcher.cc @@ -38,45 +38,25 @@ namespace rudp { namespace detail { -Dispatcher::Dispatcher() : use_count_(std::make_shared()), - connection_manager_(nullptr) {} +Dispatcher::Dispatcher() : connection_manager_(nullptr) {} void Dispatcher::SetConnectionManager(ConnectionManager *connection_manager) { - std::lock_guard guard(mutex_); connection_manager_ = std::move(connection_manager); } uint32_t Dispatcher::AddSocket(Socket* socket) { - auto in_use(use_count_); - ConnectionManager *connection_manager; - { - std::lock_guard guard(mutex_); - connection_manager = connection_manager_; - } - return connection_manager ? connection_manager->AddSocket(socket) : 0; + return connection_manager_ ? connection_manager_->AddSocket(socket) : 0; } void Dispatcher::RemoveSocket(uint32_t id) { - auto in_use(use_count_); - ConnectionManager *connection_manager; - { - std::lock_guard guard(mutex_); - connection_manager = connection_manager_; - } - if (connection_manager) - connection_manager->RemoveSocket(id); + if (connection_manager_) + connection_manager_->RemoveSocket(id); } void Dispatcher::HandleReceiveFrom(const asio::const_buffer& data, const ip::udp::endpoint& endpoint) { - auto in_use(use_count_); - ConnectionManager* connection_manager; - { - std::lock_guard guard(mutex_); - connection_manager = connection_manager_; - } - if (connection_manager) { - Socket* socket(connection_manager->GetSocket(data, endpoint)); + if (connection_manager_) { + Socket* socket(connection_manager_->GetSocket(data, endpoint)); if (socket) { socket->HandleReceiveFrom(data, endpoint); } diff --git a/src/maidsafe/rudp/core/dispatcher.h b/src/maidsafe/rudp/core/dispatcher.h index f047c8d6..c3fc95fc 100644 --- a/src/maidsafe/rudp/core/dispatcher.h +++ b/src/maidsafe/rudp/core/dispatcher.h @@ -42,8 +42,6 @@ class Dispatcher { void SetConnectionManager(ConnectionManager* connection_manager); - size_t use_count() const { return use_count_.use_count()-1; } - // Add a socket. Returns a new unique id for the socket. uint32_t AddSocket(Socket* socket); @@ -59,8 +57,6 @@ class Dispatcher { Dispatcher(const Dispatcher&); Dispatcher& operator=(const Dispatcher&); - std::mutex mutex_; - std::shared_ptr use_count_; ConnectionManager* connection_manager_; }; diff --git a/src/maidsafe/rudp/parameters.cc b/src/maidsafe/rudp/parameters.cc index 59939aa9..4d46f703 100644 --- a/src/maidsafe/rudp/parameters.cc +++ b/src/maidsafe/rudp/parameters.cc @@ -24,7 +24,7 @@ namespace maidsafe { namespace rudp { -uint32_t Parameters::thread_count(2); +uint32_t Parameters::thread_count(1); int Parameters::max_transports(10); const uint32_t Parameters::maximum_segment_size(16); uint32_t Parameters::default_window_size(4*Parameters::maximum_segment_size); diff --git a/src/maidsafe/rudp/transport.cc b/src/maidsafe/rudp/transport.cc index dd3cd478..77796272 100644 --- a/src/maidsafe/rudp/transport.cc +++ b/src/maidsafe/rudp/transport.cc @@ -259,8 +259,12 @@ void Transport::Close() { on_connection_added_ = nullptr; on_connection_lost_ = nullptr; } - if (connection_manager_) - connection_manager_->Close(); + if (connection_manager_) { + std::promise _done; + std::future done=_done.get_future(); + strand_.dispatch(std::bind(&Transport::DoClose, this, std::ref(_done))); + done.wait(); + } if (multiplexer_) { strand_.post(std::bind(&Multiplexer::Close, multiplexer_)); while (IsValid(multiplexer_->external_endpoint())) @@ -268,6 +272,11 @@ void Transport::Close() { } } +void Transport::DoClose(std::promise& done) { + connection_manager_->Close(); + done.set_value(); +} + void Transport::Connect(const NodeId& peer_id, const EndpointPair& peer_endpoint_pair, const std::string& validation_data) { strand_.dispatch(std::bind(&Transport::DoConnect, shared_from_this(), peer_id, peer_endpoint_pair, diff --git a/src/maidsafe/rudp/transport.h b/src/maidsafe/rudp/transport.h index bc528a2f..836b9762 100644 --- a/src/maidsafe/rudp/transport.h +++ b/src/maidsafe/rudp/transport.h @@ -127,6 +127,8 @@ class Transport : public std::enable_shared_from_this { const boost::asio::ip::udp::endpoint& bootstrap_endpoint, const boost::posix_time::time_duration& lifespan); void DetectNatType(NodeId const& peer_id); + + void DoClose(std::promise& done); void DoConnect(const NodeId& peer_id, const EndpointPair& peer_endpoint_pair, const std::string& validation_data);