[BugFix] fix scan blocked by unreleased tokens #36836
Conversation
Signed-off-by: Zhuhe Fang <[email protected]>
@@ -409,6 +419,8 @@ Status ScanOperator::_trigger_next_scan(RuntimeState* state, int chunk_source_index
     int64_t prev_scan_bytes = chunk_source->get_scan_bytes();
     auto status = chunk_source->buffer_next_batch_chunks_blocking(state, kIOTaskBatchSize, _workgroup.get());
     if (!status.ok() && !status.is_end_of_file()) {
+        LOG(ERROR) << "scan fragment " << print_id(state->fragment_instance_id()) << " driver "
+                   << get_driver_sequence() << " Scan tasks error: " << status.to_string();
         _set_scan_status(status);
     }
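For readers skimming the diff: the two added lines follow a simple pattern, namely logging the fragment and driver identity on any non-EOF error before recording the status. Below is a minimal, self-contained sketch of that pattern; the `Status` type and the `handle_scan_status` helper are simplified stand-ins for illustration, not StarRocks' real implementation.

```cpp
#include <iostream>
#include <string>

// Simplified stand-in for StarRocks' Status; the real type lives in common/.
struct Status {
    bool ok_flag = true;
    bool eof = false;
    std::string msg;
    bool ok() const { return ok_flag; }
    bool is_end_of_file() const { return eof; }
    std::string to_string() const { return msg; }
};

// Log non-EOF errors with enough identity (fragment, driver) to trace
// which scan task failed, mirroring the added LOG(ERROR) lines above.
void handle_scan_status(const Status& status, const std::string& fragment_id, int driver_seq) {
    if (!status.ok() && !status.is_end_of_file()) {
        std::cerr << "scan fragment " << fragment_id << " driver " << driver_seq
                  << " Scan tasks error: " << status.to_string() << std::endl;
    }
}

int main() {
    Status err;
    err.ok_flag = false;
    err.msg = "io error";
    handle_scan_status(err, "frag-1", 0);
}
```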
The most risky bug in this code is:
Concurrent modification of shared resources without proper synchronization may lead to undefined behavior or data races.
You can modify the code like this:
void ScanOperator::close(RuntimeState* state) {
std::lock_guard guard(_task_mutex); // Acquire the lock before modifying shared resources
set_buffer_finished();
// For the running io task, we close its chunk sources in ~ScanOperator not in ScanOperator::close.
for (size_t i = 0; i < _chunk_sources.size(); i++) {
// std::lock_guard guard(_task_mutex); // This line is commented out as we've already acquired the lock above
...
Note: The provided code snippet seems to be a diff patch from a version control system. However, the context around the modifications suggests there is thread-sensitive logic in the management of IO tasks and buffer state. Moving `set_buffer_finished()` outside of the `close` method without locking (`_task_mutex`) could result in data races if another thread is simultaneously working with the chunk sources or related counters such as `_num_running_io_tasks` or `_submit_task_counter`.
Acquiring the mutex lock at the start of the `ScanOperator::close` method helps ensure these shared resources are protected against concurrent access, preventing data races and keeping the runtime state consistent.
[FE Incremental Coverage Report] ✅ pass : 0 / 0 (0%)
[BE Incremental Coverage Report] ❌ fail : 2 / 10 (20.00%) file detail
@Mergifyio backport branch-3.2
@Mergifyio backport branch-3.1
@Mergifyio backport branch-3.0
@Mergifyio backport branch-2.5
✅ Backports have been created
✅ Backports have been created
✅ Backports have been created
✅ Backports have been created
Signed-off-by: Zhuhe Fang <[email protected]> (cherry picked from commit d3621d0)
Co-authored-by: Zhuhe Fang <[email protected]>
Why I'm doing:
Some scan operators are blocked because they can't get tokens, which are held by others. This happens when a fragment contains scan + limit: some drivers run faster and reach the limit, while the scan's has_output() keeps returning a full state, so the incoming drivers can't be issued and the fragment is blocked from finishing.
What I'm doing:
Let a finishing scan operator release its tokens in time (a sketch of the idea follows below).
Besides, add some key logs for when a scan is blocked, to help diagnose similar issues in the future.
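As a rough illustration of the fix, the idea is that an operator entering the finishing state (e.g. because a downstream LIMIT is satisfied) returns its token immediately instead of holding it until close, so other drivers waiting on tokens can be scheduled. All names below are hypothetical; the real token accounting lives in StarRocks' pipeline scheduler.

```cpp
#include <condition_variable>
#include <mutex>

// Hedged sketch: a semaphore-like token pool shared by scan operators.
class ScanTokenPool {
public:
    explicit ScanTokenPool(int tokens) : _available(tokens) {}

    // Block until a token is available, then take it.
    void acquire() {
        std::unique_lock<std::mutex> lk(_mu);
        _cv.wait(lk, [this] { return _available > 0; });
        --_available;
    }

    // Return a token; wakes one waiting operator.
    void release() {
        {
            std::lock_guard<std::mutex> lk(_mu);
            ++_available;
        }
        _cv.notify_one();
    }

private:
    std::mutex _mu;
    std::condition_variable _cv;
    int _available;
};

// The essence of the fix: when an operator transitions to "finishing",
// release its token right away rather than waiting for close().
void on_set_finishing(ScanTokenPool& pool, bool holds_token) {
    if (holds_token) {
        pool.release();
    }
}

int main() {
    ScanTokenPool pool(1);
    pool.acquire();                // a scan driver takes the only token
    on_set_finishing(pool, true);  // limit reached: token goes back immediately
    pool.acquire();                // another driver can now proceed
    pool.release();
}
```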
Fixes #issue
What type of PR is this:
Does this PR entail a change in behavior?
If yes, please specify the type of change:
Checklist:
Bugfix cherry-pick branch check: