Skip to content

Commit

Permalink
[BugFix] fix scan blocked by unreleased tokens (backport #36836) (#36860
Browse files Browse the repository at this point in the history
)

Co-authored-by: Zhuhe Fang <[email protected]>
  • Loading branch information
mergify[bot] and fzhedu authored Dec 12, 2023
1 parent c3f0ee2 commit 53b0573
Showing 1 changed file with 13 additions and 1 deletion.
14 changes: 13 additions & 1 deletion be/src/exec/pipeline/scan/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ Status ScanOperator::prepare(RuntimeState* state) {
}

void ScanOperator::close(RuntimeState* state) {
set_buffer_finished();
// For the running io task, we close its chunk sources in ~ScanOperator not in ScanOperator::close.
for (size_t i = 0; i < _chunk_sources.size(); i++) {
std::lock_guard guard(_task_mutex);
Expand Down Expand Up @@ -215,8 +214,19 @@ void ScanOperator::_detach_chunk_sources() {
}

Status ScanOperator::set_finishing(RuntimeState* state) {
// check when expired, are there running io tasks or submitted tasks
if (UNLIKELY(state != nullptr && state->query_ctx()->is_query_expired() &&
(_num_running_io_tasks > 0 || _submit_task_counter->value() == 0))) {
LOG(WARNING) << "set_finishing scan fragment " << print_id(state->fragment_instance_id()) << " driver_id "
<< get_driver_sequence() << " _num_running_io_tasks= " << _num_running_io_tasks
<< " _submit_task_counter= " << _submit_task_counter->value()
<< " _morsels_counter= " << _morsels_counter->value()
<< (is_buffer_full() && (num_buffered_chunks() == 0) ? ", buff is full but without local chunks"
: "");
}
std::lock_guard guard(_task_mutex);
_detach_chunk_sources();
set_buffer_finished();
_is_finished = true;
return Status::OK();
}
Expand Down Expand Up @@ -390,6 +400,8 @@ Status ScanOperator::_trigger_next_scan(RuntimeState* state, int chunk_source_in
int64_t prev_scan_bytes = chunk_source->get_scan_bytes();
auto status = chunk_source->buffer_next_batch_chunks_blocking(state, kIOTaskBatchSize, _workgroup.get());
if (!status.ok() && !status.is_end_of_file()) {
LOG(ERROR) << "scan fragment " << print_id(state->fragment_instance_id()) << " driver "
<< get_driver_sequence() << " Scan tasks error: " << status.to_string();
_set_scan_status(status);
}

Expand Down

0 comments on commit 53b0573

Please sign in to comment.