From 148e0cd05b337c3e52080babb038206b34b2e779 Mon Sep 17 00:00:00 2001 From: jamescowens Date: Sun, 12 Sep 2021 12:53:16 -0400 Subject: [PATCH] Implement interruptible MilliSleep without Boost chrono --- src/gridcoin/upgrade.cpp | 10 +++++----- src/gridcoinresearchd.cpp | 9 +++++---- src/init.cpp | 23 ++++++++++++++++++++--- src/net.cpp | 20 ++++++++++---------- src/qt/bitcoin.cpp | 2 +- src/qt/qtipcserver.cpp | 2 +- src/qt/upgradeqt.cpp | 8 ++++---- src/rpc/server.cpp | 2 +- src/util.cpp | 2 +- src/util/time.cpp | 22 ++++++++++++++++------ src/util/time.h | 8 ++++++-- src/wallet/db.cpp | 2 +- src/wallet/walletdb.cpp | 2 +- 13 files changed, 72 insertions(+), 40 deletions(-) diff --git a/src/gridcoin/upgrade.cpp b/src/gridcoin/upgrade.cpp index 2cb1170637..680bf77381 100644 --- a/src/gridcoin/upgrade.cpp +++ b/src/gridcoin/upgrade.cpp @@ -220,7 +220,7 @@ void Upgrade::SnapshotMain() std::cout << progress.Status() << std::flush; } - MilliSleep(1000); + UninterruptibleSleep(std::chrono::milliseconds{1000}); } // This is needed in some spots as the download can complete before the next progress update occurs so just 100% here @@ -248,7 +248,7 @@ void Upgrade::SnapshotMain() std::cout << progress.Status() << std::flush; } - MilliSleep(1000); + UninterruptibleSleep(std::chrono::milliseconds{1000}); } if (progress.Update(100)) std::cout << progress.Status() << std::flush; @@ -272,7 +272,7 @@ void Upgrade::SnapshotMain() std::cout << progress.Status() << std::flush; } - MilliSleep(1000); + UninterruptibleSleep(std::chrono::milliseconds{1000}); } if (progress.Update(100)) std::cout << progress.Status() << std::flush; @@ -296,7 +296,7 @@ void Upgrade::SnapshotMain() if (progress.Update(ExtractStatus.GetSnapshotExtractProgress())) std::cout << progress.Status() << std::flush; - MilliSleep(1000); + UninterruptibleSleep(std::chrono::milliseconds{1000}); } if (progress.Update(100)) std::cout << progress.Status() << std::flush; @@ -369,7 +369,7 @@ void Upgrade::WorkerMain(Progress& progress) } } - MilliSleep(1000); + UninterruptibleSleep(std::chrono::milliseconds{1000}); } } diff --git a/src/gridcoinresearchd.cpp b/src/gridcoinresearchd.cpp index 6e53d95b3e..2a78870c4d 100644 --- a/src/gridcoinresearchd.cpp +++ b/src/gridcoinresearchd.cpp @@ -214,10 +214,11 @@ bool AppInit(int argc, char* argv[]) PrintException(nullptr, "AppInit()"); } - if(fRet) - { - while (!ShutdownRequested()) - MilliSleep(500); + + if (fRet) { + while (!ShutdownRequested()) { + UninterruptibleSleep(std::chrono::milliseconds{500}); + } } Shutdown(nullptr); diff --git a/src/init.cpp b/src/init.cpp index c40035604d..2cc7bcd205 100755 --- a/src/init.cpp +++ b/src/init.cpp @@ -37,6 +37,7 @@ extern void ThreadAppInit2(void* parg); using namespace std; CWallet* pwalletMain; CClientUIInterface uiInterface; +extern std::condition_variable sleep_interrupt; extern bool fQtActive; extern bool bGridcoinCoreInitComplete; extern bool fConfChange; @@ -107,13 +108,29 @@ void Shutdown(void* parg) fShutdown = true; - // clean up the threads running serviceQueue: + // Signal to the scheduler to stop. + LogPrintf("INFO: %s: Stopping the scheduler.", __func__); + scheduler.stop(); + + // clean up any remaining threads running serviceQueue: + LogPrintf("INFO: %s: Cleaning up any remaining threads in scheduler.", __func__); threadGroup.interrupt_all(); threadGroup.join_all(); + LogPrintf("INFO: %s: Flushing wallet database.", __func__); bitdb.Flush(false); + + // Signal the sleep interrupt condition variable to interrupt sleeping threads. + LogPrintf("INFO: %s: Interrupting sleeping threads.", __func__); + sleep_interrupt.notify_all(); + + LogPrintf("INFO: %s: Stopping net (node) threads.", __func__); StopNode(); + + LogPrintf("INFO: %s: Final flush of wallet database and closing wallet database file.", __func__); bitdb.Flush(true); + + LogPrintf("INFO: %s: Stopping RPC threads.", __func__); StopRPCThreads(); // This is necessary here to prevent a snapshot download from failing at the cleanup @@ -127,14 +144,14 @@ void Shutdown(void* parg) // This causes issues on daemons where it tries to create a second // lock file. //CTxDB().Close(); - MilliSleep(50); + UninterruptibleSleep(std::chrono::milliseconds{50}); LogPrintf("Gridcoin exited"); fExit = true; } else { while (!fExit) - MilliSleep(100); + UninterruptibleSleep(std::chrono::milliseconds{100}); } } diff --git a/src/net.cpp b/src/net.cpp index b347501901..081e8b62f7 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1111,7 +1111,7 @@ void ThreadSocketHandler2(void* parg) pnode->Release(); } - MilliSleep(10); + UninterruptibleSleep(std::chrono::milliseconds{10}); } } @@ -1352,8 +1352,8 @@ void DumpAddresses() CAddrDB adb; adb.Write(addrman); - LogPrint(BCLog::LogFlags::NET, "Flushed %d addresses to peers.dat %" PRId64 "ms", addrman.size(), GetTimeMillis() - nStart); - + LogPrint(BCLog::LogFlags::NET, "Flushed %d addresses to peers.dat %" PRId64 "ms", + addrman.size(), GetTimeMillis() - nStart); } void ThreadDumpAddress2(void* parg) @@ -1498,12 +1498,12 @@ void ThreadOpenConnections2(void* parg) OpenNetworkConnection(addr, nullptr, strAddr.c_str()); for (int i = 0; i < 10 && i < nLoop; i++) { - MilliSleep(500); + UninterruptibleSleep(std::chrono::milliseconds{500}); if (fShutdown) return; } } - MilliSleep(500); + UninterruptibleSleep(std::chrono::milliseconds{500}); } } @@ -1512,7 +1512,7 @@ void ThreadOpenConnections2(void* parg) while (true) { ProcessOneShot(); - MilliSleep(500); + UninterruptibleSleep(std::chrono::milliseconds{500}); if (fShutdown) return; @@ -1638,7 +1638,7 @@ void ThreadOpenAddedConnections2(void* parg) CAddress addr; CSemaphoreGrant grant(*semOutbound); OpenNetworkConnection(addr, &grant, strAddNode.c_str()); - MilliSleep(500); + UninterruptibleSleep(std::chrono::milliseconds{500}); } MilliSleep(120000); // Retry every 2 minutes } @@ -1682,7 +1682,7 @@ void ThreadOpenAddedConnections2(void* parg) { CSemaphoreGrant grant(*semOutbound); OpenNetworkConnection(CAddress(*(vserv.begin())), &grant); - MilliSleep(500); + UninterruptibleSleep(std::chrono::milliseconds{500}); if (fShutdown) return; } @@ -1817,7 +1817,7 @@ void ThreadMessageHandler2(void* parg) // Wait and allow messages to bunch up. // we're sleeping, but we must always check fShutdown after doing this. - MilliSleep(100); + UninterruptibleSleep(std::chrono::milliseconds{100}); if (fRequestShutdown) StartShutdown(); if (fShutdown) @@ -2066,7 +2066,7 @@ bool StopNode() netThreads->interruptAll(); netThreads->removeAll(); - MilliSleep(50); + UninterruptibleSleep(std::chrono::milliseconds{50}); DumpAddresses(); return true; } diff --git a/src/qt/bitcoin.cpp b/src/qt/bitcoin.cpp index 77888c9714..dc851cada1 100755 --- a/src/qt/bitcoin.cpp +++ b/src/qt/bitcoin.cpp @@ -629,7 +629,7 @@ int StartGridcoinQt(int argc, char *argv[], QApplication& app, OptionsModel& opt // The sleep here has to be pretty short to avoid a buffer overflow crash with // fast CPUs due to too many events. It originally was set to 300 ms and has // been reduced to 100 ms. - MilliSleep(100); + UninterruptibleSleep(std::chrono::milliseconds{100}); } if (splashref) diff --git a/src/qt/qtipcserver.cpp b/src/qt/qtipcserver.cpp index f66d7c14a1..e9d3387f56 100644 --- a/src/qt/qtipcserver.cpp +++ b/src/qt/qtipcserver.cpp @@ -98,7 +98,7 @@ static void ipcThread2(void* pArg) if (mq->timed_receive(&buffer, sizeof(buffer), nSize, nPriority, d)) { uiInterface.ThreadSafeHandleURI(std::string(buffer, nSize)); - MilliSleep(1000); + UninterruptibleSleep(std::chrono::seconds{1}); } if (fShutdown) diff --git a/src/qt/upgradeqt.cpp b/src/qt/upgradeqt.cpp index 5a734ed482..78ab3881ca 100644 --- a/src/qt/upgradeqt.cpp +++ b/src/qt/upgradeqt.cpp @@ -140,7 +140,7 @@ bool UpgradeQt::SnapshotMain(QApplication& SnapshotApp) } } - MilliSleep(poll_delay); + UninterruptibleSleep(std::chrono::milliseconds{poll_delay}); } m_Progress->reset(); @@ -186,7 +186,7 @@ bool UpgradeQt::SnapshotMain(QApplication& SnapshotApp) } } - MilliSleep(poll_delay); + UninterruptibleSleep(std::chrono::milliseconds{poll_delay}); } m_Progress->reset(); @@ -232,7 +232,7 @@ bool UpgradeQt::SnapshotMain(QApplication& SnapshotApp) } } - MilliSleep(poll_delay); + UninterruptibleSleep(std::chrono::milliseconds{poll_delay}); } m_Progress->reset(); @@ -297,7 +297,7 @@ bool UpgradeQt::SnapshotMain(QApplication& SnapshotApp) SnapshotApp.processEvents(); - MilliSleep(poll_delay); + UninterruptibleSleep(std::chrono::milliseconds{poll_delay}); } m_Progress->setValue(100); diff --git a/src/rpc/server.cpp b/src/rpc/server.cpp index f1b78d9513..51f14d3a39 100644 --- a/src/rpc/server.cpp +++ b/src/rpc/server.cpp @@ -827,7 +827,7 @@ void ServiceConnection(AcceptedConnection *conn) If this results in a DOS the user really shouldn't have their RPC port exposed.*/ if (gArgs.GetArgs("-rpcpassword").size() < 20) - MilliSleep(250); + UninterruptibleSleep(std::chrono::milliseconds{250}); conn->stream() << HTTPReply(HTTP_UNAUTHORIZED, "", false) << std::flush; break; diff --git a/src/util.cpp b/src/util.cpp index d557461034..794a4547e7 100644 --- a/src/util.cpp +++ b/src/util.cpp @@ -90,7 +90,7 @@ class CInit if (seed_successful) break; - MilliSleep(100); + UninterruptibleSleep(std::chrono::milliseconds{100}); } if (!seed_successful) { diff --git a/src/util/time.cpp b/src/util/time.cpp index f8405f4bfb..c35e22c208 100644 --- a/src/util/time.cpp +++ b/src/util/time.cpp @@ -17,10 +17,26 @@ #include #include #include +#include #include #include "logging.h" +/** + * The global condition variable for MilliSleep, a non-Boost implementation of interruptible sleep + */ +std::condition_variable sleep_interrupt; +/** + * The non-recursive mutex for MilliSleep, a non-Boost implementation of interruptible sleep + */ +std::mutex cs_interruptible_sleep; + +void MilliSleep(int64_t n) +{ + std::unique_lock lock(cs_interruptible_sleep); + sleep_interrupt.wait_for(lock, std::chrono::milliseconds{n}); +} + void UninterruptibleSleep(const std::chrono::microseconds& n) { std::this_thread::sleep_for(n); } static std::atomic nMockTime(0); //!< For testing @@ -294,9 +310,3 @@ int64_t MilliTimer::GetElapsedTime(const std::string& log_string, const std::str // Create global timer instance. MilliTimer g_timer; - -void MilliSleep(int64_t n) -{ - std::this_thread::sleep_for(std::chrono::milliseconds(n)); -} - diff --git a/src/util/time.h b/src/util/time.h index 5e86ea59a0..2cfcf56e00 100644 --- a/src/util/time.h +++ b/src/util/time.h @@ -17,6 +17,12 @@ using namespace std::chrono_literals; +/** + * @brief Implements an interruptible sleep using the std::mutex, std::condition_variable pattern + * @param int64_t n: number of milliseconds to sleep. + */ +void MilliSleep(int64_t n); + void UninterruptibleSleep(const std::chrono::microseconds& n); /** @@ -92,8 +98,6 @@ struct timeval MillisToTimeval(std::chrono::milliseconds ms); /** Sanity check epoch match normal Unix epoch */ bool ChronoSanityCheck(); -void MilliSleep(int64_t n); - /** * @brief The MilliTimer class * diff --git a/src/wallet/db.cpp b/src/wallet/db.cpp index 4a77b5aabe..00ec2462ce 100644 --- a/src/wallet/db.cpp +++ b/src/wallet/db.cpp @@ -441,7 +441,7 @@ bool CDB::Rewrite(const string& strFile, const char* pszSkip) return fSuccess; } } - MilliSleep(100); + UninterruptibleSleep(std::chrono::milliseconds{100}); } return false; } diff --git a/src/wallet/walletdb.cpp b/src/wallet/walletdb.cpp index bb6b9243a5..863960fd0b 100644 --- a/src/wallet/walletdb.cpp +++ b/src/wallet/walletdb.cpp @@ -638,7 +638,7 @@ void ThreadFlushWalletDB(void* parg) int64_t nLastWalletUpdate = GetTime(); while (!fShutdown) { - MilliSleep(500); + UninterruptibleSleep(std::chrono::milliseconds{500}); if (nLastSeen != nWalletDBUpdated) {