Skip to content

Commit

Permalink
Add threadinterrupt.h/cpp from Bitcoin upstream and
Browse files Browse the repository at this point in the history
Reimplement MilliSleep as a nodiscard wrapper around CThreadInterrupt
g_thread_interrupt.sleep_for()
  • Loading branch information
jamescowens committed Sep 14, 2021
1 parent 37e9835 commit 99f9e11
Show file tree
Hide file tree
Showing 20 changed files with 172 additions and 67 deletions.
2 changes: 2 additions & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ GRIDCOIN_CORE_H = \
support/cleanse.h \
support/lockedpool.h \
sync.h \
threadinterrupt.h \
threadsafety.h \
tinyformat.h \
txdb.h \
Expand Down Expand Up @@ -268,6 +269,7 @@ GRIDCOIN_CORE_CPP = addrdb.cpp \
support/cleanse.cpp \
support/lockedpool.cpp \
sync.cpp \
threadinterrupt.cpp \
txdb-leveldb.cpp \
uint256.cpp \
util/settings.cpp \
Expand Down
13 changes: 5 additions & 8 deletions src/gridcoin/scraper/scraper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ namespace boostio = boost::iostreams;
fs::path pathDataDir = {};
fs::path pathScraper = {};

extern bool fShutdown;
extern CWallet* pwalletMain;

CCriticalSection cs_Scraper;
Expand Down Expand Up @@ -169,8 +168,6 @@ bool ProcessProjectRacFileByCPID(const std::string& project, const fs::path& fil
bool AuthenticationETagUpdate(const std::string& project, const std::string& etag);
void AuthenticationETagClear();

extern void MilliSleep(int64_t n);

// Need to access from rpcblockchain.cpp
extern UniValue SuperblockToJson(const Superblock& superblock);

Expand Down Expand Up @@ -1037,7 +1034,7 @@ void Scraper(bool bSingleShot)
uiInterface.NotifyScraperEvent(scrapereventtypes::OutOfSync, CT_UPDATING, {});

_log(logattribute::INFO, "Scraper", "Wallet not in sync. Sleeping for 8 seconds.");
MilliSleep(8000);
if (!MilliSleep(8000)) return;
}

// Now that we are in sync, refresh from the AppCache and check for proper directory/file structure.
Expand Down Expand Up @@ -1112,7 +1109,7 @@ void Scraper(bool bSingleShot)
_log(logattribute::INFO, "Scraper", "Superblock not needed. age=" + ToString(sbage));
_log(logattribute::INFO, "Scraper", "Sleeping for " + ToString(nScraperSleep / 1000) +" seconds");

MilliSleep(nScraperSleep);
if (!MilliSleep(nScraperSleep)) return;
}
}

Expand Down Expand Up @@ -1232,7 +1229,7 @@ void Scraper(bool bSingleShot)
ScraperHousekeeping();

_log(logattribute::INFO, "Scraper", "Sleeping for " + ToString(nScraperSleep / 1000) +" seconds");
MilliSleep(nScraperSleep);
if (!MilliSleep(nScraperSleep)) return;
}
else
// This will break from the outer while loop if in singleshot mode and end execution after one pass.
Expand Down Expand Up @@ -1279,7 +1276,7 @@ void ScraperSubscriber()
uiInterface.NotifyScraperEvent(scrapereventtypes::OutOfSync, CT_NEW, {});

_log(logattribute::INFO, "ScraperSubscriber", "Wallet not in sync. Sleeping for 8 seconds.");
MilliSleep(8000);
if (!MilliSleep(8000)) return;
}

// ScraperHousekeeping items are only run in this thread if not handled by the Scraper() thread.
Expand Down Expand Up @@ -1309,7 +1306,7 @@ void ScraperSubscriber()
// Use the same sleep interval configured for the scraper.
_log(logattribute::INFO, "ScraperSubscriber", "Sleeping for " + ToString(nScraperSleep / 1000) +" seconds");

MilliSleep(nScraperSleep);
if (!MilliSleep(nScraperSleep)) return;
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/gridcoin/upgrade.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ void Upgrade::SnapshotMain()
std::cout << progress.Status() << std::flush;
}

MilliSleep(1000);
UninterruptibleSleep(std::chrono::milliseconds{1000});
}

// This is needed in some spots as the download can complete before the next progress update occurs so just 100% here
Expand Down Expand Up @@ -248,7 +248,7 @@ void Upgrade::SnapshotMain()
std::cout << progress.Status() << std::flush;
}

MilliSleep(1000);
UninterruptibleSleep(std::chrono::milliseconds{1000});
}

if (progress.Update(100)) std::cout << progress.Status() << std::flush;
Expand All @@ -272,7 +272,7 @@ void Upgrade::SnapshotMain()
std::cout << progress.Status() << std::flush;
}

MilliSleep(1000);
UninterruptibleSleep(std::chrono::milliseconds{1000});
}

if (progress.Update(100)) std::cout << progress.Status() << std::flush;
Expand All @@ -296,7 +296,7 @@ void Upgrade::SnapshotMain()
if (progress.Update(ExtractStatus.GetSnapshotExtractProgress()))
std::cout << progress.Status() << std::flush;

MilliSleep(1000);
UninterruptibleSleep(std::chrono::milliseconds{1000});
}

if (progress.Update(100)) std::cout << progress.Status() << std::flush;
Expand Down Expand Up @@ -369,7 +369,7 @@ void Upgrade::WorkerMain(Progress& progress)
}
}

MilliSleep(1000);
UninterruptibleSleep(std::chrono::milliseconds{1000});
}
}

Expand Down
9 changes: 5 additions & 4 deletions src/gridcoinresearchd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,11 @@ bool AppInit(int argc, char* argv[])

PrintException(nullptr, "AppInit()");
}
if(fRet)
{
while (!ShutdownRequested())
MilliSleep(500);

if (fRet) {
while (!ShutdownRequested()) {
UninterruptibleSleep(std::chrono::milliseconds{500});
}
}

Shutdown(nullptr);
Expand Down
23 changes: 19 additions & 4 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,31 @@ void Shutdown(void* parg)
if (fFirstThread)
{
LogPrintf("gridcoinresearch exiting...");

fShutdown = true;

// clean up the threads running serviceQueue:
// Signal to the scheduler to stop.
LogPrintf("INFO: %s: Stopping the scheduler.", __func__);
scheduler.stop();

// clean up any remaining threads running serviceQueue:
LogPrintf("INFO: %s: Cleaning up any remaining threads in scheduler.", __func__);
threadGroup.interrupt_all();
threadGroup.join_all();

LogPrintf("INFO: %s: Flushing wallet database.", __func__);
bitdb.Flush(false);

// Interrupt all sleeping threads.
LogPrintf("INFO: %s: Interrupting sleeping threads.", __func__);
g_thread_interrupt();

LogPrintf("INFO: %s: Stopping net (node) threads.", __func__);
StopNode();

LogPrintf("INFO: %s: Final flush of wallet database and closing wallet database file.", __func__);
bitdb.Flush(true);

LogPrintf("INFO: %s: Stopping RPC threads.", __func__);
StopRPCThreads();

// This is necessary here to prevent a snapshot download from failing at the cleanup
Expand All @@ -127,14 +142,14 @@ void Shutdown(void* parg)
// This causes issues on daemons where it tries to create a second
// lock file.
//CTxDB().Close();
MilliSleep(50);
UninterruptibleSleep(std::chrono::milliseconds{50});
LogPrintf("Gridcoin exited");
fExit = true;
}
else
{
while (!fExit)
MilliSleep(100);
UninterruptibleSleep(std::chrono::milliseconds{100});
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/miner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1207,7 +1207,7 @@ void StakeMiner(CWallet *pwallet)
if (fEnableSideStaking) vSideStakeAlloc = GetSideStakingStatusAndAlloc();

// wait for next round
MilliSleep(nMinerSleep);
if (!MilliSleep(nMinerSleep)) return;

g_timer.InitTimer("miner", LogInstance().WillLogCategory(BCLog::LogFlags::MISC));

Expand Down
44 changes: 20 additions & 24 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,7 @@ void ThreadSocketHandler2(void* parg)
}
FD_ZERO(&fdsetSend);
FD_ZERO(&fdsetError);
MilliSleep(timeout.tv_usec/1000);
if (!MilliSleep(timeout.tv_usec/1000)) return;
}


Expand Down Expand Up @@ -1111,7 +1111,7 @@ void ThreadSocketHandler2(void* parg)
pnode->Release();
}

MilliSleep(10);
UninterruptibleSleep(std::chrono::milliseconds{10});
}
}

Expand Down Expand Up @@ -1222,7 +1222,7 @@ void ThreadMapPort2(void* parg)
else
LogPrintf("UPnP Port Mapping successful.");;
}
MilliSleep(2000);
if (!MilliSleep(2000)) return;
i++;
}
} else {
Expand All @@ -1233,9 +1233,8 @@ void ThreadMapPort2(void* parg)
FreeUPNPUrls(&urls);
while (true)
{
if (fShutdown || !fUseUPnP)
return;
MilliSleep(2000);
if (fShutdown || !fUseUPnP) return;
if (!MilliSleep(2000)) return;
}
}
}
Expand Down Expand Up @@ -1352,16 +1351,16 @@ void DumpAddresses()
CAddrDB adb;
adb.Write(addrman);

LogPrint(BCLog::LogFlags::NET, "Flushed %d addresses to peers.dat %" PRId64 "ms", addrman.size(), GetTimeMillis() - nStart);

LogPrint(BCLog::LogFlags::NET, "Flushed %d addresses to peers.dat %" PRId64 "ms",
addrman.size(), GetTimeMillis() - nStart);
}

void ThreadDumpAddress2(void* parg)
{
while (!fShutdown)
{
DumpAddresses();
MilliSleep(600000);
if (!MilliSleep(600000)) return;
}
}

Expand Down Expand Up @@ -1498,12 +1497,12 @@ void ThreadOpenConnections2(void* parg)
OpenNetworkConnection(addr, nullptr, strAddr.c_str());
for (int i = 0; i < 10 && i < nLoop; i++)
{
MilliSleep(500);
UninterruptibleSleep(std::chrono::milliseconds{500});
if (fShutdown)
return;
}
}
MilliSleep(500);
UninterruptibleSleep(std::chrono::milliseconds{500});
}
}

Expand All @@ -1512,7 +1511,7 @@ void ThreadOpenConnections2(void* parg)
while (true)
{
ProcessOneShot();
MilliSleep(500);
UninterruptibleSleep(std::chrono::milliseconds{500});

if (fShutdown)
return;
Expand Down Expand Up @@ -1638,9 +1637,9 @@ void ThreadOpenAddedConnections2(void* parg)
CAddress addr;
CSemaphoreGrant grant(*semOutbound);
OpenNetworkConnection(addr, &grant, strAddNode.c_str());
MilliSleep(500);
UninterruptibleSleep(std::chrono::milliseconds{500});
}
MilliSleep(120000); // Retry every 2 minutes
if (!MilliSleep(120000)) return; // Retry every 2 minutes
}
return;
}
Expand Down Expand Up @@ -1682,15 +1681,12 @@ void ThreadOpenAddedConnections2(void* parg)
{
CSemaphoreGrant grant(*semOutbound);
OpenNetworkConnection(CAddress(*(vserv.begin())), &grant);
MilliSleep(500);
if (fShutdown)
return;
UninterruptibleSleep(std::chrono::milliseconds{500});
if (fShutdown) return;
}
if (fShutdown)
return;
MilliSleep(120000); // Retry every 2 minutes
if (fShutdown)
return;
if (fShutdown) return;
if (!MilliSleep(120000)) return; // Retry every 2 minutes
if (fShutdown) return;
}
}

Expand Down Expand Up @@ -1817,7 +1813,7 @@ void ThreadMessageHandler2(void* parg)

// Wait and allow messages to bunch up.
// we're sleeping, but we must always check fShutdown after doing this.
MilliSleep(100);
UninterruptibleSleep(std::chrono::milliseconds{100});
if (fRequestShutdown)
StartShutdown();
if (fShutdown)
Expand Down Expand Up @@ -2066,7 +2062,7 @@ bool StopNode()

netThreads->interruptAll();
netThreads->removeAll();
MilliSleep(50);
UninterruptibleSleep(std::chrono::milliseconds{50});
DumpAddresses();
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/qt/bitcoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ int StartGridcoinQt(int argc, char *argv[], QApplication& app, OptionsModel& opt
// The sleep here has to be pretty short to avoid a buffer overflow crash with
// fast CPUs due to too many events. It originally was set to 300 ms and has
// been reduced to 100 ms.
MilliSleep(100);
UninterruptibleSleep(std::chrono::milliseconds{100});
}

if (splashref)
Expand Down
2 changes: 1 addition & 1 deletion src/qt/qtipcserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ static void ipcThread2(void* pArg)
if (mq->timed_receive(&buffer, sizeof(buffer), nSize, nPriority, d))
{
uiInterface.ThreadSafeHandleURI(std::string(buffer, nSize));
MilliSleep(1000);
UninterruptibleSleep(std::chrono::seconds{1});
}

if (fShutdown)
Expand Down
8 changes: 4 additions & 4 deletions src/qt/upgradeqt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ bool UpgradeQt::SnapshotMain(QApplication& SnapshotApp)
}
}

MilliSleep(poll_delay);
UninterruptibleSleep(std::chrono::milliseconds{poll_delay});
}

m_Progress->reset();
Expand Down Expand Up @@ -186,7 +186,7 @@ bool UpgradeQt::SnapshotMain(QApplication& SnapshotApp)
}
}

MilliSleep(poll_delay);
UninterruptibleSleep(std::chrono::milliseconds{poll_delay});
}

m_Progress->reset();
Expand Down Expand Up @@ -232,7 +232,7 @@ bool UpgradeQt::SnapshotMain(QApplication& SnapshotApp)
}
}

MilliSleep(poll_delay);
UninterruptibleSleep(std::chrono::milliseconds{poll_delay});
}

m_Progress->reset();
Expand Down Expand Up @@ -297,7 +297,7 @@ bool UpgradeQt::SnapshotMain(QApplication& SnapshotApp)

SnapshotApp.processEvents();

MilliSleep(poll_delay);
UninterruptibleSleep(std::chrono::milliseconds{poll_delay});
}

m_Progress->setValue(100);
Expand Down
2 changes: 1 addition & 1 deletion src/rpc/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ void ServiceConnection(AcceptedConnection *conn)
If this results in a DOS the user really
shouldn't have their RPC port exposed.*/
if (gArgs.GetArgs("-rpcpassword").size() < 20)
MilliSleep(250);
UninterruptibleSleep(std::chrono::milliseconds{250});

conn->stream() << HTTPReply(HTTP_UNAUTHORIZED, "", false) << std::flush;
break;
Expand Down
Loading

0 comments on commit 99f9e11

Please sign in to comment.