Skip to content

Commit

Permalink
fix: respect bigquery limit (#2754)
Browse files Browse the repository at this point in the history
Closes #716

----

I found this as I was looking through the issues earlier today. 

One concern with this (and limit pushdowns in general) is that we
could be _under_-reporting data in these cases:

Imagine we only partially push down a predicate but we completely push
down the limit. If we get `n` rows back from the data source, but then
further filter it down, then we've returned fewer than the target
number of rows. Right? 

This isn't new, and I'm not sure we _shouldn't_ do this because of
that, and also I could imagine datafusion passing in different limits
than what the user specified.

---------

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Vaibhav Rabber <[email protected]>
  • Loading branch information
3 people authored Mar 18, 2024
1 parent b23ad2d commit f1b558a
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 4 deletions.
6 changes: 6 additions & 0 deletions crates/datasources/src/bigquery/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,9 @@ pub enum BigQueryError {
}

/// Crate-local result alias whose error type defaults to [`BigQueryError`].
pub type Result<T, E = BigQueryError> = std::result::Result<T, E>;

impl From<BigQueryError> for datafusion::common::DataFusionError {
    /// Convert a BigQuery datasource error into DataFusion's opaque
    /// `External` error variant, boxing the original error so callers
    /// can still downcast to [`BigQueryError`] if needed.
    fn from(err: BigQueryError) -> Self {
        Self::External(Box::new(err))
    }
}
11 changes: 7 additions & 4 deletions crates/datasources/src/bigquery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ impl TableProvider for BigQueryTableProvider {
_ctx: &SessionState,
projection: Option<&Vec<usize>>,
filters: &[Expr],
_limit: Option<usize>,
limit: Option<usize>,
) -> DatafusionResult<Arc<dyn ExecutionPlan>> {
// TODO: Fix duplicated key deserialization.
let storage = {
Expand All @@ -299,15 +299,18 @@ impl TableProvider for BigQueryTableProvider {

// Add row restriction.
// TODO: Check what restrictions are valid.
let predicate = if self.predicate_pushdown {
let restriction = exprs_to_predicate_string(filters)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let mut predicate = if self.predicate_pushdown {
let restriction = exprs_to_predicate_string(filters)?;
builder = builder.row_restriction(restriction.clone());
restriction
} else {
String::new()
};

if let Some(size) = limit {
write!(predicate, " LIMIT {}", size)?;
};

// Select fields based off of what's in our projected schema.
let selected: Vec<_> = projected_schema
.fields
Expand Down
9 changes: 9 additions & 0 deletions testdata/sqllogictests/csv.slt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@ select count(*) from bikeshare_stations;
----
102

query I
SELECT station_id, number_of_docks
FROM '${PWD}/testdata/sqllogictests_datasources_common/data/bikeshare_stations.csv'
WHERE alternate_name IS NOT null
ORDER BY station_id
LIMIT 1;
----
2574 17

query IT
select count(*), status from bikeshare_stations group by status order by status;
----
Expand Down
9 changes: 9 additions & 0 deletions testdata/sqllogictests_bigquery/read.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,12 @@ query I
SELECT count(*) FROM read_bigquery('${GCP_SERVICE_ACCOUNT_KEY}', '${GCP_PROJECT_ID}', '${BIGQUERY_DATASET_ID}', 'bikeshare_stations');
----
102

query I
SELECT station_id, number_of_docks
FROM read_bigquery('${GCP_SERVICE_ACCOUNT_KEY}', '${GCP_PROJECT_ID}', '${BIGQUERY_DATASET_ID}', 'bikeshare_stations')
WHERE alternate_name IS NOT null
ORDER BY station_id
LIMIT 1;
----
2574 17

0 comments on commit f1b558a

Please sign in to comment.