Skip to content

Commit

Permalink
[BugFix] fix empty scan ranges in ConnectorScanNode (#9458)
Browse files Browse the repository at this point in the history
(cherry picked from commit 9f538f4)

# Conflicts:
#	be/src/exec/vectorized/connector_scan_node.cpp
  • Loading branch information
dirtysalt authored and mergify[bot] committed Aug 17, 2022
1 parent f6a7f1f commit 21877c3
Showing 1 changed file with 14 additions and 0 deletions.
14 changes: 14 additions & 0 deletions be/src/exec/vectorized/connector_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,19 +159,29 @@ Status ConnectorScanNode::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ScanNode::open(state));
RETURN_IF_ERROR(_data_source_provider->open(state));

return Status::OK();
}

Status ConnectorScanNode::_start_scan_thread(RuntimeState* state) {
for (const TScanRangeParams& scan_range : _scan_ranges) {
_create_and_init_scanner(state, scan_range.scan_range);
}
_num_scanners = _pending_scanners.size();
if (_num_scanners == 0) {
return Status::EndOfFile("");
}

// init chunk pool
_pending_scanners.reverse();
<<<<<<< HEAD
_num_scanners = _pending_scanners.size();
_chunks_per_scanner = config::doris_scanner_row_num / state->chunk_size();
_chunks_per_scanner += static_cast<int>(config::doris_scanner_row_num % state->chunk_size() != 0);
=======
_chunks_per_scanner = config::scanner_row_num / state->chunk_size();
_chunks_per_scanner += static_cast<int>(config::scanner_row_num % state->chunk_size() != 0);
>>>>>>> 9f538f4c1 ([BugFix] fix empty scan ranges in ConnectorScanNode (#9458))
int concurrency = std::min<int>(config::max_hdfs_scanner_num, _num_scanners);
int chunks = _chunks_per_scanner * concurrency;
_chunk_pool.reserve(chunks);
Expand Down Expand Up @@ -202,6 +212,10 @@ Status ConnectorScanNode::get_next(RuntimeState* state, ChunkPtr* chunk, bool* e
SCOPED_TIMER(_runtime_profile->total_time_counter());
if (!_start && _status.ok()) {
Status status = _start_scan_thread(state);
if (status.is_end_of_file()) {
*eos = true;
return Status::OK();
}
_update_status(status);
LOG_IF(ERROR, !status.ok()) << "Failed to start scan node: " << status.to_string();
_start = true;
Expand Down

0 comments on commit 21877c3

Please sign in to comment.