Skip to content

Commit

Permalink
Implement interruptible MilliSleep without Boost chrono
Browse files Browse the repository at this point in the history
  • Loading branch information
jamescowens committed Sep 13, 2021
1 parent 8104b57 commit 148e0cd
Show file tree
Hide file tree
Showing 13 changed files with 72 additions and 40 deletions.
10 changes: 5 additions & 5 deletions src/gridcoin/upgrade.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ void Upgrade::SnapshotMain()
std::cout << progress.Status() << std::flush;
}

MilliSleep(1000);
UninterruptibleSleep(std::chrono::milliseconds{1000});
}

// This is needed in some spots as the download can complete before the next progress update occurs so just 100% here
Expand Down Expand Up @@ -248,7 +248,7 @@ void Upgrade::SnapshotMain()
std::cout << progress.Status() << std::flush;
}

MilliSleep(1000);
UninterruptibleSleep(std::chrono::milliseconds{1000});
}

if (progress.Update(100)) std::cout << progress.Status() << std::flush;
Expand All @@ -272,7 +272,7 @@ void Upgrade::SnapshotMain()
std::cout << progress.Status() << std::flush;
}

MilliSleep(1000);
UninterruptibleSleep(std::chrono::milliseconds{1000});
}

if (progress.Update(100)) std::cout << progress.Status() << std::flush;
Expand All @@ -296,7 +296,7 @@ void Upgrade::SnapshotMain()
if (progress.Update(ExtractStatus.GetSnapshotExtractProgress()))
std::cout << progress.Status() << std::flush;

MilliSleep(1000);
UninterruptibleSleep(std::chrono::milliseconds{1000});
}

if (progress.Update(100)) std::cout << progress.Status() << std::flush;
Expand Down Expand Up @@ -369,7 +369,7 @@ void Upgrade::WorkerMain(Progress& progress)
}
}

MilliSleep(1000);
UninterruptibleSleep(std::chrono::milliseconds{1000});
}
}

Expand Down
9 changes: 5 additions & 4 deletions src/gridcoinresearchd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,11 @@ bool AppInit(int argc, char* argv[])

PrintException(nullptr, "AppInit()");
}
if(fRet)
{
while (!ShutdownRequested())
MilliSleep(500);

if (fRet) {
while (!ShutdownRequested()) {
UninterruptibleSleep(std::chrono::milliseconds{500});
}
}

Shutdown(nullptr);
Expand Down
23 changes: 20 additions & 3 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ extern void ThreadAppInit2(void* parg);
using namespace std;
CWallet* pwalletMain;
CClientUIInterface uiInterface;
extern std::condition_variable sleep_interrupt;
extern bool fQtActive;
extern bool bGridcoinCoreInitComplete;
extern bool fConfChange;
Expand Down Expand Up @@ -107,13 +108,29 @@ void Shutdown(void* parg)

fShutdown = true;

// clean up the threads running serviceQueue:
// Signal to the scheduler to stop.
LogPrintf("INFO: %s: Stopping the scheduler.", __func__);
scheduler.stop();

// clean up any remaining threads running serviceQueue:
LogPrintf("INFO: %s: Cleaning up any remaining threads in scheduler.", __func__);
threadGroup.interrupt_all();
threadGroup.join_all();

LogPrintf("INFO: %s: Flushing wallet database.", __func__);
bitdb.Flush(false);

// Signal the sleep interrupt condition variable to interrupt sleeping threads.
LogPrintf("INFO: %s: Interrupting sleeping threads.", __func__);
sleep_interrupt.notify_all();

LogPrintf("INFO: %s: Stopping net (node) threads.", __func__);
StopNode();

LogPrintf("INFO: %s: Final flush of wallet database and closing wallet database file.", __func__);
bitdb.Flush(true);

LogPrintf("INFO: %s: Stopping RPC threads.", __func__);
StopRPCThreads();

// This is necessary here to prevent a snapshot download from failing at the cleanup
Expand All @@ -127,14 +144,14 @@ void Shutdown(void* parg)
// This causes issues on daemons where it tries to create a second
// lock file.
//CTxDB().Close();
MilliSleep(50);
UninterruptibleSleep(std::chrono::milliseconds{50});
LogPrintf("Gridcoin exited");
fExit = true;
}
else
{
while (!fExit)
MilliSleep(100);
UninterruptibleSleep(std::chrono::milliseconds{100});
}
}

Expand Down
20 changes: 10 additions & 10 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1111,7 +1111,7 @@ void ThreadSocketHandler2(void* parg)
pnode->Release();
}

MilliSleep(10);
UninterruptibleSleep(std::chrono::milliseconds{10});
}
}

Expand Down Expand Up @@ -1352,8 +1352,8 @@ void DumpAddresses()
CAddrDB adb;
adb.Write(addrman);

LogPrint(BCLog::LogFlags::NET, "Flushed %d addresses to peers.dat %" PRId64 "ms", addrman.size(), GetTimeMillis() - nStart);

LogPrint(BCLog::LogFlags::NET, "Flushed %d addresses to peers.dat %" PRId64 "ms",
addrman.size(), GetTimeMillis() - nStart);
}

void ThreadDumpAddress2(void* parg)
Expand Down Expand Up @@ -1498,12 +1498,12 @@ void ThreadOpenConnections2(void* parg)
OpenNetworkConnection(addr, nullptr, strAddr.c_str());
for (int i = 0; i < 10 && i < nLoop; i++)
{
MilliSleep(500);
UninterruptibleSleep(std::chrono::milliseconds{500});
if (fShutdown)
return;
}
}
MilliSleep(500);
UninterruptibleSleep(std::chrono::milliseconds{500});
}
}

Expand All @@ -1512,7 +1512,7 @@ void ThreadOpenConnections2(void* parg)
while (true)
{
ProcessOneShot();
MilliSleep(500);
UninterruptibleSleep(std::chrono::milliseconds{500});

if (fShutdown)
return;
Expand Down Expand Up @@ -1638,7 +1638,7 @@ void ThreadOpenAddedConnections2(void* parg)
CAddress addr;
CSemaphoreGrant grant(*semOutbound);
OpenNetworkConnection(addr, &grant, strAddNode.c_str());
MilliSleep(500);
UninterruptibleSleep(std::chrono::milliseconds{500});
}
MilliSleep(120000); // Retry every 2 minutes
}
Expand Down Expand Up @@ -1682,7 +1682,7 @@ void ThreadOpenAddedConnections2(void* parg)
{
CSemaphoreGrant grant(*semOutbound);
OpenNetworkConnection(CAddress(*(vserv.begin())), &grant);
MilliSleep(500);
UninterruptibleSleep(std::chrono::milliseconds{500});
if (fShutdown)
return;
}
Expand Down Expand Up @@ -1817,7 +1817,7 @@ void ThreadMessageHandler2(void* parg)

// Wait and allow messages to bunch up.
// we're sleeping, but we must always check fShutdown after doing this.
MilliSleep(100);
UninterruptibleSleep(std::chrono::milliseconds{100});
if (fRequestShutdown)
StartShutdown();
if (fShutdown)
Expand Down Expand Up @@ -2066,7 +2066,7 @@ bool StopNode()

netThreads->interruptAll();
netThreads->removeAll();
MilliSleep(50);
UninterruptibleSleep(std::chrono::milliseconds{50});
DumpAddresses();
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/qt/bitcoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ int StartGridcoinQt(int argc, char *argv[], QApplication& app, OptionsModel& opt
// The sleep here has to be pretty short to avoid a buffer overflow crash with
// fast CPUs due to too many events. It originally was set to 300 ms and has
// been reduced to 100 ms.
MilliSleep(100);
UninterruptibleSleep(std::chrono::milliseconds{100});
}

if (splashref)
Expand Down
2 changes: 1 addition & 1 deletion src/qt/qtipcserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ static void ipcThread2(void* pArg)
if (mq->timed_receive(&buffer, sizeof(buffer), nSize, nPriority, d))
{
uiInterface.ThreadSafeHandleURI(std::string(buffer, nSize));
MilliSleep(1000);
UninterruptibleSleep(std::chrono::seconds{1});
}

if (fShutdown)
Expand Down
8 changes: 4 additions & 4 deletions src/qt/upgradeqt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ bool UpgradeQt::SnapshotMain(QApplication& SnapshotApp)
}
}

MilliSleep(poll_delay);
UninterruptibleSleep(std::chrono::milliseconds{poll_delay});
}

m_Progress->reset();
Expand Down Expand Up @@ -186,7 +186,7 @@ bool UpgradeQt::SnapshotMain(QApplication& SnapshotApp)
}
}

MilliSleep(poll_delay);
UninterruptibleSleep(std::chrono::milliseconds{poll_delay});
}

m_Progress->reset();
Expand Down Expand Up @@ -232,7 +232,7 @@ bool UpgradeQt::SnapshotMain(QApplication& SnapshotApp)
}
}

MilliSleep(poll_delay);
UninterruptibleSleep(std::chrono::milliseconds{poll_delay});
}

m_Progress->reset();
Expand Down Expand Up @@ -297,7 +297,7 @@ bool UpgradeQt::SnapshotMain(QApplication& SnapshotApp)

SnapshotApp.processEvents();

MilliSleep(poll_delay);
UninterruptibleSleep(std::chrono::milliseconds{poll_delay});
}

m_Progress->setValue(100);
Expand Down
2 changes: 1 addition & 1 deletion src/rpc/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ void ServiceConnection(AcceptedConnection *conn)
If this results in a DOS the user really
shouldn't have their RPC port exposed.*/
if (gArgs.GetArgs("-rpcpassword").size() < 20)
MilliSleep(250);
UninterruptibleSleep(std::chrono::milliseconds{250});

conn->stream() << HTTPReply(HTTP_UNAUTHORIZED, "", false) << std::flush;
break;
Expand Down
2 changes: 1 addition & 1 deletion src/util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class CInit

if (seed_successful) break;

MilliSleep(100);
UninterruptibleSleep(std::chrono::milliseconds{100});
}

if (!seed_successful) {
Expand Down
22 changes: 16 additions & 6 deletions src/util/time.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,26 @@
#include <ctime>
#include <thread>
#include <inttypes.h>
#include <condition_variable>

#include <tinyformat.h>
#include "logging.h"

/**
* The global condition variable for MilliSleep, a non-Boost implementation of interruptible sleep
*/
std::condition_variable sleep_interrupt;
/**
* The non-recursive mutex for MilliSleep, a non-Boost implementation of interruptible sleep
*/
std::mutex cs_interruptible_sleep;

void MilliSleep(int64_t n)
{
std::unique_lock<std::mutex> lock(cs_interruptible_sleep);
sleep_interrupt.wait_for(lock, std::chrono::milliseconds{n});
}

void UninterruptibleSleep(const std::chrono::microseconds& n) { std::this_thread::sleep_for(n); }

static std::atomic<int64_t> nMockTime(0); //!< For testing
Expand Down Expand Up @@ -294,9 +310,3 @@ int64_t MilliTimer::GetElapsedTime(const std::string& log_string, const std::str

// Create global timer instance.
MilliTimer g_timer;

void MilliSleep(int64_t n)
{
std::this_thread::sleep_for(std::chrono::milliseconds(n));
}

8 changes: 6 additions & 2 deletions src/util/time.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@

using namespace std::chrono_literals;

/**
* @brief Implements an interruptible sleep using the std::mutex, std::condition_variable pattern
* @param int64_t n: number of milliseconds to sleep.
*/
void MilliSleep(int64_t n);

void UninterruptibleSleep(const std::chrono::microseconds& n);

/**
Expand Down Expand Up @@ -92,8 +98,6 @@ struct timeval MillisToTimeval(std::chrono::milliseconds ms);
/** Sanity check epoch match normal Unix epoch */
bool ChronoSanityCheck();

void MilliSleep(int64_t n);

/**
* @brief The MilliTimer class
*
Expand Down
2 changes: 1 addition & 1 deletion src/wallet/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ bool CDB::Rewrite(const string& strFile, const char* pszSkip)
return fSuccess;
}
}
MilliSleep(100);
UninterruptibleSleep(std::chrono::milliseconds{100});
}
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion src/wallet/walletdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ void ThreadFlushWalletDB(void* parg)
int64_t nLastWalletUpdate = GetTime();
while (!fShutdown)
{
MilliSleep(500);
UninterruptibleSleep(std::chrono::milliseconds{500});

if (nLastSeen != nWalletDBUpdated)
{
Expand Down

0 comments on commit 148e0cd

Please sign in to comment.