Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: respect bigquery limit #2754

Merged
merged 14 commits into from
Mar 18, 2024
6 changes: 6 additions & 0 deletions crates/datasources/src/bigquery/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,9 @@ pub enum BigQueryError {
}

pub type Result<T, E = BigQueryError> = std::result::Result<T, E>;

impl From<BigQueryError> for datafusion::common::DataFusionError {
fn from(e: BigQueryError) -> Self {
datafusion::common::DataFusionError::External(Box::new(e))
}
}
11 changes: 7 additions & 4 deletions crates/datasources/src/bigquery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ impl TableProvider for BigQueryTableProvider {
_ctx: &SessionState,
projection: Option<&Vec<usize>>,
filters: &[Expr],
_limit: Option<usize>,
limit: Option<usize>,
) -> DatafusionResult<Arc<dyn ExecutionPlan>> {
// TODO: Fix duplicated key deserialization.
let storage = {
Expand All @@ -299,15 +299,18 @@ impl TableProvider for BigQueryTableProvider {

// Add row restriction.
// TODO: Check what restrictions are valid.
let predicate = if self.predicate_pushdown {
let restriction = exprs_to_predicate_string(filters)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let mut predicate = if self.predicate_pushdown {
let restriction = exprs_to_predicate_string(filters)?;
builder = builder.row_restriction(restriction.clone());
restriction
} else {
String::new()
};

if let Some(size) = limit {
write!(predicate, " LIMIT {}", size)?;
};

// Select fields based off of what's in our projected schema.
let selected: Vec<_> = projected_schema
.fields
Expand Down
9 changes: 9 additions & 0 deletions testdata/sqllogictests/csv.slt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@ select count(*) from bikeshare_stations;
----
102

query I
SELECT station_id, number_of_docks
FROM '${PWD}/testdata/sqllogictests_datasources_common/data/bikeshare_stations.csv'
WHERE alternate_name IS NOT null
ORDER BY station_id
LIMIT 1;
----
2574 17

query IT
select count(*), status from bikeshare_stations group by status order by status;
----
Expand Down
9 changes: 9 additions & 0 deletions testdata/sqllogictests_bigquery/read.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,12 @@ query I
SELECT count(*) FROM read_bigquery('${GCP_SERVICE_ACCOUNT_KEY}', '${GCP_PROJECT_ID}', '${BIGQUERY_DATASET_ID}', 'bikeshare_stations');
----
102

query I
SELECT station_id, number_of_docks
FROM read_bigquery('${GCP_SERVICE_ACCOUNT_KEY}', '${GCP_PROJECT_ID}', '${BIGQUERY_DATASET_ID}', 'bikeshare_stations')
WHERE alternate_name IS NOT null
ORDER BY station_id
LIMIT 1;
----
2574 17
Loading