feat: SQL Server data source #2018

Merged · 17 commits into main from sean/sqlserver · Nov 13, 2023

Conversation

scsmithr (Member) commented Oct 30, 2023

Adds support for SQL Server as a data source

Closes #1101

SQL

Create external table:

CREATE EXTERNAL TABLE large_table
	FROM sql_server
	OPTIONS (
		connection_string = 'server=tcp:localhost,1433;user=SA;password=Password123;TrustServerCertificate=true',
		schema = 'dbo',
		table = 'bikeshare_trips'
	);

Create external database:

CREATE EXTERNAL DATABASE external_db
	FROM sql_server
	OPTIONS (
		connection_string = 'server=tcp:localhost,1433;user=SA;password=Password123;TrustServerCertificate=true'
	);

Next

  • Virtual schema support (for listing tables and column info)
  • read_sqlserver function
  • Push down filters to SQL Server
  • datatypes table for SLT (sqllogictest)

@scsmithr scsmithr marked this pull request as ready for review November 13, 2023 19:40
Review comment on lines 475–528:
// Build one Arrow array per column. `make_column!` expands to the same
// builder-and-row loop written out in the explicit arms below.
let mut columns = Vec::with_capacity(schema.fields.len());
for (col_idx, field) in schema.fields.iter().enumerate() {
    let col: Arc<dyn Array> = match field.data_type() {
        DataType::Boolean => make_column!(BooleanBuilder, rows, col_idx),
        DataType::Int16 => make_column!(Int16Builder, rows, col_idx),
        DataType::Int32 => make_column!(Int32Builder, rows, col_idx),
        DataType::Int64 => {
            // `Intn` wraps the raw i64, hence the explicit `.0` unwrap
            // instead of `make_column!`.
            let mut arr = Int64Builder::with_capacity(rows.len());
            for row in rows.iter() {
                let val: Option<Intn> = row.try_get(col_idx)?;
                arr.append_option(val.map(|v| v.0));
            }
            Arc::new(arr.finish())
        }
        DataType::Float32 => make_column!(Float32Builder, rows, col_idx),
        DataType::Float64 => make_column!(Float64Builder, rows, col_idx),
        DataType::Utf8 => {
            // Assumes an average of 16 bytes per item.
            let mut arr = StringBuilder::with_capacity(rows.len(), rows.len() * 16);
            for row in rows.iter() {
                let val: Option<&str> = row.try_get(col_idx)?;
                arr.append_option(val);
            }
            Arc::new(arr.finish())
        }
        DataType::Binary => {
            // Assumes an average of 16 bytes per item.
            let mut arr = BinaryBuilder::with_capacity(rows.len(), rows.len() * 16);
            for row in rows.iter() {
                let val: Option<&[u8]> = row.try_get(col_idx)?;
                arr.append_option(val);
            }
            Arc::new(arr.finish())
        }
        DataType::Timestamp(TimeUnit::Nanosecond, None) => {
            let mut arr = TimestampNanosecondBuilder::with_capacity(rows.len());
            for row in rows.iter() {
                let val: Option<NaiveDateTime> = row.try_get(col_idx)?;
                // Panics for timestamps outside the i64-nanosecond range
                // (roughly years 1677 to 2262).
                let val = val.map(|v| v.timestamp_nanos_opt().unwrap());
                arr.append_option(val);
            }
            Arc::new(arr.finish())
        }

        // TODO: All the others...
        // Tiberius mapping: <https://docs.rs/tiberius/latest/tiberius/trait.FromSql.html>
        other => {
            return Err(SqlServerError::String(format!(
                "unsupported data type for sql server: {other}"
            )))
        }
    };
    columns.push(col);
}
Contributor:
It looks like we could easily parallelize this using rayon's par_iter. (idk if tokio has an equivalent)
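
A rough sketch of that suggestion, assuming rayon is available; Row, Array, and Int64Array here are simplified stand-ins for the real row and Arrow array types, and the fallible try_get conversion is omitted:

use rayon::prelude::*;
use std::sync::Arc;

// Hypothetical stand-ins for the real row and Arrow array types.
struct Row(Vec<i64>);
trait Array: Send + Sync {}
struct Int64Array(Vec<i64>);
impl Array for Int64Array {}

fn build_columns(rows: &[Row], num_cols: usize) -> Vec<Arc<dyn Array>> {
    // Columns are independent of each other, so the per-column loops can
    // run in parallel; par_iter schedules them on rayon's global pool.
    (0..num_cols)
        .into_par_iter()
        .map(|col_idx| {
            let vals: Vec<i64> = rows.iter().map(|r| r.0[col_idx]).collect();
            Arc::new(Int64Array(vals)) as Arc<dyn Array>
        })
        .collect()
}

fn main() {
    let rows = vec![Row(vec![1, 10]), Row(vec![2, 20])];
    let cols = build_columns(&rows, 2);
    assert_eq!(cols.len(), 2);
}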

scsmithr (Member, Author):

Possibly, though it would require us to manage a thread pool. We'd also probably want higher-level parallelization by working with multiple partitions.

Contributor:

I would think we'd want both. Parallelizing across partitions helps with larger datasets that span several partitions and lends itself well to distributed execution. But without parallelizing the lower-level work, we'll likely hit slower compute once we distribute the partitions, since each partition would still process its columns sequentially.

This is definitely not blocking PR approval, just something to think about. (It looks like our other SQL readers follow the same pattern anyway.)

Int32Builder, Int64Builder, StringBuilder, TimestampNanosecondBuilder,
};

let rows = rows.into_iter().collect::<Result<Vec<_>>>()?;
Contributor:

Doesn't this cause us to iterate over the rows twice? Once here, and once again when constructing the columns.

scsmithr (Member, Author):

Yes, I figured this would make it easier to auto-vectorize, but I haven't actually measured or verified that.
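
A toy illustration of the tradeoff being discussed, with plain Vec<i64> columns standing in for the Arrow builders; this is a sketch, not the PR's code:

// Hypothetical Row type; Vec<i64> columns stand in for Arrow builders.
struct Row(Vec<i64>);

// Two-pass (as in the PR): materialize all rows first, then run one tight
// loop per column. Each loop writes to a single output buffer, which is
// the access pattern friendliest to auto-vectorization.
fn two_pass(rows: &[Row], num_cols: usize) -> Vec<Vec<i64>> {
    (0..num_cols)
        .map(|c| rows.iter().map(|r| r.0[c]).collect())
        .collect()
}

// One-pass alternative: a single iteration over rows, but each row appends
// to every column buffer, interleaving writes across all columns.
fn one_pass(rows: impl IntoIterator<Item = Row>, num_cols: usize) -> Vec<Vec<i64>> {
    let mut cols = vec![Vec::new(); num_cols];
    for row in rows {
        for (c, v) in row.0.into_iter().enumerate() {
            cols[c].push(v);
        }
    }
    cols
}

fn main() {
    let rows = vec![Row(vec![1, 10]), Row(vec![2, 20])];
    assert_eq!(two_pass(&rows, 2), one_pass(rows, 2));
}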

    _overwrite: bool,
) -> DatafusionResult<Arc<dyn ExecutionPlan>> {
    Err(DataFusionError::Execution(
        "inserts not supported for SQL Server".to_string(),
Contributor:

Suggested change:
-        "inserts not supported for SQL Server".to_string(),
+        "inserts not yet supported for SQL Server".to_string(),

Review comment on lines 49–50 (Cargo.toml):
tokio-util = { version = "*" }
tokio = { version = "1.34.0", features = ["full"] }
Contributor:

feels like this wants to be workspace synced?
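
A sketch of what workspace syncing could look like, assuming the repo pins shared versions in the root manifest; the tokio-util version here is illustrative:

# Root Cargo.toml (assumed): declare shared versions once.
[workspace.dependencies]
tokio = { version = "1.34.0", features = ["full"] }
tokio-util = "0.7"

# Crate Cargo.toml: inherit from the workspace instead of pinning "*".
[dependencies]
tokio = { workspace = true }
tokio-util = { workspace = true }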

pub struct QueryStream {
    receiver: mpsc::Receiver<Result<QueryItem>>,
    metadata: Option<ResultMetadata>,
    buffered_rows: VecDeque<Row>,
Contributor:

Are we unbounded in the mpsc stream and in this buffer?

scsmithr (Member, Author):

We're unbounded in the mpsc from Client to Connection, but bounded in the mpsc from Connection to QueryStream (the results). buffered_rows is unbounded, but in practice holds 0 rows (no allocations) or, rarely, 1.

For the unbounded mpsc, a more proper implementation would forward each message to the server as soon as it's received, instead of reading the entire response to one message before moving on to the next. The current implementation is still correct, but it would unexpectedly block if two client messages were in flight at once. Since we only send one message at a time anyway, that's not really an issue for us right now.
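
A minimal sketch of the channel topology described above, using tokio's mpsc; the Request and Row types and the task bodies are hypothetical, for illustration only:

use tokio::sync::mpsc;

// Hypothetical message types, for illustration only.
struct Request(&'static str);
struct Row(i64);

#[tokio::main]
async fn main() {
    // Client -> Connection: unbounded, so enqueueing a request never blocks.
    let (req_tx, mut req_rx) = mpsc::unbounded_channel::<Request>();

    // Connection -> QueryStream: bounded, so a slow reader applies
    // backpressure instead of letting result rows pile up in memory.
    let (row_tx, mut row_rx) = mpsc::channel::<Row>(64);

    // "Connection" task: drain requests, stream result rows back.
    tokio::spawn(async move {
        while let Some(Request(_query)) = req_rx.recv().await {
            for i in 0..3 {
                // `send` awaits whenever the bounded channel is full.
                if row_tx.send(Row(i)).await.is_err() {
                    return;
                }
            }
        }
    });

    // "Client" side: issue one request, then consume the stream.
    req_tx.send(Request("SELECT 1")).expect("connection task alive");
    drop(req_tx);
    while let Some(Row(v)) = row_rx.recv().await {
        println!("row: {v}");
    }
}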

scsmithr (Member, Author):

Followup items tracked in #2094

@scsmithr scsmithr merged commit e93bf2b into main Nov 13, 2023
@scsmithr scsmithr deleted the sean/sqlserver branch November 13, 2023 23:42