Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return error if drain fails #132

Merged
merged 1 commit into from
Aug 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions quantum/impl/quantum_dispatcher_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ bool Dispatcher::empty(IQueue::QueueType type,
}

inline
void Dispatcher::drain(std::chrono::milliseconds timeout,
bool Dispatcher::drain(std::chrono::milliseconds timeout,
bool isFinal)
{
DrainGuard guard(_drain, !isFinal);
Expand All @@ -364,7 +364,7 @@ void Dispatcher::drain(std::chrono::milliseconds timeout,

if (timeout == std::chrono::milliseconds::zero())
{
return; //skip draining
return true; //skip draining
}

auto start = std::chrono::steady_clock::now();
Expand All @@ -382,7 +382,7 @@ void Dispatcher::drain(std::chrono::milliseconds timeout,
if (std::chrono::duration_cast<std::chrono::milliseconds>(present-start) > timeout)
{
//timeout reached
break;
return false;
}
}
}
Expand All @@ -391,6 +391,7 @@ void Dispatcher::drain(std::chrono::milliseconds timeout,
std::lock_guard<std::mutex> guard(Util::LogMutex());
std::cout << "All queues have drained." << std::endl;
#endif
return true;
}

inline
Expand Down
3 changes: 2 additions & 1 deletion quantum/quantum_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -330,9 +330,10 @@ class Dispatcher : public ITerminate
/// @brief Drains all queues on this dispatcher object.
/// @param[in] timeout Maximum time for this function to wait. Set to -1 to wait indefinitely until all queues drain.
/// @param[in] isFinal If set to true, the dispatcher will not allow any more processing after the drain completes.
/// @return True if everything drains before timeout, false otherwise.
/// @note This function blocks until all coroutines and IO tasks have completed. During this time, posting
/// of new tasks is disabled unless they are posted from within an already executing coroutine.
void drain(std::chrono::milliseconds timeout = std::chrono::milliseconds(-1),
bool drain(std::chrono::milliseconds timeout = std::chrono::milliseconds(-1),
bool isFinal = false);

/// @brief Returns the number of underlying coroutine threads as specified in the constructor. If -1 was passed
Expand Down
4 changes: 2 additions & 2 deletions quantum/util/impl/quantum_sequencer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::isPendingContext(const ICoroC
}

template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
void
bool
Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::drain(std::chrono::milliseconds timeout,
bool isFinal)
{
Expand All @@ -561,7 +561,7 @@ Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::drain(std::chrono::millisecon
});

DrainGuard guard(_drain, !isFinal);
future->waitFor(timeout);
return future->waitFor(timeout) == std::future_status::ready;
}


Expand Down
3 changes: 2 additions & 1 deletion quantum/util/quantum_sequencer.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,11 +230,12 @@ class Sequencer
/// @brief Drains all sequenced tasks.
/// @param[in] timeout Maximum time for this function to wait. Set to -1 to wait indefinitely until all sequences drain.
/// @param[in] isFinal If set to true, the sequencer will not allow any more processing after the drain completes.
/// @return True if everything drains before timeout, false otherwise.
/// @note This function blocks until all sequences have completed. During this time, posting
/// of new tasks is disabled unless they are posted from within an already executing coroutine.
/// Since this function posts a task which will wait on all others, getStatistics().getPostedTaskCount()
/// will contain one extra count.
void drain(std::chrono::milliseconds timeout = std::chrono::milliseconds(-1),
bool drain(std::chrono::milliseconds timeout = std::chrono::milliseconds(-1),
bool isFinal = false);

private:
Expand Down