From 659188ef0c1016099fe9d17898b3b155fb332ec4 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Wed, 16 Feb 2022 17:37:21 +0000 Subject: [PATCH 01/15] Added docker --- .../docker-compose.yml | 21 +++++++++++++ arrow-odbc-integration-testing/dockerfile | 30 +++++++++++++++++++ 2 files changed, 51 insertions(+) create mode 100644 arrow-odbc-integration-testing/docker-compose.yml create mode 100644 arrow-odbc-integration-testing/dockerfile diff --git a/arrow-odbc-integration-testing/docker-compose.yml b/arrow-odbc-integration-testing/docker-compose.yml new file mode 100644 index 00000000000..311fef81cbc --- /dev/null +++ b/arrow-odbc-integration-testing/docker-compose.yml @@ -0,0 +1,21 @@ +services: + + mssql: + image: mcr.microsoft.com/mssql/server:2019-latest + ports: + - 1433:1433 + + environment: + - MSSQL_SA_PASSWORD=My@Test@Password1 + command: ["/opt/mssql/bin/sqlservr", "--accept-eula", "--reset-sa-password"] + + dev: + build: . + volumes: + - .:/workspace:cached + + # Overrides default command so things don't shut down after the process ends. + command: sleep infinity + + # Runs app on the same network as the database container, allows "forwardPorts" in devcontainer.json function. + network_mode: service:mssql diff --git a/arrow-odbc-integration-testing/dockerfile b/arrow-odbc-integration-testing/dockerfile new file mode 100644 index 00000000000..5cca31fb927 --- /dev/null +++ b/arrow-odbc-integration-testing/dockerfile @@ -0,0 +1,30 @@ +# See here for image contents: https://github.com/microsoft/vscode-dev-containers/tree/v0.163.1/containers/debian/.devcontainer/base.Dockerfile + +# [Choice] Debian version: buster, stretch +ARG VARIANT="buster" +FROM mcr.microsoft.com/vscode/devcontainers/base:0-${VARIANT} + +# Install Microsoft ODBC SQL Drivers (msodbcsql17 package) for Debian 10 +# https://docs.microsoft.com/de-de/sql/connect/odbc/linux-mac/installing-the-microsoft-odbc-driver-for-sql-server?view=sql-server-ver15 +RUN curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - +RUN curl https://packages.microsoft.com/config/ubuntu/20/prod.list > /etc/apt/sources.list.d/mssql-release.list +RUN echo msodbcsql17 msodbcsql/ACCEPT_EULA boolean true | debconf-set-selections + +# [Optional] Uncomment this section to install additional packages. +RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \ + && apt-get -y install --no-install-recommends \ + build-essential \ + unixodbc-dev \ + msodbcsql17 + +# Setup cargo and install cargo packages required for tests +USER vscode + +# There is also a rust devcontainer, yet this way we get a toolchain +# which is updatable with rustup. +RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- --default-toolchain stable -y + +# We need 'parquet-read' to run the tests. At the moment I do not +# know a way to specify cargo installable executables as build +# dependencies. 
+RUN /home/vscode/.cargo/bin/cargo install parquet --features cli From 4fbc29fe62e12caace47500da4aeebf87afec7f8 Mon Sep 17 00:00:00 2001 From: Markus Klein Date: Wed, 16 Feb 2022 19:58:37 +0100 Subject: [PATCH 02/15] Poc: fetching string from MSSQL Co-authored-by: Jorge Leitao --- Cargo.toml | 7 +- arrow-odbc-integration-testing/Cargo.toml | 11 ++++ arrow-odbc-integration-testing/dockerfile | 2 +- arrow-odbc-integration-testing/src/lib.rs | 79 +++++++++++++++++++++++ examples/odbc_read.rs | 0 src/io/mod.rs | 4 ++ src/io/odbc.rs | 2 + 7 files changed, 103 insertions(+), 2 deletions(-) create mode 100644 arrow-odbc-integration-testing/Cargo.toml create mode 100644 arrow-odbc-integration-testing/src/lib.rs create mode 100644 examples/odbc_read.rs create mode 100644 src/io/odbc.rs diff --git a/Cargo.toml b/Cargo.toml index d5d9a832d66..0b50ed79a0e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,6 +87,9 @@ strength_reduce = { version = "0.2", optional = true } # For instruction multiversioning multiversion = { version = "0.6.1", optional = true } +# For instruction multiversioning +odbc-api = { version = "0.34", optional = true } + [dev-dependencies] criterion = "0.3" flate2 = "1" @@ -104,8 +107,9 @@ features = ["full"] rustdoc-args = ["--cfg", "docsrs"] [features] -default = [] +default = ["io_odbc"] full = [ + "io_odbc", "io_csv", "io_csv_async", "io_json", @@ -126,6 +130,7 @@ full = [ # parses timezones used in timestamp conversions "chrono-tz", ] +io_odbc = ["odbc-api"] io_csv = ["io_csv_read", "io_csv_write"] io_csv_async = ["io_csv_read_async"] io_csv_read = ["csv", "lexical-core"] diff --git a/arrow-odbc-integration-testing/Cargo.toml b/arrow-odbc-integration-testing/Cargo.toml new file mode 100644 index 00000000000..942fafdd577 --- /dev/null +++ b/arrow-odbc-integration-testing/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "arrow-odbc-integration-testing" +version = "0.1.0" +authors = ["Jorge C. Leitao "] +edition = "2021" + +[dependencies] +arrow2 = { path = "../", default-features = false, features = ["io_odbc"] } +lazy_static = "1.4.0" +# Function name macro is used to ensure unique table names in test +stdext = "0.3.1" \ No newline at end of file diff --git a/arrow-odbc-integration-testing/dockerfile b/arrow-odbc-integration-testing/dockerfile index 5cca31fb927..bc25009fde6 100644 --- a/arrow-odbc-integration-testing/dockerfile +++ b/arrow-odbc-integration-testing/dockerfile @@ -7,7 +7,7 @@ FROM mcr.microsoft.com/vscode/devcontainers/base:0-${VARIANT} # Install Microsoft ODBC SQL Drivers (msodbcsql17 package) for Debian 10 # https://docs.microsoft.com/de-de/sql/connect/odbc/linux-mac/installing-the-microsoft-odbc-driver-for-sql-server?view=sql-server-ver15 RUN curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - -RUN curl https://packages.microsoft.com/config/ubuntu/20/prod.list > /etc/apt/sources.list.d/mssql-release.list +RUN curl https://packages.microsoft.com/config/debian/10/prod.list > /etc/apt/sources.list.d/mssql-release.list RUN echo msodbcsql17 msodbcsql/ACCEPT_EULA boolean true | debconf-set-selections # [Optional] Uncomment this section to install additional packages. 
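The lib.rs test added below drives the odbc-api crate directly. For orientation, here is a minimal sketch of the same fetch loop, recombining only the calls this patch already uses (Environment::new, connect_with_connection_string, execute, buffer_from_description, bind_buffer, fetch, AnyColumnView::Text); the query and the 128-byte string width are placeholder assumptions, not part of the patch:

    use odbc_api::buffers::{buffer_from_description, AnyColumnView, BufferDescription, BufferKind};
    use odbc_api::{Cursor, Environment, Error};

    fn main() -> Result<(), Error> {
        let env = Environment::new()?;
        let conn = env.connect_with_connection_string(
            "Driver={ODBC Driver 17 for SQL Server};Server=localhost;UID=SA;PWD=My@Test@Password1;",
        )?;
        // Placeholder query: any single text column works here.
        if let Some(cursor) = conn.execute("SELECT name FROM sys.tables", ())? {
            // Bind one text column, assuming at most 128 bytes per value.
            let description = BufferDescription {
                kind: BufferKind::Text { max_str_len: 128 },
                nullable: true,
            };
            let buffer = buffer_from_description(100, std::iter::once(description));
            let mut cursor = cursor.bind_buffer(buffer)?;
            // Fetch row groups of up to 100 rows until the result set is exhausted.
            while let Some(batch) = cursor.fetch()? {
                if let AnyColumnView::Text(values) = batch.column(0) {
                    for value in values {
                        // `value` is None for NULL, otherwise the raw bytes of the cell.
                        println!("{:?}", value.map(std::str::from_utf8));
                    }
                }
            }
        }
        Ok(())
    }
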
diff --git a/arrow-odbc-integration-testing/src/lib.rs b/arrow-odbc-integration-testing/src/lib.rs new file mode 100644 index 00000000000..ac2275389e7 --- /dev/null +++ b/arrow-odbc-integration-testing/src/lib.rs @@ -0,0 +1,79 @@ +#![cfg(test)] +use arrow2::io::odbc::api::{Connection, Environment, Error as OdbcError, Cursor, buffers::{buffer_from_description, BufferDescription, BufferKind, AnyColumnView}}; +use lazy_static::lazy_static; +use stdext::function_name; + +lazy_static! { + /// This is an example for using doc comment attributes + pub static ref ENV: Environment = Environment::new().unwrap(); +} + +/// Connection string for our test instance of Microsoft SQL Server +const MSSQL: &str = + "Driver={ODBC Driver 17 for SQL Server};Server=localhost;UID=SA;PWD=My@Test@Password1;"; + +#[test] +fn test() { + // Given a table with a single string + let table_name = function_name!().rsplit_once(':').unwrap().1; + let connection = ENV.connect_with_connection_string(MSSQL).unwrap(); + setup_empty_table(&connection, table_name, &["VARCHAR(50)"]).unwrap(); + connection + .execute( + &format!("INSERT INTO {table_name} (a) VALUES ('Hello, World!')"), + (), + ) + .unwrap(); + + // When + let query = format!("SELECT a FROM {table_name} ORDER BY id"); + let cursor = connection.execute(&query, ()).unwrap().unwrap(); + // This is the maximum number of rows in each batch. + let max_batch_size = 1; + // This is the list of C-Types which will be bound to the result columns of the query. + let buffer_descriptions = vec![BufferDescription { + kind: BufferKind::Text { max_str_len: 50 }, + nullable: true, + }]; + let buffer = buffer_from_description(max_batch_size, buffer_descriptions.into_iter()); + let mut cursor = cursor.bind_buffer(buffer).unwrap(); + let batch = cursor.fetch().unwrap().unwrap(); + let column_buffer = batch.column(0); + let mut text_it = match column_buffer { + AnyColumnView::Text(it) => it, + _ => panic!("Unexpected types") + }; + + // One unwrap, because there is a first element + // Second unwrap, because we know the value not to be null + let bytes = text_it.next().unwrap().unwrap(); + + let value = std::str::from_utf8(bytes).unwrap(); + + assert_eq!(value, "Hello, World!") +} + +/// Creates the table and assures it is empty. Columns are named a,b,c, etc. +pub fn setup_empty_table( + conn: &Connection<'_>, + table_name: &str, + column_types: &[&str], +) -> Result<(), OdbcError> { + let drop_table = &format!("DROP TABLE IF EXISTS {}", table_name); + + let column_names = &["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k"]; + let cols = column_types + .iter() + .zip(column_names) + .map(|(ty, name)| format!("{} {}", name, ty)) + .collect::>() + .join(", "); + + let create_table = format!( + "CREATE TABLE {} (id int IDENTITY(1,1),{});", + table_name, cols + ); + conn.execute(drop_table, ())?; + conn.execute(&create_table, ())?; + Ok(()) +} diff --git a/examples/odbc_read.rs b/examples/odbc_read.rs new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/io/mod.rs b/src/io/mod.rs index 1b5c39ed7c8..9343d4281ce 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -1,6 +1,10 @@ #![forbid(unsafe_code)] //! Contains modules to interface with other formats such as [`csv`], //! [`parquet`], [`json`], [`ipc`], [`mod@print`] and [`avro`]. 
+ +#[cfg(feature = "io_odbc")] +pub mod odbc; + #[cfg(any( feature = "io_csv_read", feature = "io_csv_read_async", diff --git a/src/io/odbc.rs b/src/io/odbc.rs new file mode 100644 index 00000000000..728d4a4bad9 --- /dev/null +++ b/src/io/odbc.rs @@ -0,0 +1,2 @@ +//! API to connect to ODBC +pub use odbc_api as api; From 936dc4855d373810ea125bb5cee4255a542faaac Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Thu, 17 Feb 2022 20:28:43 +0000 Subject: [PATCH 03/15] Added deserializers for basic ODBC types --- Cargo.toml | 5 +- arrow-odbc-integration-testing/src/lib.rs | 162 +++++++++++----------- examples/odbc_read.rs | 0 src/io/odbc.rs | 2 - src/io/odbc/deserialize.rs | 106 ++++++++++++++ src/io/odbc/mod.rs | 37 +++++ src/io/odbc/schema.rs | 80 +++++++++++ 7 files changed, 309 insertions(+), 83 deletions(-) delete mode 100644 examples/odbc_read.rs delete mode 100644 src/io/odbc.rs create mode 100644 src/io/odbc/deserialize.rs create mode 100644 src/io/odbc/mod.rs create mode 100644 src/io/odbc/schema.rs diff --git a/Cargo.toml b/Cargo.toml index 0b50ed79a0e..3ac9b26706e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,7 +88,8 @@ strength_reduce = { version = "0.2", optional = true } multiversion = { version = "0.6.1", optional = true } # For instruction multiversioning -odbc-api = { version = "0.34", optional = true } +#odbc-api = { version = "0.34", optional = true } +odbc-api = { git = "https://github.com/jorgecarleitao/odbc-api", branch= "expose_indicators", optional = true } [dev-dependencies] criterion = "0.3" @@ -107,7 +108,7 @@ features = ["full"] rustdoc-args = ["--cfg", "docsrs"] [features] -default = ["io_odbc"] +default = [] full = [ "io_odbc", "io_csv", diff --git a/arrow-odbc-integration-testing/src/lib.rs b/arrow-odbc-integration-testing/src/lib.rs index ac2275389e7..05014f54e7b 100644 --- a/arrow-odbc-integration-testing/src/lib.rs +++ b/arrow-odbc-integration-testing/src/lib.rs @@ -1,79 +1,83 @@ -#![cfg(test)] -use arrow2::io::odbc::api::{Connection, Environment, Error as OdbcError, Cursor, buffers::{buffer_from_description, BufferDescription, BufferKind, AnyColumnView}}; -use lazy_static::lazy_static; -use stdext::function_name; - -lazy_static! { - /// This is an example for using doc comment attributes - pub static ref ENV: Environment = Environment::new().unwrap(); -} - -/// Connection string for our test instance of Microsoft SQL Server -const MSSQL: &str = - "Driver={ODBC Driver 17 for SQL Server};Server=localhost;UID=SA;PWD=My@Test@Password1;"; - -#[test] -fn test() { - // Given a table with a single string - let table_name = function_name!().rsplit_once(':').unwrap().1; - let connection = ENV.connect_with_connection_string(MSSQL).unwrap(); - setup_empty_table(&connection, table_name, &["VARCHAR(50)"]).unwrap(); - connection - .execute( - &format!("INSERT INTO {table_name} (a) VALUES ('Hello, World!')"), - (), - ) - .unwrap(); - - // When - let query = format!("SELECT a FROM {table_name} ORDER BY id"); - let cursor = connection.execute(&query, ()).unwrap().unwrap(); - // This is the maximum number of rows in each batch. - let max_batch_size = 1; - // This is the list of C-Types which will be bound to the result columns of the query. 
- let buffer_descriptions = vec![BufferDescription { - kind: BufferKind::Text { max_str_len: 50 }, - nullable: true, - }]; - let buffer = buffer_from_description(max_batch_size, buffer_descriptions.into_iter()); - let mut cursor = cursor.bind_buffer(buffer).unwrap(); - let batch = cursor.fetch().unwrap().unwrap(); - let column_buffer = batch.column(0); - let mut text_it = match column_buffer { - AnyColumnView::Text(it) => it, - _ => panic!("Unexpected types") - }; - - // One unwrap, because there is a first element - // Second unwrap, because we know the value not to be null - let bytes = text_it.next().unwrap().unwrap(); - - let value = std::str::from_utf8(bytes).unwrap(); - - assert_eq!(value, "Hello, World!") -} - -/// Creates the table and assures it is empty. Columns are named a,b,c, etc. -pub fn setup_empty_table( - conn: &Connection<'_>, - table_name: &str, - column_types: &[&str], -) -> Result<(), OdbcError> { - let drop_table = &format!("DROP TABLE IF EXISTS {}", table_name); - - let column_names = &["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k"]; - let cols = column_types - .iter() - .zip(column_names) - .map(|(ty, name)| format!("{} {}", name, ty)) - .collect::>() - .join(", "); - - let create_table = format!( - "CREATE TABLE {} (id int IDENTITY(1,1),{});", - table_name, cols - ); - conn.execute(drop_table, ())?; - conn.execute(&create_table, ())?; - Ok(()) -} +#![cfg(test)] +use arrow2::array::Int32Array; +use arrow2::chunk::Chunk; +use arrow2::error::Result; +use arrow2::io::odbc::api::{Connection, Cursor, Environment, Error as OdbcError}; +use arrow2::io::odbc::{buffer_from_metadata, deserialize, infer_schema}; +use lazy_static::lazy_static; +use stdext::function_name; + +lazy_static! { + /// This is an example for using doc comment attributes + pub static ref ENV: Environment = Environment::new().unwrap(); +} + +/// Connection string for our test instance of Microsoft SQL Server +const MSSQL: &str = + "Driver={ODBC Driver 17 for SQL Server};Server=localhost;UID=SA;PWD=My@Test@Password1;"; + +#[test] +fn test() -> Result<()> { + // Given a table with a single string + let table_name = function_name!().rsplit_once(':').unwrap().1; + let connection = ENV.connect_with_connection_string(MSSQL).unwrap(); + setup_empty_table(&connection, table_name, &["INT"]).unwrap(); + connection + .execute(&format!("INSERT INTO {table_name} (a) VALUES (1)"), ()) + .unwrap(); + + // When + let query = format!("SELECT a FROM {table_name} ORDER BY id"); + let mut a = connection.prepare(&query).unwrap(); + let fields = infer_schema(&a)?; + + let max_batch_size = 100; + let buffer = buffer_from_metadata(&a, max_batch_size).unwrap(); + + let cursor = a.execute(()).unwrap().unwrap(); + let mut cursor = cursor.bind_buffer(buffer).unwrap(); + + let mut chunks = vec![]; + while let Some(batch) = cursor.fetch().unwrap() { + let arrays = (0..batch.num_cols()) + .zip(fields.iter()) + .map(|(index, field)| { + let column_view = batch.column(index); + deserialize(column_view, field.data_type.clone()) + }) + .collect::>(); + chunks.push(Chunk::new(arrays)); + } + + assert_eq!( + chunks, + vec![Chunk::new(vec![Box::new(Int32Array::from_slice([1])) as _])] + ); + + Ok(()) +} + +/// Creates the table and assures it is empty. Columns are named a,b,c, etc. 
+pub fn setup_empty_table( + conn: &Connection<'_>, + table_name: &str, + column_types: &[&str], +) -> std::result::Result<(), OdbcError> { + let drop_table = &format!("DROP TABLE IF EXISTS {}", table_name); + + let column_names = &["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k"]; + let cols = column_types + .iter() + .zip(column_names) + .map(|(ty, name)| format!("{} {}", name, ty)) + .collect::>() + .join(", "); + + let create_table = format!( + "CREATE TABLE {} (id int IDENTITY(1,1),{});", + table_name, cols + ); + conn.execute(drop_table, ())?; + conn.execute(&create_table, ())?; + Ok(()) +} diff --git a/examples/odbc_read.rs b/examples/odbc_read.rs deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/src/io/odbc.rs b/src/io/odbc.rs deleted file mode 100644 index 728d4a4bad9..00000000000 --- a/src/io/odbc.rs +++ /dev/null @@ -1,2 +0,0 @@ -//! API to connect to ODBC -pub use odbc_api as api; diff --git a/src/io/odbc/deserialize.rs b/src/io/odbc/deserialize.rs new file mode 100644 index 00000000000..21312236c24 --- /dev/null +++ b/src/io/odbc/deserialize.rs @@ -0,0 +1,106 @@ +use odbc_api::Bit; + +use crate::array::{Array, BooleanArray, PrimitiveArray}; +use crate::bitmap::{Bitmap, MutableBitmap}; +use crate::datatypes::DataType; +use crate::types::NativeType; + +use super::api::buffers::AnyColumnView; + +/// Deserializes a [`AnyColumnView`] into an array of [`DataType`]. +/// This is Pure CPU +pub fn deserialize(column: AnyColumnView, data_type: DataType) -> Box { + match column { + AnyColumnView::Text(_) => todo!(), + AnyColumnView::WText(_) => todo!(), + AnyColumnView::Binary(_) => todo!(), + AnyColumnView::Date(_) => todo!(), + AnyColumnView::Time(_) => todo!(), + AnyColumnView::Timestamp(_) => todo!(), + AnyColumnView::F64(values) => Box::new(deserialize_p(data_type, values)) as _, + AnyColumnView::F32(values) => Box::new(deserialize_p(data_type, values)) as _, + AnyColumnView::I8(values) => Box::new(deserialize_p(data_type, values)) as _, + AnyColumnView::I16(values) => Box::new(deserialize_p(data_type, values)) as _, + AnyColumnView::I32(values) => Box::new(deserialize_p(data_type, values)) as _, + AnyColumnView::I64(values) => Box::new(deserialize_p(data_type, values)) as _, + AnyColumnView::U8(values) => Box::new(deserialize_p(data_type, values)) as _, + AnyColumnView::Bit(values) => Box::new(deserialize_bool(data_type, values)) as _, + AnyColumnView::NullableDate(_) => todo!(), + AnyColumnView::NullableTime(_) => todo!(), + AnyColumnView::NullableTimestamp(_) => todo!(), + AnyColumnView::NullableF64(slice) => Box::new(deserialize_p_optional( + data_type, + slice.values(), + slice.indicators(), + )) as _, + AnyColumnView::NullableF32(slice) => Box::new(deserialize_p_optional( + data_type, + slice.values(), + slice.indicators(), + )) as _, + AnyColumnView::NullableI8(slice) => Box::new(deserialize_p_optional( + data_type, + slice.values(), + slice.indicators(), + )) as _, + AnyColumnView::NullableI16(slice) => Box::new(deserialize_p_optional( + data_type, + slice.values(), + slice.indicators(), + )) as _, + AnyColumnView::NullableI32(slice) => Box::new(deserialize_p_optional( + data_type, + slice.values(), + slice.indicators(), + )) as _, + AnyColumnView::NullableI64(slice) => Box::new(deserialize_p_optional( + data_type, + slice.values(), + slice.indicators(), + )) as _, + AnyColumnView::NullableU8(slice) => Box::new(deserialize_p_optional( + data_type, + slice.values(), + slice.indicators(), + )) as _, + AnyColumnView::NullableBit(slice) => 
Box::new(deserialize_bool_optional( + data_type, + slice.values(), + slice.indicators(), + )) as _, + } +} + +fn deserialize_bitmap(values: &[isize]) -> Option { + MutableBitmap::from_trusted_len_iter(values.iter().map(|x| *x != -1)).into() +} + +fn deserialize_p(data_type: DataType, values: &[T]) -> PrimitiveArray { + PrimitiveArray::from_data(data_type, values.to_vec().into(), None) +} + +fn deserialize_p_optional( + data_type: DataType, + values: &[T], + indicators: &[isize], +) -> PrimitiveArray { + let validity = deserialize_bitmap(indicators); + PrimitiveArray::from_data(data_type, values.to_vec().into(), validity) +} + +fn deserialize_bool(data_type: DataType, values: &[Bit]) -> BooleanArray { + let values = values.iter().map(|x| x.as_bool()); + let values = Bitmap::from_trusted_len_iter(values); + BooleanArray::from_data(data_type, values, None) +} + +fn deserialize_bool_optional( + data_type: DataType, + values: &[Bit], + indicators: &[isize], +) -> BooleanArray { + let validity = deserialize_bitmap(indicators); + let values = values.iter().map(|x| x.as_bool()); + let values = Bitmap::from_trusted_len_iter(values); + BooleanArray::from_data(data_type, values, validity) +} diff --git a/src/io/odbc/mod.rs b/src/io/odbc/mod.rs new file mode 100644 index 00000000000..e931f157aaf --- /dev/null +++ b/src/io/odbc/mod.rs @@ -0,0 +1,37 @@ +//! API to connect to ODBC +pub use odbc_api as api; + +mod deserialize; +mod schema; + +pub use deserialize::deserialize; +pub use schema::infer_schema; + +/// Creates a [`api::buffers::ColumnarBuffer`] from the metadata. +/// # Errors +/// If the driver provides incorrect an incorrect [`ResultSetMetadata`] +pub fn buffer_from_metadata( + resut_set_metadata: &impl api::ResultSetMetadata, + max_batch_size: usize, +) -> std::result::Result, api::Error> { + let num_cols: u16 = resut_set_metadata.num_result_cols()? 
as u16; + + let descs = (0..num_cols) + .map(|index| { + let mut column_description = api::ColumnDescription::default(); + + resut_set_metadata.describe_col(index + 1, &mut column_description)?; + + Ok(api::buffers::BufferDescription { + nullable: column_description.could_be_nullable(), + kind: api::buffers::BufferKind::from_data_type(column_description.data_type) + .unwrap(), + }) + }) + .collect::, api::Error>>()?; + + Ok(api::buffers::buffer_from_description( + max_batch_size, + descs.into_iter(), + )) +} diff --git a/src/io/odbc/schema.rs b/src/io/odbc/schema.rs new file mode 100644 index 00000000000..5259bda1e8c --- /dev/null +++ b/src/io/odbc/schema.rs @@ -0,0 +1,80 @@ +use super::api; +use super::api::ResultSetMetadata; + +use crate::datatypes::{DataType, Field, TimeUnit}; +use crate::error::Result; + +/// Infers the Arrow [Field]s from a [`ResultSetMetadata`] +pub fn infer_schema(resut_set_metadata: &impl ResultSetMetadata) -> Result> { + let num_cols: u16 = resut_set_metadata.num_result_cols().unwrap() as u16; + + let fields = (0..num_cols) + .map(|index| { + let mut column_description = api::ColumnDescription::default(); + resut_set_metadata + .describe_col(index + 1, &mut column_description) + .unwrap(); + + column_to_field(&column_description) + }) + .collect(); + Ok(fields) +} + +fn column_to_field(column_description: &api::ColumnDescription) -> Field { + Field::new( + &column_description + .name_to_string() + .expect("Column name must be representable in utf8"), + column_to_data_type(&column_description.data_type), + column_description.could_be_nullable(), + ) +} + +fn column_to_data_type(data_type: &api::DataType) -> DataType { + use api::DataType as OdbcDataType; + match data_type { + OdbcDataType::Numeric { + precision: p @ 0..=38, + scale, + } + | OdbcDataType::Decimal { + precision: p @ 0..=38, + scale, + } => DataType::Decimal(*p, (*scale) as usize), + OdbcDataType::Integer => DataType::Int32, + OdbcDataType::SmallInt => DataType::Int16, + OdbcDataType::Real | OdbcDataType::Float { precision: 0..=24 } => DataType::Float32, + OdbcDataType::Float { precision: _ } | OdbcDataType::Double => DataType::Float64, + OdbcDataType::Date => DataType::Date32, + OdbcDataType::Timestamp { precision: 0 } => DataType::Timestamp(TimeUnit::Second, None), + OdbcDataType::Timestamp { precision: 1..=3 } => { + DataType::Timestamp(TimeUnit::Millisecond, None) + } + OdbcDataType::Timestamp { precision: 4..=6 } => { + DataType::Timestamp(TimeUnit::Microsecond, None) + } + OdbcDataType::Timestamp { precision: _ } => DataType::Timestamp(TimeUnit::Nanosecond, None), + OdbcDataType::BigInt => DataType::Int64, + OdbcDataType::TinyInt => DataType::Int8, + OdbcDataType::Bit => DataType::Boolean, + OdbcDataType::Binary { length } => DataType::FixedSizeBinary(*length), + OdbcDataType::LongVarbinary { length: _ } | OdbcDataType::Varbinary { length: _ } => { + DataType::Binary + } + OdbcDataType::Unknown + | OdbcDataType::Time { precision: _ } + | OdbcDataType::Numeric { .. } + | OdbcDataType::Decimal { .. } + | OdbcDataType::Other { + data_type: _, + column_size: _, + decimal_digits: _, + } + | OdbcDataType::WChar { length: _ } + | OdbcDataType::Char { length: _ } + | OdbcDataType::WVarchar { length: _ } + | OdbcDataType::LongVarchar { length: _ } + | OdbcDataType::Varchar { length: _ } => DataType::Utf8, + } +} From 85202eb0f24915efd5498bf6e47d8737439cb8f2 Mon Sep 17 00:00:00 2001 From: "Jorge C. 
Leitao" Date: Thu, 17 Feb 2022 21:39:37 +0000 Subject: [PATCH 04/15] Added CI --- .github/workflows/integration-odbc.yml | 40 ++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 .github/workflows/integration-odbc.yml diff --git a/.github/workflows/integration-odbc.yml b/.github/workflows/integration-odbc.yml new file mode 100644 index 00000000000..2ee3bf096f2 --- /dev/null +++ b/.github/workflows/integration-odbc.yml @@ -0,0 +1,40 @@ +name: Integration ODBC + +on: [push, pull_request] + +env: + CARGO_TERM_COLOR: always + +jobs: + linux: + name: Test + runs-on: ubuntu-latest + + services: + sqlserver: + image: mcr.microsoft.com/mssql/server:2017-latest-ubuntu + ports: + - 1433:1433 + env: + ACCEPT_EULA: Y + SA_PASSWORD: My@Test@Password1 + + steps: + - name: Checkout + uses: actions/checkout@v2 + - name: Install ODBC Drivers + run: | + curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - + curl https://packages.microsoft.com/config/ubuntu/20.04/prod.list > /etc/apt/sources.list.d/mssql-release.list + apt-get update + ACCEPT_EULA=Y apt-get install -y msodbcsql17 + ln -s /opt/microsoft/msodbcsql17/lib64/libmsodbcsql-17.*.so.* /opt/microsoft/msodbcsql17/lib64/libmsodbcsql-17.so + shell: sudo bash {0} + - name: Setup Rust toolchain + run: | + rustup toolchain install stable + rustup default stable + rustup component add rustfmt clippy + - uses: Swatinem/rust-cache@v1 + - name: Test + run: cd arrow-odbc-integration-testing && cargo test From 3b021863ab5a1a2569ebd505fc2c25a80d81fedb Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Thu, 17 Feb 2022 21:44:16 +0000 Subject: [PATCH 05/15] Tweaks to documentation --- Cargo.toml | 2 +- src/io/odbc/deserialize.rs | 2 +- src/io/odbc/mod.rs | 2 +- src/io/odbc/schema.rs | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3ac9b26706e..a8afde46f51 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,7 +87,7 @@ strength_reduce = { version = "0.2", optional = true } # For instruction multiversioning multiversion = { version = "0.6.1", optional = true } -# For instruction multiversioning +# For support for odbc #odbc-api = { version = "0.34", optional = true } odbc-api = { git = "https://github.com/jorgecarleitao/odbc-api", branch= "expose_indicators", optional = true } diff --git a/src/io/odbc/deserialize.rs b/src/io/odbc/deserialize.rs index 21312236c24..cc0298e9e9e 100644 --- a/src/io/odbc/deserialize.rs +++ b/src/io/odbc/deserialize.rs @@ -8,7 +8,7 @@ use crate::types::NativeType; use super::api::buffers::AnyColumnView; /// Deserializes a [`AnyColumnView`] into an array of [`DataType`]. -/// This is Pure CPU +/// This is CPU-bounded pub fn deserialize(column: AnyColumnView, data_type: DataType) -> Box { match column { AnyColumnView::Text(_) => todo!(), diff --git a/src/io/odbc/mod.rs b/src/io/odbc/mod.rs index e931f157aaf..e1cf2376067 100644 --- a/src/io/odbc/mod.rs +++ b/src/io/odbc/mod.rs @@ -9,7 +9,7 @@ pub use schema::infer_schema; /// Creates a [`api::buffers::ColumnarBuffer`] from the metadata. 
/// # Errors -/// If the driver provides incorrect an incorrect [`ResultSetMetadata`] +/// Iff the driver provides an incorrect [`ResultSetMetadata`] pub fn buffer_from_metadata( resut_set_metadata: &impl api::ResultSetMetadata, max_batch_size: usize, diff --git a/src/io/odbc/schema.rs b/src/io/odbc/schema.rs index 5259bda1e8c..e0f538e4abc 100644 --- a/src/io/odbc/schema.rs +++ b/src/io/odbc/schema.rs @@ -4,7 +4,7 @@ use super::api::ResultSetMetadata; use crate::datatypes::{DataType, Field, TimeUnit}; use crate::error::Result; -/// Infers the Arrow [Field]s from a [`ResultSetMetadata`] +/// Infers the Arrow [`Field`]s from a [`ResultSetMetadata`] pub fn infer_schema(resut_set_metadata: &impl ResultSetMetadata) -> Result> { let num_cols: u16 = resut_set_metadata.num_result_cols().unwrap() as u16; From 93b053fdbfc37cbcb31b79e63e53215747005a3f Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Thu, 17 Feb 2022 22:43:25 +0000 Subject: [PATCH 06/15] Added tests for remaining cases and Binary/Text support --- arrow-odbc-integration-testing/src/lib.rs | 113 +++++++++++++-- src/io/odbc/deserialize.rs | 169 ++++++++++++++-------- 2 files changed, 208 insertions(+), 74 deletions(-) diff --git a/arrow-odbc-integration-testing/src/lib.rs b/arrow-odbc-integration-testing/src/lib.rs index 05014f54e7b..373211850c6 100644 --- a/arrow-odbc-integration-testing/src/lib.rs +++ b/arrow-odbc-integration-testing/src/lib.rs @@ -1,6 +1,7 @@ #![cfg(test)] -use arrow2::array::Int32Array; +use arrow2::array::{Array, BinaryArray, BooleanArray, Int32Array, Utf8Array}; use arrow2::chunk::Chunk; +use arrow2::datatypes::Field; use arrow2::error::Result; use arrow2::io::odbc::api::{Connection, Cursor, Environment, Error as OdbcError}; use arrow2::io::odbc::{buffer_from_metadata, deserialize, infer_schema}; @@ -17,18 +18,111 @@ const MSSQL: &str = "Driver={ODBC Driver 17 for SQL Server};Server=localhost;UID=SA;PWD=My@Test@Password1;"; #[test] -fn test() -> Result<()> { - // Given a table with a single string +fn int() -> Result<()> { let table_name = function_name!().rsplit_once(':').unwrap().1; + let expected = vec![Chunk::new(vec![Box::new(Int32Array::from_slice([1])) as _])]; + + test(expected, "INT", "(1)", table_name) +} + +#[test] +fn int_nullable() -> Result<()> { + let table_name = function_name!().rsplit_once(':').unwrap().1; + let expected = vec![Chunk::new(vec![ + Box::new(Int32Array::from([Some(1), None])) as _, + ])]; + + test(expected, "INT", "(1),(NULL)", table_name) +} + +#[test] +fn bool() -> Result<()> { + let table_name = function_name!().rsplit_once(':').unwrap().1; + let expected = vec![Chunk::new(vec![ + Box::new(BooleanArray::from_slice([true])) as _ + ])]; + + test(expected, "BIT", "(1)", table_name) +} + +#[test] +fn bool_nullable() -> Result<()> { + let table_name = function_name!().rsplit_once(':').unwrap().1; + let expected = vec![Chunk::new(vec![ + Box::new(BooleanArray::from([Some(true), None])) as _, + ])]; + + test(expected, "BIT", "(1),(NULL)", table_name) +} + +#[test] +fn binary() -> Result<()> { + let table_name = function_name!().rsplit_once(':').unwrap().1; + let expected = vec![Chunk::new(vec![ + Box::new(BinaryArray::::from([Some(b"ab")])) as _, + ])]; + + test( + expected, + "VARBINARY(2)", + "(CAST('ab' AS VARBINARY(2)))", + table_name, + ) +} + +#[test] +fn binary_nullable() -> Result<()> { + let table_name = function_name!().rsplit_once(':').unwrap().1; + let expected = + vec![Chunk::new(vec![ + Box::new(BinaryArray::::from([Some(b"ab"), None, Some(b"ac")])) as _, + ])]; + + 
test( + expected, + "VARBINARY(2)", + "(CAST('ab' AS VARBINARY(2))),(NULL),(CAST('ac' AS VARBINARY(2)))", + table_name, + ) +} + +#[test] +fn utf8_nullable() -> Result<()> { + let table_name = function_name!().rsplit_once(':').unwrap().1; + let expected = + vec![Chunk::new(vec![ + Box::new(Utf8Array::::from([Some("ab"), None, Some("ac")])) as _, + ])]; + + test(expected, "VARCHAR(2)", "('ab'),(NULL),('ac')", table_name) +} + +fn test( + expected: Vec>>, + type_: &str, + insert: &str, + table_name: &str, +) -> Result<()> { let connection = ENV.connect_with_connection_string(MSSQL).unwrap(); - setup_empty_table(&connection, table_name, &["INT"]).unwrap(); + setup_empty_table(&connection, table_name, &[type_]).unwrap(); connection - .execute(&format!("INSERT INTO {table_name} (a) VALUES (1)"), ()) + .execute(&format!("INSERT INTO {table_name} (a) VALUES {insert}"), ()) .unwrap(); // When let query = format!("SELECT a FROM {table_name} ORDER BY id"); - let mut a = connection.prepare(&query).unwrap(); + + let chunks = read(&connection, &query)?.1; + + assert_eq!(chunks, expected); + Ok(()) +} + +fn read( + connection: &Connection<'_>, + query: &str, +) -> Result<(Vec, Vec>>)> { + let mut a = connection.prepare(query).unwrap(); let fields = infer_schema(&a)?; let max_batch_size = 100; @@ -49,12 +143,7 @@ fn test() -> Result<()> { chunks.push(Chunk::new(arrays)); } - assert_eq!( - chunks, - vec![Chunk::new(vec![Box::new(Int32Array::from_slice([1])) as _])] - ); - - Ok(()) + Ok((fields, chunks)) } /// Creates the table and assures it is empty. Columns are named a,b,c, etc. diff --git a/src/io/odbc/deserialize.rs b/src/io/odbc/deserialize.rs index cc0298e9e9e..6a886d76f3f 100644 --- a/src/io/odbc/deserialize.rs +++ b/src/io/odbc/deserialize.rs @@ -1,7 +1,8 @@ use odbc_api::Bit; -use crate::array::{Array, BooleanArray, PrimitiveArray}; +use crate::array::{Array, BinaryArray, BooleanArray, PrimitiveArray, Utf8Array}; use crate::bitmap::{Bitmap, MutableBitmap}; +use crate::buffer::Buffer; use crate::datatypes::DataType; use crate::types::NativeType; @@ -11,96 +12,140 @@ use super::api::buffers::AnyColumnView; /// This is CPU-bounded pub fn deserialize(column: AnyColumnView, data_type: DataType) -> Box { match column { - AnyColumnView::Text(_) => todo!(), + AnyColumnView::Text(slice) => Box::new(utf8( + data_type, + slice.values(), + slice.lengths(), + slice.max_len(), + )) as _, AnyColumnView::WText(_) => todo!(), - AnyColumnView::Binary(_) => todo!(), + AnyColumnView::Binary(slice) => Box::new(binary( + data_type, + slice.values(), + slice.lengths(), + slice.max_len(), + )) as _, AnyColumnView::Date(_) => todo!(), AnyColumnView::Time(_) => todo!(), AnyColumnView::Timestamp(_) => todo!(), - AnyColumnView::F64(values) => Box::new(deserialize_p(data_type, values)) as _, - AnyColumnView::F32(values) => Box::new(deserialize_p(data_type, values)) as _, - AnyColumnView::I8(values) => Box::new(deserialize_p(data_type, values)) as _, - AnyColumnView::I16(values) => Box::new(deserialize_p(data_type, values)) as _, - AnyColumnView::I32(values) => Box::new(deserialize_p(data_type, values)) as _, - AnyColumnView::I64(values) => Box::new(deserialize_p(data_type, values)) as _, - AnyColumnView::U8(values) => Box::new(deserialize_p(data_type, values)) as _, - AnyColumnView::Bit(values) => Box::new(deserialize_bool(data_type, values)) as _, + AnyColumnView::F64(values) => Box::new(p(data_type, values)) as _, + AnyColumnView::F32(values) => Box::new(p(data_type, values)) as _, + AnyColumnView::I8(values) => 
Box::new(p(data_type, values)) as _, + AnyColumnView::I16(values) => Box::new(p(data_type, values)) as _, + AnyColumnView::I32(values) => Box::new(p(data_type, values)) as _, + AnyColumnView::I64(values) => Box::new(p(data_type, values)) as _, + AnyColumnView::U8(values) => Box::new(p(data_type, values)) as _, + AnyColumnView::Bit(values) => Box::new(bool(data_type, values)) as _, AnyColumnView::NullableDate(_) => todo!(), AnyColumnView::NullableTime(_) => todo!(), AnyColumnView::NullableTimestamp(_) => todo!(), - AnyColumnView::NullableF64(slice) => Box::new(deserialize_p_optional( - data_type, - slice.values(), - slice.indicators(), - )) as _, - AnyColumnView::NullableF32(slice) => Box::new(deserialize_p_optional( - data_type, - slice.values(), - slice.indicators(), - )) as _, - AnyColumnView::NullableI8(slice) => Box::new(deserialize_p_optional( - data_type, - slice.values(), - slice.indicators(), - )) as _, - AnyColumnView::NullableI16(slice) => Box::new(deserialize_p_optional( - data_type, - slice.values(), - slice.indicators(), - )) as _, - AnyColumnView::NullableI32(slice) => Box::new(deserialize_p_optional( - data_type, - slice.values(), - slice.indicators(), - )) as _, - AnyColumnView::NullableI64(slice) => Box::new(deserialize_p_optional( - data_type, - slice.values(), - slice.indicators(), - )) as _, - AnyColumnView::NullableU8(slice) => Box::new(deserialize_p_optional( - data_type, - slice.values(), - slice.indicators(), - )) as _, - AnyColumnView::NullableBit(slice) => Box::new(deserialize_bool_optional( - data_type, - slice.values(), - slice.indicators(), - )) as _, + AnyColumnView::NullableF64(slice) => { + Box::new(p_optional(data_type, slice.values(), slice.indicators())) as _ + } + AnyColumnView::NullableF32(slice) => { + Box::new(p_optional(data_type, slice.values(), slice.indicators())) as _ + } + AnyColumnView::NullableI8(slice) => { + Box::new(p_optional(data_type, slice.values(), slice.indicators())) as _ + } + AnyColumnView::NullableI16(slice) => { + Box::new(p_optional(data_type, slice.values(), slice.indicators())) as _ + } + AnyColumnView::NullableI32(slice) => { + Box::new(p_optional(data_type, slice.values(), slice.indicators())) as _ + } + AnyColumnView::NullableI64(slice) => { + Box::new(p_optional(data_type, slice.values(), slice.indicators())) as _ + } + AnyColumnView::NullableU8(slice) => { + Box::new(p_optional(data_type, slice.values(), slice.indicators())) as _ + } + AnyColumnView::NullableBit(slice) => { + Box::new(bool_optional(data_type, slice.values(), slice.indicators())) as _ + } } } -fn deserialize_bitmap(values: &[isize]) -> Option { +fn bitmap(values: &[isize]) -> Option { MutableBitmap::from_trusted_len_iter(values.iter().map(|x| *x != -1)).into() } -fn deserialize_p(data_type: DataType, values: &[T]) -> PrimitiveArray { +fn p(data_type: DataType, values: &[T]) -> PrimitiveArray { PrimitiveArray::from_data(data_type, values.to_vec().into(), None) } -fn deserialize_p_optional( +fn p_optional( data_type: DataType, values: &[T], indicators: &[isize], ) -> PrimitiveArray { - let validity = deserialize_bitmap(indicators); + let validity = bitmap(indicators); PrimitiveArray::from_data(data_type, values.to_vec().into(), validity) } -fn deserialize_bool(data_type: DataType, values: &[Bit]) -> BooleanArray { +fn bool(data_type: DataType, values: &[Bit]) -> BooleanArray { let values = values.iter().map(|x| x.as_bool()); let values = Bitmap::from_trusted_len_iter(values); BooleanArray::from_data(data_type, values, None) } -fn deserialize_bool_optional( - 
data_type: DataType, - values: &[Bit], - indicators: &[isize], -) -> BooleanArray { - let validity = deserialize_bitmap(indicators); +fn bool_optional(data_type: DataType, values: &[Bit], indicators: &[isize]) -> BooleanArray { + let validity = bitmap(indicators); let values = values.iter().map(|x| x.as_bool()); let values = Bitmap::from_trusted_len_iter(values); BooleanArray::from_data(data_type, values, validity) } + +fn binary_generic( + slice: &[u8], + lengths: &[isize], + max_length: usize, + null_terminator: usize, +) -> (Buffer, Buffer, Option) { + let mut validity = MutableBitmap::with_capacity(lengths.len()); + + println!("{:?}", lengths); + println!("{:?}", slice); + let mut offsets = Vec::with_capacity(lengths.len() + 1); + offsets.push(0i32); + let mut length = 0; + offsets.extend(lengths.iter().map(|&indicator| { + validity.push(indicator != -1); + length += if indicator > 0 { indicator as i32 } else { 0 }; + length + })); + // the loop above ensures monotonicity + // this proves boundness + assert!((length as usize) < slice.len()); + + let mut values = Vec::::with_capacity(length as usize); + offsets.windows(2).enumerate().for_each(|(index, x)| { + let len = (x[1] - x[0]) as usize; + let offset = index * (max_length + null_terminator); + // this bound check is not necessary + values.extend_from_slice(&slice[offset..offset + len]) + }); + + // this O(N) check is not necessary + + (offsets.into(), values.into(), validity.into()) +} + +fn binary( + data_type: DataType, + slice: &[u8], + lengths: &[isize], + max_length: usize, +) -> BinaryArray { + let (offsets, values, validity) = binary_generic(slice, lengths, max_length, 0); + + // this O(N) check is not necessary + BinaryArray::from_data(data_type, offsets, values, validity) +} + +fn utf8(data_type: DataType, slice: &[u8], lengths: &[isize], max_length: usize) -> Utf8Array { + let (offsets, values, validity) = binary_generic(slice, lengths, max_length, 1); + + // this O(N) check is necessary for the utf8 validity + Utf8Array::from_data(data_type, offsets, values, validity) +} From 1b158c313f1ca6b393a2dd053a6d3623922e2813 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Thu, 17 Feb 2022 22:44:24 +0000 Subject: [PATCH 07/15] Cleanup --- arrow-odbc-integration-testing/Cargo.toml | 2 +- .../docker-compose.yml | 11 --- arrow-odbc-integration-testing/dockerfile | 30 -------- src/io/odbc/deserialize.rs | 76 +++++++++++-------- 4 files changed, 45 insertions(+), 74 deletions(-) delete mode 100644 arrow-odbc-integration-testing/dockerfile diff --git a/arrow-odbc-integration-testing/Cargo.toml b/arrow-odbc-integration-testing/Cargo.toml index 942fafdd577..add16ba5d2d 100644 --- a/arrow-odbc-integration-testing/Cargo.toml +++ b/arrow-odbc-integration-testing/Cargo.toml @@ -8,4 +8,4 @@ edition = "2021" arrow2 = { path = "../", default-features = false, features = ["io_odbc"] } lazy_static = "1.4.0" # Function name macro is used to ensure unique table names in test -stdext = "0.3.1" \ No newline at end of file +stdext = "0.3.1" diff --git a/arrow-odbc-integration-testing/docker-compose.yml b/arrow-odbc-integration-testing/docker-compose.yml index 311fef81cbc..9c344136652 100644 --- a/arrow-odbc-integration-testing/docker-compose.yml +++ b/arrow-odbc-integration-testing/docker-compose.yml @@ -8,14 +8,3 @@ services: environment: - MSSQL_SA_PASSWORD=My@Test@Password1 command: ["/opt/mssql/bin/sqlservr", "--accept-eula", "--reset-sa-password"] - - dev: - build: . 
- volumes: - - .:/workspace:cached - - # Overrides default command so things don't shut down after the process ends. - command: sleep infinity - - # Runs app on the same network as the database container, allows "forwardPorts" in devcontainer.json function. - network_mode: service:mssql diff --git a/arrow-odbc-integration-testing/dockerfile b/arrow-odbc-integration-testing/dockerfile deleted file mode 100644 index bc25009fde6..00000000000 --- a/arrow-odbc-integration-testing/dockerfile +++ /dev/null @@ -1,30 +0,0 @@ -# See here for image contents: https://github.com/microsoft/vscode-dev-containers/tree/v0.163.1/containers/debian/.devcontainer/base.Dockerfile - -# [Choice] Debian version: buster, stretch -ARG VARIANT="buster" -FROM mcr.microsoft.com/vscode/devcontainers/base:0-${VARIANT} - -# Install Microsoft ODBC SQL Drivers (msodbcsql17 package) for Debian 10 -# https://docs.microsoft.com/de-de/sql/connect/odbc/linux-mac/installing-the-microsoft-odbc-driver-for-sql-server?view=sql-server-ver15 -RUN curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - -RUN curl https://packages.microsoft.com/config/debian/10/prod.list > /etc/apt/sources.list.d/mssql-release.list -RUN echo msodbcsql17 msodbcsql/ACCEPT_EULA boolean true | debconf-set-selections - -# [Optional] Uncomment this section to install additional packages. -RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \ - && apt-get -y install --no-install-recommends \ - build-essential \ - unixodbc-dev \ - msodbcsql17 - -# Setup cargo and install cargo packages required for tests -USER vscode - -# There is also a rust devcontainer, yet this way we get a toolchain -# which is updatable with rustup. -RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- --default-toolchain stable -y - -# We need 'parquet-read' to run the tests. At the moment I do not -# know a way to specify cargo installable executables as build -# dependencies. 
-RUN /home/vscode/.cargo/bin/cargo install parquet --features cli diff --git a/src/io/odbc/deserialize.rs b/src/io/odbc/deserialize.rs index 6a886d76f3f..a811412b2b3 100644 --- a/src/io/odbc/deserialize.rs +++ b/src/io/odbc/deserialize.rs @@ -28,38 +28,52 @@ pub fn deserialize(column: AnyColumnView, data_type: DataType) -> Box AnyColumnView::Date(_) => todo!(), AnyColumnView::Time(_) => todo!(), AnyColumnView::Timestamp(_) => todo!(), - AnyColumnView::F64(values) => Box::new(p(data_type, values)) as _, - AnyColumnView::F32(values) => Box::new(p(data_type, values)) as _, - AnyColumnView::I8(values) => Box::new(p(data_type, values)) as _, - AnyColumnView::I16(values) => Box::new(p(data_type, values)) as _, - AnyColumnView::I32(values) => Box::new(p(data_type, values)) as _, - AnyColumnView::I64(values) => Box::new(p(data_type, values)) as _, - AnyColumnView::U8(values) => Box::new(p(data_type, values)) as _, + AnyColumnView::F64(values) => Box::new(primitive(data_type, values)) as _, + AnyColumnView::F32(values) => Box::new(primitive(data_type, values)) as _, + AnyColumnView::I8(values) => Box::new(primitive(data_type, values)) as _, + AnyColumnView::I16(values) => Box::new(primitive(data_type, values)) as _, + AnyColumnView::I32(values) => Box::new(primitive(data_type, values)) as _, + AnyColumnView::I64(values) => Box::new(primitive(data_type, values)) as _, + AnyColumnView::U8(values) => Box::new(primitive(data_type, values)) as _, AnyColumnView::Bit(values) => Box::new(bool(data_type, values)) as _, AnyColumnView::NullableDate(_) => todo!(), AnyColumnView::NullableTime(_) => todo!(), AnyColumnView::NullableTimestamp(_) => todo!(), - AnyColumnView::NullableF64(slice) => { - Box::new(p_optional(data_type, slice.values(), slice.indicators())) as _ - } - AnyColumnView::NullableF32(slice) => { - Box::new(p_optional(data_type, slice.values(), slice.indicators())) as _ - } - AnyColumnView::NullableI8(slice) => { - Box::new(p_optional(data_type, slice.values(), slice.indicators())) as _ - } - AnyColumnView::NullableI16(slice) => { - Box::new(p_optional(data_type, slice.values(), slice.indicators())) as _ - } - AnyColumnView::NullableI32(slice) => { - Box::new(p_optional(data_type, slice.values(), slice.indicators())) as _ - } - AnyColumnView::NullableI64(slice) => { - Box::new(p_optional(data_type, slice.values(), slice.indicators())) as _ - } - AnyColumnView::NullableU8(slice) => { - Box::new(p_optional(data_type, slice.values(), slice.indicators())) as _ - } + AnyColumnView::NullableF64(slice) => Box::new(primitive_optional( + data_type, + slice.values(), + slice.indicators(), + )) as _, + AnyColumnView::NullableF32(slice) => Box::new(primitive_optional( + data_type, + slice.values(), + slice.indicators(), + )) as _, + AnyColumnView::NullableI8(slice) => Box::new(primitive_optional( + data_type, + slice.values(), + slice.indicators(), + )) as _, + AnyColumnView::NullableI16(slice) => Box::new(primitive_optional( + data_type, + slice.values(), + slice.indicators(), + )) as _, + AnyColumnView::NullableI32(slice) => Box::new(primitive_optional( + data_type, + slice.values(), + slice.indicators(), + )) as _, + AnyColumnView::NullableI64(slice) => Box::new(primitive_optional( + data_type, + slice.values(), + slice.indicators(), + )) as _, + AnyColumnView::NullableU8(slice) => Box::new(primitive_optional( + data_type, + slice.values(), + slice.indicators(), + )) as _, AnyColumnView::NullableBit(slice) => { Box::new(bool_optional(data_type, slice.values(), slice.indicators())) as _ } @@ -70,11 +84,11 
@@ fn bitmap(values: &[isize]) -> Option { MutableBitmap::from_trusted_len_iter(values.iter().map(|x| *x != -1)).into() } -fn p(data_type: DataType, values: &[T]) -> PrimitiveArray { +fn primitive(data_type: DataType, values: &[T]) -> PrimitiveArray { PrimitiveArray::from_data(data_type, values.to_vec().into(), None) } -fn p_optional( +fn primitive_optional( data_type: DataType, values: &[T], indicators: &[isize], @@ -126,8 +140,6 @@ fn binary_generic( values.extend_from_slice(&slice[offset..offset + len]) }); - // this O(N) check is not necessary - (offsets.into(), values.into(), validity.into()) } From c6fe2a7fee43d923b28d62b85f5ef0d5f6c10453 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Fri, 18 Feb 2022 05:12:56 +0000 Subject: [PATCH 08/15] Moved functions --- arrow-odbc-integration-testing/src/lib.rs | 140 +-------------------- arrow-odbc-integration-testing/src/read.rs | 139 ++++++++++++++++++++ 2 files changed, 143 insertions(+), 136 deletions(-) create mode 100644 arrow-odbc-integration-testing/src/read.rs diff --git a/arrow-odbc-integration-testing/src/lib.rs b/arrow-odbc-integration-testing/src/lib.rs index 373211850c6..124c57c3620 100644 --- a/arrow-odbc-integration-testing/src/lib.rs +++ b/arrow-odbc-integration-testing/src/lib.rs @@ -1,12 +1,9 @@ #![cfg(test)] -use arrow2::array::{Array, BinaryArray, BooleanArray, Int32Array, Utf8Array}; -use arrow2::chunk::Chunk; -use arrow2::datatypes::Field; -use arrow2::error::Result; -use arrow2::io::odbc::api::{Connection, Cursor, Environment, Error as OdbcError}; -use arrow2::io::odbc::{buffer_from_metadata, deserialize, infer_schema}; + +mod read; + +use arrow2::io::odbc::api::{Connection, Environment, Error as OdbcError}; use lazy_static::lazy_static; -use stdext::function_name; lazy_static! { /// This is an example for using doc comment attributes @@ -17,135 +14,6 @@ lazy_static! 
{ const MSSQL: &str = "Driver={ODBC Driver 17 for SQL Server};Server=localhost;UID=SA;PWD=My@Test@Password1;"; -#[test] -fn int() -> Result<()> { - let table_name = function_name!().rsplit_once(':').unwrap().1; - let expected = vec![Chunk::new(vec![Box::new(Int32Array::from_slice([1])) as _])]; - - test(expected, "INT", "(1)", table_name) -} - -#[test] -fn int_nullable() -> Result<()> { - let table_name = function_name!().rsplit_once(':').unwrap().1; - let expected = vec![Chunk::new(vec![ - Box::new(Int32Array::from([Some(1), None])) as _, - ])]; - - test(expected, "INT", "(1),(NULL)", table_name) -} - -#[test] -fn bool() -> Result<()> { - let table_name = function_name!().rsplit_once(':').unwrap().1; - let expected = vec![Chunk::new(vec![ - Box::new(BooleanArray::from_slice([true])) as _ - ])]; - - test(expected, "BIT", "(1)", table_name) -} - -#[test] -fn bool_nullable() -> Result<()> { - let table_name = function_name!().rsplit_once(':').unwrap().1; - let expected = vec![Chunk::new(vec![ - Box::new(BooleanArray::from([Some(true), None])) as _, - ])]; - - test(expected, "BIT", "(1),(NULL)", table_name) -} - -#[test] -fn binary() -> Result<()> { - let table_name = function_name!().rsplit_once(':').unwrap().1; - let expected = vec![Chunk::new(vec![ - Box::new(BinaryArray::::from([Some(b"ab")])) as _, - ])]; - - test( - expected, - "VARBINARY(2)", - "(CAST('ab' AS VARBINARY(2)))", - table_name, - ) -} - -#[test] -fn binary_nullable() -> Result<()> { - let table_name = function_name!().rsplit_once(':').unwrap().1; - let expected = - vec![Chunk::new(vec![ - Box::new(BinaryArray::::from([Some(b"ab"), None, Some(b"ac")])) as _, - ])]; - - test( - expected, - "VARBINARY(2)", - "(CAST('ab' AS VARBINARY(2))),(NULL),(CAST('ac' AS VARBINARY(2)))", - table_name, - ) -} - -#[test] -fn utf8_nullable() -> Result<()> { - let table_name = function_name!().rsplit_once(':').unwrap().1; - let expected = - vec![Chunk::new(vec![ - Box::new(Utf8Array::::from([Some("ab"), None, Some("ac")])) as _, - ])]; - - test(expected, "VARCHAR(2)", "('ab'),(NULL),('ac')", table_name) -} - -fn test( - expected: Vec>>, - type_: &str, - insert: &str, - table_name: &str, -) -> Result<()> { - let connection = ENV.connect_with_connection_string(MSSQL).unwrap(); - setup_empty_table(&connection, table_name, &[type_]).unwrap(); - connection - .execute(&format!("INSERT INTO {table_name} (a) VALUES {insert}"), ()) - .unwrap(); - - // When - let query = format!("SELECT a FROM {table_name} ORDER BY id"); - - let chunks = read(&connection, &query)?.1; - - assert_eq!(chunks, expected); - Ok(()) -} - -fn read( - connection: &Connection<'_>, - query: &str, -) -> Result<(Vec, Vec>>)> { - let mut a = connection.prepare(query).unwrap(); - let fields = infer_schema(&a)?; - - let max_batch_size = 100; - let buffer = buffer_from_metadata(&a, max_batch_size).unwrap(); - - let cursor = a.execute(()).unwrap().unwrap(); - let mut cursor = cursor.bind_buffer(buffer).unwrap(); - - let mut chunks = vec![]; - while let Some(batch) = cursor.fetch().unwrap() { - let arrays = (0..batch.num_cols()) - .zip(fields.iter()) - .map(|(index, field)| { - let column_view = batch.column(index); - deserialize(column_view, field.data_type.clone()) - }) - .collect::>(); - chunks.push(Chunk::new(arrays)); - } - - Ok((fields, chunks)) -} - /// Creates the table and assures it is empty. Columns are named a,b,c, etc. 
pub fn setup_empty_table( conn: &Connection<'_>, diff --git a/arrow-odbc-integration-testing/src/read.rs b/arrow-odbc-integration-testing/src/read.rs new file mode 100644 index 00000000000..2f68b29d483 --- /dev/null +++ b/arrow-odbc-integration-testing/src/read.rs @@ -0,0 +1,139 @@ +use stdext::function_name; + +use arrow2::array::{Array, BinaryArray, BooleanArray, Int32Array, Utf8Array}; +use arrow2::chunk::Chunk; +use arrow2::datatypes::Field; +use arrow2::error::Result; +use arrow2::io::odbc::api::{Connection, Cursor}; +use arrow2::io::odbc::{buffer_from_metadata, deserialize, infer_schema}; + +use super::{setup_empty_table, ENV, MSSQL}; + +#[test] +fn int() -> Result<()> { + let table_name = function_name!().rsplit_once(':').unwrap().1; + let expected = vec![Chunk::new(vec![Box::new(Int32Array::from_slice([1])) as _])]; + + test(expected, "INT", "(1)", table_name) +} + +#[test] +fn int_nullable() -> Result<()> { + let table_name = function_name!().rsplit_once(':').unwrap().1; + let expected = vec![Chunk::new(vec![ + Box::new(Int32Array::from([Some(1), None])) as _, + ])]; + + test(expected, "INT", "(1),(NULL)", table_name) +} + +#[test] +fn bool() -> Result<()> { + let table_name = function_name!().rsplit_once(':').unwrap().1; + let expected = vec![Chunk::new(vec![ + Box::new(BooleanArray::from_slice([true])) as _ + ])]; + + test(expected, "BIT", "(1)", table_name) +} + +#[test] +fn bool_nullable() -> Result<()> { + let table_name = function_name!().rsplit_once(':').unwrap().1; + let expected = vec![Chunk::new(vec![ + Box::new(BooleanArray::from([Some(true), None])) as _, + ])]; + + test(expected, "BIT", "(1),(NULL)", table_name) +} + +#[test] +fn binary() -> Result<()> { + let table_name = function_name!().rsplit_once(':').unwrap().1; + let expected = vec![Chunk::new(vec![ + Box::new(BinaryArray::::from([Some(b"ab")])) as _, + ])]; + + test( + expected, + "VARBINARY(2)", + "(CAST('ab' AS VARBINARY(2)))", + table_name, + ) +} + +#[test] +fn binary_nullable() -> Result<()> { + let table_name = function_name!().rsplit_once(':').unwrap().1; + let expected = + vec![Chunk::new(vec![ + Box::new(BinaryArray::::from([Some(b"ab"), None, Some(b"ac")])) as _, + ])]; + + test( + expected, + "VARBINARY(2)", + "(CAST('ab' AS VARBINARY(2))),(NULL),(CAST('ac' AS VARBINARY(2)))", + table_name, + ) +} + +#[test] +fn utf8_nullable() -> Result<()> { + let table_name = function_name!().rsplit_once(':').unwrap().1; + let expected = + vec![Chunk::new(vec![ + Box::new(Utf8Array::::from([Some("ab"), None, Some("ac")])) as _, + ])]; + + test(expected, "VARCHAR(2)", "('ab'),(NULL),('ac')", table_name) +} + +fn test( + expected: Vec>>, + type_: &str, + insert: &str, + table_name: &str, +) -> Result<()> { + let connection = ENV.connect_with_connection_string(MSSQL).unwrap(); + setup_empty_table(&connection, table_name, &[type_]).unwrap(); + connection + .execute(&format!("INSERT INTO {table_name} (a) VALUES {insert}"), ()) + .unwrap(); + + // When + let query = format!("SELECT a FROM {table_name} ORDER BY id"); + + let chunks = read(&connection, &query)?.1; + + assert_eq!(chunks, expected); + Ok(()) +} + +fn read( + connection: &Connection<'_>, + query: &str, +) -> Result<(Vec, Vec>>)> { + let mut a = connection.prepare(query).unwrap(); + let fields = infer_schema(&a)?; + + let max_batch_size = 100; + let buffer = buffer_from_metadata(&a, max_batch_size).unwrap(); + + let cursor = a.execute(()).unwrap().unwrap(); + let mut cursor = cursor.bind_buffer(buffer).unwrap(); + + let mut chunks = vec![]; + while let 
Some(batch) = cursor.fetch().unwrap() { + let arrays = (0..batch.num_cols()) + .zip(fields.iter()) + .map(|(index, field)| { + let column_view = batch.column(index); + deserialize(column_view, field.data_type.clone()) + }) + .collect::>(); + chunks.push(Chunk::new(arrays)); + } + + Ok((fields, chunks)) +} From cded1e94d2800b94ef84c0b7cc854324bdaefddf Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Fri, 18 Feb 2022 06:26:01 +0000 Subject: [PATCH 09/15] Moved files --- arrow-odbc-integration-testing/src/lib.rs | 1 + arrow-odbc-integration-testing/src/read.rs | 4 +-- src/io/odbc/mod.rs | 38 ++-------------------- src/io/odbc/{ => read}/deserialize.rs | 2 +- src/io/odbc/read/mod.rs | 37 +++++++++++++++++++++ src/io/odbc/{ => read}/schema.rs | 8 ++--- 6 files changed, 48 insertions(+), 42 deletions(-) rename src/io/odbc/{ => read}/deserialize.rs (99%) create mode 100644 src/io/odbc/read/mod.rs rename src/io/odbc/{ => read}/schema.rs (96%) diff --git a/arrow-odbc-integration-testing/src/lib.rs b/arrow-odbc-integration-testing/src/lib.rs index 124c57c3620..cec97c6631a 100644 --- a/arrow-odbc-integration-testing/src/lib.rs +++ b/arrow-odbc-integration-testing/src/lib.rs @@ -1,6 +1,7 @@ #![cfg(test)] mod read; +//mod write; use arrow2::io::odbc::api::{Connection, Environment, Error as OdbcError}; use lazy_static::lazy_static; diff --git a/arrow-odbc-integration-testing/src/read.rs b/arrow-odbc-integration-testing/src/read.rs index 2f68b29d483..145ec852dcf 100644 --- a/arrow-odbc-integration-testing/src/read.rs +++ b/arrow-odbc-integration-testing/src/read.rs @@ -5,7 +5,7 @@ use arrow2::chunk::Chunk; use arrow2::datatypes::Field; use arrow2::error::Result; use arrow2::io::odbc::api::{Connection, Cursor}; -use arrow2::io::odbc::{buffer_from_metadata, deserialize, infer_schema}; +use arrow2::io::odbc::read::{buffer_from_metadata, deserialize, infer_schema}; use super::{setup_empty_table, ENV, MSSQL}; @@ -110,7 +110,7 @@ fn test( Ok(()) } -fn read( +pub fn read( connection: &Connection<'_>, query: &str, ) -> Result<(Vec, Vec>>)> { diff --git a/src/io/odbc/mod.rs b/src/io/odbc/mod.rs index e1cf2376067..3a93c9544ff 100644 --- a/src/io/odbc/mod.rs +++ b/src/io/odbc/mod.rs @@ -1,37 +1,5 @@ -//! API to connect to ODBC +//! API to serialize and deserialize data from and to ODBC pub use odbc_api as api; -mod deserialize; -mod schema; - -pub use deserialize::deserialize; -pub use schema::infer_schema; - -/// Creates a [`api::buffers::ColumnarBuffer`] from the metadata. -/// # Errors -/// Iff the driver provides an incorrect [`ResultSetMetadata`] -pub fn buffer_from_metadata( - resut_set_metadata: &impl api::ResultSetMetadata, - max_batch_size: usize, -) -> std::result::Result, api::Error> { - let num_cols: u16 = resut_set_metadata.num_result_cols()? 
---
 arrow-odbc-integration-testing/src/lib.rs  |  1 +
 arrow-odbc-integration-testing/src/read.rs |  4 +--
 src/io/odbc/mod.rs                         | 38 ++--------------------
 src/io/odbc/{ => read}/deserialize.rs      |  2 +-
 src/io/odbc/read/mod.rs                    | 37 +++++++++++++++++++++
 src/io/odbc/{ => read}/schema.rs           |  8 ++---
 6 files changed, 48 insertions(+), 42 deletions(-)
 rename src/io/odbc/{ => read}/deserialize.rs (99%)
 create mode 100644 src/io/odbc/read/mod.rs
 rename src/io/odbc/{ => read}/schema.rs (96%)

diff --git a/arrow-odbc-integration-testing/src/lib.rs b/arrow-odbc-integration-testing/src/lib.rs
index 124c57c3620..cec97c6631a 100644
--- a/arrow-odbc-integration-testing/src/lib.rs
+++ b/arrow-odbc-integration-testing/src/lib.rs
@@ -1,6 +1,7 @@
 #![cfg(test)]
 
 mod read;
+//mod write;
 
 use arrow2::io::odbc::api::{Connection, Environment, Error as OdbcError};
 use lazy_static::lazy_static;
diff --git a/arrow-odbc-integration-testing/src/read.rs b/arrow-odbc-integration-testing/src/read.rs
index 2f68b29d483..145ec852dcf 100644
--- a/arrow-odbc-integration-testing/src/read.rs
+++ b/arrow-odbc-integration-testing/src/read.rs
@@ -5,7 +5,7 @@ use arrow2::chunk::Chunk;
 use arrow2::datatypes::Field;
 use arrow2::error::Result;
 use arrow2::io::odbc::api::{Connection, Cursor};
-use arrow2::io::odbc::{buffer_from_metadata, deserialize, infer_schema};
+use arrow2::io::odbc::read::{buffer_from_metadata, deserialize, infer_schema};
 
 use super::{setup_empty_table, ENV, MSSQL};
 
@@ -110,7 +110,7 @@ fn test(
     Ok(())
 }
 
-fn read(
+pub fn read(
     connection: &Connection<'_>,
     query: &str,
 ) -> Result<(Vec<Field>, Vec<Chunk<Box<dyn Array>>>)> {
diff --git a/src/io/odbc/mod.rs b/src/io/odbc/mod.rs
index e1cf2376067..3a93c9544ff 100644
--- a/src/io/odbc/mod.rs
+++ b/src/io/odbc/mod.rs
@@ -1,37 +1,5 @@
-//! API to connect to ODBC
+//! API to serialize and deserialize data from and to ODBC
 pub use odbc_api as api;
 
-mod deserialize;
-mod schema;
-
-pub use deserialize::deserialize;
-pub use schema::infer_schema;
-
-/// Creates a [`api::buffers::ColumnarBuffer`] from the metadata.
-/// # Errors
-/// Iff the driver provides an incorrect [`ResultSetMetadata`]
-pub fn buffer_from_metadata(
-    resut_set_metadata: &impl api::ResultSetMetadata,
-    max_batch_size: usize,
-) -> std::result::Result<api::buffers::ColumnarBuffer<api::buffers::AnyColumnBuffer>, api::Error> {
-    let num_cols: u16 = resut_set_metadata.num_result_cols()? as u16;
-
-    let descs = (0..num_cols)
-        .map(|index| {
-            let mut column_description = api::ColumnDescription::default();
-
-            resut_set_metadata.describe_col(index + 1, &mut column_description)?;
-
-            Ok(api::buffers::BufferDescription {
-                nullable: column_description.could_be_nullable(),
-                kind: api::buffers::BufferKind::from_data_type(column_description.data_type)
-                    .unwrap(),
-            })
-        })
-        .collect::<std::result::Result<Vec<_>, api::Error>>()?;
-
-    Ok(api::buffers::buffer_from_description(
-        max_batch_size,
-        descs.into_iter(),
-    ))
-}
+pub mod read;
+//pub mod write;
diff --git a/src/io/odbc/deserialize.rs b/src/io/odbc/read/deserialize.rs
similarity index 99%
rename from src/io/odbc/deserialize.rs
rename to src/io/odbc/read/deserialize.rs
index a811412b2b3..74db813d2c5 100644
--- a/src/io/odbc/deserialize.rs
+++ b/src/io/odbc/read/deserialize.rs
@@ -6,7 +6,7 @@ use crate::buffer::Buffer;
 use crate::datatypes::DataType;
 use crate::types::NativeType;
 
-use super::api::buffers::AnyColumnView;
+use super::super::api::buffers::AnyColumnView;
 
 /// Deserializes a [`AnyColumnView`] into an array of [`DataType`].
 /// This is CPU-bounded
diff --git a/src/io/odbc/read/mod.rs b/src/io/odbc/read/mod.rs
new file mode 100644
index 00000000000..b4077332ff3
--- /dev/null
+++ b/src/io/odbc/read/mod.rs
@@ -0,0 +1,37 @@
+//! APIs to read from ODBC
+mod deserialize;
+mod schema;
+
+pub use deserialize::deserialize;
+pub use schema::infer_schema;
+
+use super::api;
+
+/// Creates a [`api::buffers::ColumnarBuffer`] from the metadata.
+/// # Errors
+/// Iff the driver provides an incorrect [`ResultSetMetadata`]
+pub fn buffer_from_metadata(
+    resut_set_metadata: &impl api::ResultSetMetadata,
+    max_batch_size: usize,
+) -> std::result::Result<api::buffers::ColumnarBuffer<api::buffers::AnyColumnBuffer>, api::Error> {
+    let num_cols: u16 = resut_set_metadata.num_result_cols()? as u16;
+
+    let descs = (0..num_cols)
+        .map(|index| {
+            let mut column_description = api::ColumnDescription::default();
+
+            resut_set_metadata.describe_col(index + 1, &mut column_description)?;
+
+            Ok(api::buffers::BufferDescription {
+                nullable: column_description.could_be_nullable(),
+                kind: api::buffers::BufferKind::from_data_type(column_description.data_type)
+                    .unwrap(),
+            })
+        })
+        .collect::<std::result::Result<Vec<_>, api::Error>>()?;
+
+    Ok(api::buffers::buffer_from_description(
+        max_batch_size,
+        descs.into_iter(),
+    ))
+}
diff --git a/src/io/odbc/schema.rs b/src/io/odbc/read/schema.rs
similarity index 96%
rename from src/io/odbc/schema.rs
rename to src/io/odbc/read/schema.rs
index e0f538e4abc..c8fc82c02c7 100644
--- a/src/io/odbc/schema.rs
+++ b/src/io/odbc/read/schema.rs
@@ -1,8 +1,8 @@
-use super::api;
-use super::api::ResultSetMetadata;
-
 use crate::datatypes::{DataType, Field, TimeUnit};
-use crate::error::Result;
+use crate::error::{ArrowError, Result};
+
+use super::super::api;
+use super::super::api::ResultSetMetadata;
 
 /// Infers the Arrow [`Field`]s from a [`ResultSetMetadata`]
 pub fn infer_schema(resut_set_metadata: &impl ResultSetMetadata) -> Result<Vec<Field>> {

From 2d59f9de30f7ccb0abff67fa9aac6e26f6a49c92 Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Fri, 18 Feb 2022 15:37:20 +0000
Subject: [PATCH 10/15] Added support to write to ODBC
---
 arrow-odbc-integration-testing/src/lib.rs   |   2 +-
 arrow-odbc-integration-testing/src/write.rs |  99 +++++++++++++++
 src/io/odbc/mod.rs                          |   2 +-
 src/io/odbc/read/schema.rs                  |   2 +-
 src/io/odbc/write/mod.rs                    |  24 ++++
 src/io/odbc/write/schema.rs                 |  36 ++++++
 src/io/odbc/write/serialize.rs              | 126 ++++++++++++++++++++
 7 files changed, 288 insertions(+), 3 deletions(-)
 create mode 100644 arrow-odbc-integration-testing/src/write.rs
 create mode 100644 src/io/odbc/write/mod.rs
 create mode 100644 src/io/odbc/write/schema.rs
 create mode 100644 src/io/odbc/write/serialize.rs

diff --git a/arrow-odbc-integration-testing/src/lib.rs b/arrow-odbc-integration-testing/src/lib.rs
index cec97c6631a..bfc24d65dc3 100644
--- a/arrow-odbc-integration-testing/src/lib.rs
+++ b/arrow-odbc-integration-testing/src/lib.rs
@@ -1,7 +1,7 @@
 #![cfg(test)]
 
 mod read;
-//mod write;
+mod write;
 
 use arrow2::io::odbc::api::{Connection, Environment, Error as OdbcError};
 use lazy_static::lazy_static;
diff --git a/arrow-odbc-integration-testing/src/write.rs b/arrow-odbc-integration-testing/src/write.rs
new file mode 100644
index 00000000000..ecc91582815
--- /dev/null
+++ b/arrow-odbc-integration-testing/src/write.rs
@@ -0,0 +1,99 @@
+use stdext::function_name;
+
+use arrow2::array::{Array, BinaryArray, BooleanArray, Int32Array, Utf8Array};
+use arrow2::chunk::Chunk;
+use arrow2::datatypes::{DataType, Field};
+use arrow2::error::Result;
+use arrow2::io::odbc::api::{Connection, Cursor};
+use arrow2::io::odbc::write::{buffer_from_description, infer_descriptions, serialize};
+
+use super::read::read;
+use super::{setup_empty_table, ENV, MSSQL};
+
+fn test(
+    expected: Chunk<Box<dyn Array>>,
+    fields: Vec<Field>,
+    type_: &str,
+    table_name: &str,
+) -> Result<()> {
+    let connection = ENV.connect_with_connection_string(MSSQL).unwrap();
+    setup_empty_table(&connection, table_name, &[type_]).unwrap();
+
+    let query = &format!("INSERT INTO {table_name} (a) VALUES (?)");
+    let mut a = connection.prepare(query).unwrap();
+
+    let mut buffer = buffer_from_description(infer_descriptions(&fields)?, expected.len());
+
+    // write
+    buffer.set_num_rows(expected.len());
+    let array = &expected.columns()[0];
+
+    serialize(array.as_ref(), &mut buffer.column_mut(0))?;
+
+    a.execute(&buffer).unwrap();
+
+    // read
+    let query = format!("SELECT a FROM {table_name} ORDER BY id");
+    let chunks = read(&connection, &query)?.1;
+
+    assert_eq!(chunks[0], expected);
+    Ok(())
+}
+
+#[test]
+fn int() -> Result<()> {
+    let table_name = function_name!().rsplit_once(':').unwrap().1;
+    let table_name = format!("write_{}", table_name);
+    let expected = Chunk::new(vec![Box::new(Int32Array::from_slice([1])) as _]);
+
+    test(
+        expected,
+        vec![Field::new("a", DataType::Int32, false)],
+        "INT",
+        &table_name,
+    )
+}
+
+#[test]
+fn int_nullable() -> Result<()> {
+    let table_name = function_name!().rsplit_once(':').unwrap().1;
+    let table_name = format!("write_{}", table_name);
+    let expected = Chunk::new(vec![Box::new(Int32Array::from([Some(1), None])) as _]);
+
+    test(
+        expected,
+        vec![Field::new("a", DataType::Int32, true)],
+        "INT",
+        &table_name,
+    )
+}
+
+#[test]
+fn bool() -> Result<()> {
+    let table_name = function_name!().rsplit_once(':').unwrap().1;
+    let table_name = format!("write_{}", table_name);
+    let expected = Chunk::new(vec![Box::new(BooleanArray::from_slice([true, false])) as _]);
+
+    test(
+        expected,
+        vec![Field::new("a", DataType::Boolean, false)],
+        "BIT",
+        &table_name,
+    )
+}
+
+#[test]
+fn bool_nullable() -> Result<()> {
+    let table_name = function_name!().rsplit_once(':').unwrap().1;
+    let table_name = format!("write_{}", table_name);
+    let expected = Chunk::new(vec![
+        Box::new(BooleanArray::from([Some(true), Some(false), None])) as _,
+    ]);
+
+    test(
+        expected,
+        vec![Field::new("a", DataType::Boolean, true)],
+        "BIT",
+        &table_name,
+    )
+}
diff --git a/src/io/odbc/mod.rs b/src/io/odbc/mod.rs
index 3a93c9544ff..1407cf1e3d9 100644
--- a/src/io/odbc/mod.rs
+++ b/src/io/odbc/mod.rs
@@ -2,4 +2,4 @@
 pub use odbc_api as api;
 
 pub mod read;
-//pub mod write;
+pub mod write;
diff --git a/src/io/odbc/read/schema.rs b/src/io/odbc/read/schema.rs
index c8fc82c02c7..dba4c233738 100644
--- a/src/io/odbc/read/schema.rs
+++ b/src/io/odbc/read/schema.rs
@@ -1,5 +1,5 @@
 use crate::datatypes::{DataType, Field, TimeUnit};
-use crate::error::{ArrowError, Result};
+use crate::error::Result;
 
 use super::super::api;
 use super::super::api::ResultSetMetadata;
diff --git a/src/io/odbc/write/mod.rs b/src/io/odbc/write/mod.rs
new file mode 100644
index 00000000000..7f2e5e4a5c3
--- /dev/null
+++ b/src/io/odbc/write/mod.rs
@@ -0,0 +1,24 @@
+//! APIs to write to ODBC
+mod schema;
+mod serialize;
+
+use super::api;
+pub use schema::infer_descriptions;
+pub use serialize::serialize;
+
+/// Creates a [`api::buffers::ColumnarBuffer`] from [`api::ColumnDescription`]s.
+pub fn buffer_from_description(
+    descriptions: Vec<api::ColumnDescription>,
+    max_batch_size: usize,
+) -> api::buffers::ColumnarBuffer<api::buffers::AnyColumnBuffer> {
+    let descs = descriptions
+        .into_iter()
+        .map(|description| api::buffers::BufferDescription {
+            nullable: description.could_be_nullable(),
+            kind: api::buffers::BufferKind::from_data_type(description.data_type).unwrap(),
+        });
+
+    let mut buffer = api::buffers::buffer_from_description(max_batch_size, descs);
+    buffer.set_num_rows(max_batch_size);
+    buffer
+}
diff --git a/src/io/odbc/write/schema.rs b/src/io/odbc/write/schema.rs
new file mode 100644
index 00000000000..779f9fa1129
--- /dev/null
+++ b/src/io/odbc/write/schema.rs
@@ -0,0 +1,36 @@
+use super::super::api;
+
+use crate::datatypes::{DataType, Field};
+use crate::error::{ArrowError, Result};
+
+/// Infers the [`ColumnDescription`] from the fields
+pub fn infer_descriptions(fields: &[Field]) -> Result<Vec<api::ColumnDescription>> {
+    fields
+        .iter()
+        .map(|field| {
+            let nullability = if field.is_nullable {
+                api::Nullability::Nullable
+            } else {
+                api::Nullability::NoNulls
+            };
+            let data_type = data_type_to(field.data_type())?;
+            Ok(api::ColumnDescription {
+                name: api::U16String::from_str(&field.name).into_vec(),
+                nullability,
+                data_type,
+            })
+        })
+        .collect()
+}
+
+fn data_type_to(data_type: &DataType) -> Result<api::DataType> {
+    Ok(match data_type {
+        DataType::Boolean => api::DataType::Bit,
+        DataType::Int16 => api::DataType::SmallInt,
+        DataType::Int32 => api::DataType::Integer,
+        DataType::Float32 => api::DataType::Float { precision: 24 },
+        DataType::Float64 => api::DataType::Float { precision: 53 },
+        DataType::FixedSizeBinary(length) => api::DataType::Varbinary { length: *length },
+        other => return Err(ArrowError::nyi(format!("{other:?} to ODBC"))),
+    })
+}
diff --git a/src/io/odbc/write/serialize.rs b/src/io/odbc/write/serialize.rs
new file mode 100644
index 00000000000..7655daf3f4a
--- /dev/null
+++ b/src/io/odbc/write/serialize.rs
@@ -0,0 +1,126 @@
+use api::buffers::BinColumnWriter;
+
+use crate::array::{Array, BooleanArray, FixedSizeBinaryArray, PrimitiveArray};
+use crate::bitmap::Bitmap;
+use crate::datatypes::DataType;
+use crate::error::{ArrowError, Result};
+use crate::types::NativeType;
+
+use super::super::api;
+use super::super::api::buffers::NullableSliceMut;
+
+/// Serializes an [`Array`] to [`api::buffers::AnyColumnViewMut`]
+/// This operation is CPU-bounded
+pub fn serialize(array: &dyn Array, column: &mut api::buffers::AnyColumnViewMut) -> Result<()> {
+    match array.data_type() {
+        DataType::Boolean => {
+            if let api::buffers::AnyColumnViewMut::Bit(values) = column {
+                Ok(bool(array.as_any().downcast_ref().unwrap(), values))
+            } else if let api::buffers::AnyColumnViewMut::NullableBit(values) = column {
+                Ok(bool_optional(
+                    array.as_any().downcast_ref().unwrap(),
+                    values,
+                ))
+            } else {
+                Err(ArrowError::nyi("serialize bool to non-bool ODBC"))
+            }
+        }
+        DataType::Int16 => {
+            if let api::buffers::AnyColumnViewMut::I16(values) = column {
+                Ok(primitive(array.as_any().downcast_ref().unwrap(), values))
+            } else if let api::buffers::AnyColumnViewMut::NullableI16(values) = column {
+                Ok(primitive_optional(
+                    array.as_any().downcast_ref().unwrap(),
+                    values,
+                ))
+            } else {
+                Err(ArrowError::nyi("serialize i16 to non-i16 ODBC"))
+            }
+        }
+        DataType::Int32 => {
+            if let api::buffers::AnyColumnViewMut::I32(values) = column {
+                Ok(primitive(array.as_any().downcast_ref().unwrap(), values))
+            } else if let api::buffers::AnyColumnViewMut::NullableI32(values) = column {
+                Ok(primitive_optional(
+                    array.as_any().downcast_ref().unwrap(),
+                    values,
+                ))
+            } else {
+                Err(ArrowError::nyi("serialize i32 to non-i32 ODBC"))
+            }
+        }
+        DataType::Float32 => {
+            if let api::buffers::AnyColumnViewMut::F32(values) = column {
+                Ok(primitive(array.as_any().downcast_ref().unwrap(), values))
+            } else if let api::buffers::AnyColumnViewMut::NullableF32(values) = column {
+                Ok(primitive_optional(
+                    array.as_any().downcast_ref().unwrap(),
+                    values,
+                ))
+            } else {
+                Err(ArrowError::nyi("serialize f32 to non-f32 ODBC"))
+            }
+        }
+        DataType::Float64 => {
+            if let api::buffers::AnyColumnViewMut::F64(values) = column {
+                Ok(primitive(array.as_any().downcast_ref().unwrap(), values))
+            } else if let api::buffers::AnyColumnViewMut::NullableF64(values) = column {
+                Ok(primitive_optional(
+                    array.as_any().downcast_ref().unwrap(),
+                    values,
+                ))
+            } else {
+                Err(ArrowError::nyi("serialize f64 to non-f64 ODBC"))
+            }
+        }
+        DataType::FixedSizeBinary(_) => {
+            if let api::buffers::AnyColumnViewMut::Binary(values) = column {
+                Ok(binary(array.as_any().downcast_ref().unwrap(), values))
+            } else {
+                Err(ArrowError::nyi("serialize f64 to non-f64 ODBC"))
+            }
+        }
+        other => Err(ArrowError::nyi(format!("{other:?} to ODBC"))),
+    }
+}
+
+fn bool(array: &BooleanArray, values: &mut [api::Bit]) {
+    array
+        .values()
+        .iter()
+        .zip(values.iter_mut())
+        .for_each(|(from, to)| *to = api::Bit(from as u8));
+}
+
+fn bool_optional(array: &BooleanArray, values: &mut NullableSliceMut<api::Bit>) {
+    array
+        .values()
+        .iter()
+        .zip(values.values().iter_mut())
+        .for_each(|(from, to)| *to = api::Bit(from as u8));
+    write_validity(array.validity(), values.indicators());
+}
+
+fn primitive<T: NativeType>(array: &PrimitiveArray<T>, values: &mut [T]) {
+    values.copy_from_slice(array.values())
+}
+
+fn write_validity(validity: Option<&Bitmap>, indicators: &mut [isize]) {
+    if let Some(validity) = validity {
+        indicators
+            .iter_mut()
+            .zip(validity.iter())
+            .for_each(|(indicator, is_valid)| *indicator = if is_valid { 0 } else { -1 })
+    } else {
+        indicators.iter_mut().for_each(|x| *x = 0)
+    }
+}
+
+fn primitive_optional<T: NativeType>(array: &PrimitiveArray<T>, values: &mut NullableSliceMut<T>) {
+    values.values().copy_from_slice(array.values());
+    write_validity(array.validity(), values.indicators());
+}
+
+fn binary(array: &FixedSizeBinaryArray, writer: &mut BinColumnWriter) {
+    writer.write(array.iter())
+}

From 98a532e5211aabbfd560611f7e7462e84cd760dc Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Fri, 18 Feb 2022 15:38:21 +0000
Subject: [PATCH 11/15] Updated README
---
 README.md                      |  5 +++-
 src/io/odbc/write/serialize.rs | 48 ++++++++++++++++------------------
 2 files changed, 26 insertions(+), 27 deletions(-)

diff --git a/README.md b/README.md
index e54883f6eed..f7457762491 100644
--- a/README.md
+++ b/README.md
@@ -31,8 +31,9 @@ documentation of each of its APIs.
   * Apache Arrow IPC (all types)
   * Apache Arrow Flight (all types)
   * Apache Parquet (except deep nested types)
-  * Apache Avro (not all types yet)
+  * Apache Avro (all types)
   * NJSON
+  * ODBC (some types)
 * Extensive suite of compute operations
   * aggregations
   * arithmetics
@@ -58,8 +59,10 @@ documentation of each of its APIs.
 This crate uses `unsafe` when strictly necessary:
 * when the compiler can't prove certain invariants and
 * FFI
+
 We have extensive tests over these, all of which run and pass under MIRI.
 Most uses of `unsafe` fall into 3 categories:
+
 * The Arrow format has invariants over utf8 that can't be written in safe Rust
 * `TrustedLen` and trait specialization are still nightly features
 * FFI
diff --git a/src/io/odbc/write/serialize.rs b/src/io/odbc/write/serialize.rs
index 7655daf3f4a..d2e5816a1d0 100644
--- a/src/io/odbc/write/serialize.rs
+++ b/src/io/odbc/write/serialize.rs
@@ -15,67 +15,63 @@ pub fn serialize(array: &dyn Array, column: &mut api::buffers::AnyColumnViewMut)
     match array.data_type() {
         DataType::Boolean => {
             if let api::buffers::AnyColumnViewMut::Bit(values) = column {
-                Ok(bool(array.as_any().downcast_ref().unwrap(), values))
+                bool(array.as_any().downcast_ref().unwrap(), values);
+                Ok(())
             } else if let api::buffers::AnyColumnViewMut::NullableBit(values) = column {
-                Ok(bool_optional(
-                    array.as_any().downcast_ref().unwrap(),
-                    values,
-                ))
+                bool_optional(array.as_any().downcast_ref().unwrap(), values);
+                Ok(())
             } else {
                 Err(ArrowError::nyi("serialize bool to non-bool ODBC"))
             }
         }
         DataType::Int16 => {
             if let api::buffers::AnyColumnViewMut::I16(values) = column {
-                Ok(primitive(array.as_any().downcast_ref().unwrap(), values))
+                primitive(array.as_any().downcast_ref().unwrap(), values);
+                Ok(())
             } else if let api::buffers::AnyColumnViewMut::NullableI16(values) = column {
-                Ok(primitive_optional(
-                    array.as_any().downcast_ref().unwrap(),
-                    values,
-                ))
+                primitive_optional(array.as_any().downcast_ref().unwrap(), values);
+                Ok(())
             } else {
                 Err(ArrowError::nyi("serialize i16 to non-i16 ODBC"))
             }
         }
         DataType::Int32 => {
             if let api::buffers::AnyColumnViewMut::I32(values) = column {
-                Ok(primitive(array.as_any().downcast_ref().unwrap(), values))
+                primitive(array.as_any().downcast_ref().unwrap(), values);
+                Ok(())
             } else if let api::buffers::AnyColumnViewMut::NullableI32(values) = column {
-                Ok(primitive_optional(
-                    array.as_any().downcast_ref().unwrap(),
-                    values,
-                ))
+                primitive_optional(array.as_any().downcast_ref().unwrap(), values);
+                Ok(())
             } else {
                 Err(ArrowError::nyi("serialize i32 to non-i32 ODBC"))
             }
         }
         DataType::Float32 => {
             if let api::buffers::AnyColumnViewMut::F32(values) = column {
-                Ok(primitive(array.as_any().downcast_ref().unwrap(), values))
+                primitive(array.as_any().downcast_ref().unwrap(), values);
+                Ok(())
             } else if let api::buffers::AnyColumnViewMut::NullableF32(values) = column {
-                Ok(primitive_optional(
-                    array.as_any().downcast_ref().unwrap(),
-                    values,
-                ))
+                primitive_optional(array.as_any().downcast_ref().unwrap(), values);
+                Ok(())
             } else {
                 Err(ArrowError::nyi("serialize f32 to non-f32 ODBC"))
             }
         }
         DataType::Float64 => {
             if let api::buffers::AnyColumnViewMut::F64(values) = column {
-                Ok(primitive(array.as_any().downcast_ref().unwrap(), values))
+                primitive(array.as_any().downcast_ref().unwrap(), values);
+                Ok(())
             } else if let api::buffers::AnyColumnViewMut::NullableF64(values) = column {
-                Ok(primitive_optional(
-                    array.as_any().downcast_ref().unwrap(),
-                    values,
-                ))
+                primitive_optional(array.as_any().downcast_ref().unwrap(), values);
+                Ok(())
             } else {
                 Err(ArrowError::nyi("serialize f64 to non-f64 ODBC"))
             }
         }
         DataType::FixedSizeBinary(_) => {
             if let api::buffers::AnyColumnViewMut::Binary(values) = column {
-                Ok(binary(array.as_any().downcast_ref().unwrap(), values))
+                binary(array.as_any().downcast_ref().unwrap(), values);
+                Ok(())
             } else {
                 Err(ArrowError::nyi("serialize f64 to non-f64 ODBC"))
             }

From 8fad2141049d1f49fb80ba740431672e490b2b0c Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Sat, 19 Feb 2022 07:22:40 +0000
Subject: [PATCH 12/15] Added Utf8 and Binary
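
The key detail for variable-length types: odbc-api's text/binary writers hold
a fixed capacity per row, so each column is grown to the widest value before
writing. The width is derived from the array's offsets, as in the `utf8`
helper added below (a sketch of that helper's logic; `array` is a
`Utf8Array<i32>` and `writer` a text column writer):

```rust
// the widest value is the largest delta between consecutive offsets
// (empty arrays fall back to zero-length rows)
let max_len = array
    .offsets()
    .windows(2)
    .map(|x| (x[1] - x[0]).to_usize())
    .max()
    .unwrap_or(0);
writer.set_max_len(max_len);
writer.write(array.iter().map(|x| x.map(|x| x.as_bytes())));
```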
---
 arrow-odbc-integration-testing/src/write.rs | 36 +++++++++++-
 src/io/odbc/write/schema.rs                 |  4 +-
 src/io/odbc/write/serialize.rs              | 65 +++++++++++++++++++--
 3 files changed, 98 insertions(+), 7 deletions(-)

diff --git a/arrow-odbc-integration-testing/src/write.rs b/arrow-odbc-integration-testing/src/write.rs
index ecc91582815..bcf12761abd 100644
--- a/arrow-odbc-integration-testing/src/write.rs
+++ b/arrow-odbc-integration-testing/src/write.rs
@@ -4,7 +4,6 @@ use arrow2::array::{Array, BinaryArray, BooleanArray, Int32Array, Utf8Array};
 use arrow2::chunk::Chunk;
 use arrow2::datatypes::{DataType, Field};
 use arrow2::error::Result;
-use arrow2::io::odbc::api::{Connection, Cursor};
 use arrow2::io::odbc::write::{buffer_from_description, infer_descriptions, serialize};
 
 use super::read::read;
@@ -97,3 +96,38 @@ fn bool_nullable() -> Result<()> {
         &table_name,
     )
 }
+
+#[test]
+fn utf8() -> Result<()> {
+    let table_name = function_name!().rsplit_once(':').unwrap().1;
+    let table_name = format!("write_{}", table_name);
+    let expected =
+        Chunk::new(vec![
+            Box::new(Utf8Array::<i32>::from([Some("aa"), None, Some("aaaa")])) as _,
+        ]);
+
+    test(
+        expected,
+        vec![Field::new("a", DataType::Utf8, true)],
+        "VARCHAR(4)",
+        &table_name,
+    )
+}
+
+#[test]
+fn binary() -> Result<()> {
+    let table_name = function_name!().rsplit_once(':').unwrap().1;
+    let table_name = format!("write_{}", table_name);
+    let expected = Chunk::new(vec![Box::new(BinaryArray::<i32>::from([
+        Some(&b"aa"[..]),
+        None,
+        Some(&b"aaaa"[..]),
+    ])) as _]);
+
+    test(
+        expected,
+        vec![Field::new("a", DataType::Binary, true)],
+        "VARBINARY(4)",
+        &table_name,
+    )
+}
diff --git a/src/io/odbc/write/schema.rs b/src/io/odbc/write/schema.rs
index 779f9fa1129..46bf1d0e900 100644
--- a/src/io/odbc/write/schema.rs
+++ b/src/io/odbc/write/schema.rs
@@ -30,7 +30,9 @@ fn data_type_to(data_type: &DataType) -> Result<api::DataType> {
         DataType::Int32 => api::DataType::Integer,
         DataType::Float32 => api::DataType::Float { precision: 24 },
         DataType::Float64 => api::DataType::Float { precision: 53 },
-        DataType::FixedSizeBinary(length) => api::DataType::Varbinary { length: *length },
+        DataType::FixedSizeBinary(length) => api::DataType::Binary { length: *length },
+        DataType::Binary | DataType::LargeBinary => api::DataType::Varbinary { length: 0 },
+        DataType::Utf8 | DataType::LargeUtf8 => api::DataType::LongVarchar { length: 0 },
         other => return Err(ArrowError::nyi(format!("{other:?} to ODBC"))),
     })
 }
diff --git a/src/io/odbc/write/serialize.rs b/src/io/odbc/write/serialize.rs
index d2e5816a1d0..f3a68fc1de2 100644
--- a/src/io/odbc/write/serialize.rs
+++ b/src/io/odbc/write/serialize.rs
@@ -1,6 +1,6 @@
-use api::buffers::BinColumnWriter;
+use api::buffers::{BinColumnWriter, TextColumnWriter};
 
-use crate::array::{Array, BooleanArray, FixedSizeBinaryArray, PrimitiveArray};
+use crate::array::*;
 use crate::bitmap::Bitmap;
 use crate::datatypes::DataType;
 use crate::error::{ArrowError, Result};
@@ -68,12 +68,44 @@ pub fn serialize(array: &dyn Array, column: &mut api::buffers::AnyColumnViewMut)
                 Err(ArrowError::nyi("serialize f64 to non-f64 ODBC"))
             }
         }
+        DataType::Utf8 => {
+            if let api::buffers::AnyColumnViewMut::Text(values) = column {
+                utf8::<i32>(array.as_any().downcast_ref().unwrap(), values);
+                Ok(())
+            } else {
+                Err(ArrowError::nyi("serialize utf8 to non-text ODBC"))
+            }
+        }
+        DataType::LargeUtf8 => {
+            if let api::buffers::AnyColumnViewMut::Text(values) = column {
+                utf8::<i64>(array.as_any().downcast_ref().unwrap(), values);
+                Ok(())
+            } else {
+                Err(ArrowError::nyi("serialize utf8 to non-text ODBC"))
+            }
+        }
+        DataType::Binary => {
+            if let api::buffers::AnyColumnViewMut::Binary(values) = column {
+                binary::<i32>(array.as_any().downcast_ref().unwrap(), values);
+                Ok(())
+            } else {
+                Err(ArrowError::nyi("serialize binary to non-binary ODBC"))
+            }
+        }
+        DataType::LargeBinary => {
+            if let api::buffers::AnyColumnViewMut::Binary(values) = column {
+                binary::<i64>(array.as_any().downcast_ref().unwrap(), values);
+                Ok(())
+            } else {
+                Err(ArrowError::nyi("serialize binary to non-binary ODBC"))
+            }
+        }
         DataType::FixedSizeBinary(_) => {
             if let api::buffers::AnyColumnViewMut::Binary(values) = column {
-                binary(array.as_any().downcast_ref().unwrap(), values);
+                fixed_binary(array.as_any().downcast_ref().unwrap(), values);
                 Ok(())
             } else {
-                Err(ArrowError::nyi("serialize f64 to non-f64 ODBC"))
+                Err(ArrowError::nyi("serialize fixed to non-binary ODBC"))
             }
         }
         other => Err(ArrowError::nyi(format!("{other:?} to ODBC"))),
@@ -117,6 +149,29 @@ fn primitive_optional(array: &PrimitiveArray<T>, values: &mut NullableSliceMut<T>) {
     write_validity(array.validity(), values.indicators());
 }
 
-fn binary(array: &FixedSizeBinaryArray, writer: &mut BinColumnWriter) {
+fn fixed_binary(array: &FixedSizeBinaryArray, writer: &mut BinColumnWriter) {
+    writer.set_max_len(array.size());
+    writer.write(array.iter())
+}
+
+fn binary<O: Offset>(array: &BinaryArray<O>, writer: &mut BinColumnWriter) {
+    let max_len = array
+        .offsets()
+        .windows(2)
+        .map(|x| (x[1] - x[0]).to_usize())
+        .max()
+        .unwrap_or(0);
+    writer.set_max_len(max_len);
     writer.write(array.iter())
 }
+
+fn utf8<O: Offset>(array: &Utf8Array<O>, writer: &mut TextColumnWriter<u8>) {
+    let max_len = array
+        .offsets()
+        .windows(2)
+        .map(|x| (x[1] - x[0]).to_usize())
+        .max()
+        .unwrap_or(0);
+    writer.set_max_len(max_len);
+    writer.write(array.iter().map(|x| x.map(|x| x.as_bytes())))
+}

From d022a493e6601dab49fe0f4a8ecf14cfc5918cf4 Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Sat, 19 Feb 2022 23:30:58 +0000
Subject: [PATCH 13/15] Added example
---
 examples/io_odbc.rs         | 82 +++++++++++++++++++++++++++++++++++++
 src/io/odbc/mod.rs          |  6 +++
 src/io/odbc/write/mod.rs    | 46 +++++++++++++++++++--
 src/io/odbc/write/schema.rs |  2 +-
 4 files changed, 131 insertions(+), 5 deletions(-)
 create mode 100644 examples/io_odbc.rs

diff --git a/examples/io_odbc.rs b/examples/io_odbc.rs
new file mode 100644
index 00000000000..996fa7130c6
--- /dev/null
+++ b/examples/io_odbc.rs
@@ -0,0 +1,82 @@
+//! Example showing how to write to, and read from, an ODBC connector
+//!
+//! On Ubuntu, you need to run the following (to install the driver):
+//! ```bash
+//! sudo apt install libsqliteodbc sqlite3 unixodbc-dev
+//! sudo sed --in-place 's/libsqlite3odbc.so/\/usr\/lib\/x86_64-linux-gnu\/odbc\/libsqlite3odbc.so/' /etc/odbcinst.ini
+//! ```
+use arrow2::array::{Array, Int32Array, Utf8Array};
+use arrow2::chunk::Chunk;
+use arrow2::datatypes::{DataType, Field};
+use arrow2::error::Result;
+use arrow2::io::odbc::api;
+use arrow2::io::odbc::api::Cursor;
+use arrow2::io::odbc::read;
+use arrow2::io::odbc::write;
+
+fn main() -> Result<()> {
+    let connector = "Driver={SQLite3};Database=sqlite-test.db";
+    let env = api::Environment::new()?;
+    let connection = env.connect_with_connection_string(connector)?;
+
+    // let's create an empty table with a schema
+    connection.execute("DROP TABLE IF EXISTS example;", ())?;
+    connection.execute("CREATE TABLE example (c1 INT, c2 TEXT);", ())?;
+
+    // and now let's write some data into it (from arrow arrays!)
+    // first, we prepare the statement
+    let query = "INSERT INTO example (c1, c2) VALUES (?, ?)";
+    let prepared = connection.prepare(query).unwrap();
+
+    // first, initialize buffers from odbc-api
+    let fields = vec![
+        // (for now) the types here must match the table's schema
+        Field::new("unused", DataType::Int32, true),
+        Field::new("unused", DataType::LargeUtf8, true),
+    ];
+
+    let mut writer = write::Writer::try_new(prepared, fields)?;
+
+    // say we have (or receive from a channel) a chunk:
+    let chunk = Chunk::new(vec![
+        Box::new(Int32Array::from_slice([1, 2, 3])) as Box<dyn Array>,
+        Box::new(Utf8Array::<i32>::from([Some("Hello"), None, Some("World")])),
+    ]);
+
+    // we write it like this
+    writer.write(&chunk)?;
+
+    // and we can later read from it
+    let chunks = read(&connection, "SELECT c1 FROM example")?;
+
+    // and the result should be the same
+    assert_eq!(chunks[0].columns()[0], chunk.columns()[0]);
+
+    Ok(())
+}
+
+/// Reads chunks from a query done against an ODBC connection
+pub fn read(connection: &api::Connection<'_>, query: &str) -> Result<Vec<Chunk<Box<dyn Array>>>> {
+    let mut a = connection.prepare(query)?;
+    let fields = read::infer_schema(&a)?;
+
+    let max_batch_size = 100;
+    let buffer = read::buffer_from_metadata(&a, max_batch_size)?;
+
+    let cursor = a.execute(())?.unwrap();
+    let mut cursor = cursor.bind_buffer(buffer)?;
+
+    let mut chunks = vec![];
+    while let Some(batch) = cursor.fetch()? {
+        let arrays = (0..batch.num_cols())
+            .zip(fields.iter())
+            .map(|(index, field)| {
+                let column_view = batch.column(index);
+                read::deserialize(column_view, field.data_type.clone())
+            })
+            .collect::<Vec<_>>();
+        chunks.push(Chunk::new(arrays));
+    }
+
+    Ok(chunks)
+}
diff --git a/src/io/odbc/mod.rs b/src/io/odbc/mod.rs
index 1407cf1e3d9..d681ed3c2fd 100644
--- a/src/io/odbc/mod.rs
+++ b/src/io/odbc/mod.rs
@@ -3,3 +3,9 @@ pub use odbc_api as api;
 
 pub mod read;
 pub mod write;
+
+impl From<api::Error> for crate::error::ArrowError {
+    fn from(error: api::Error) -> Self {
+        crate::error::ArrowError::External("".to_string(), Box::new(error))
+    }
+}
diff --git a/src/io/odbc/write/mod.rs b/src/io/odbc/write/mod.rs
index 7f2e5e4a5c3..18d2ebdbdaf 100644
--- a/src/io/odbc/write/mod.rs
+++ b/src/io/odbc/write/mod.rs
@@ -2,6 +2,8 @@
 mod schema;
 mod serialize;
 
+use crate::{array::Array, chunk::Chunk, datatypes::Field, error::Result};
+
 use super::api;
 pub use schema::infer_descriptions;
 pub use serialize::serialize;
@@ -9,7 +11,7 @@ pub use serialize::serialize;
 /// Creates a [`api::buffers::ColumnarBuffer`] from [`api::ColumnDescription`]s.
 pub fn buffer_from_description(
     descriptions: Vec<api::ColumnDescription>,
-    max_batch_size: usize,
+    capacity: usize,
 ) -> api::buffers::ColumnarBuffer<api::buffers::AnyColumnBuffer> {
     let descs = descriptions
         .into_iter()
         .map(|description| api::buffers::BufferDescription {
             nullable: description.could_be_nullable(),
             kind: api::buffers::BufferKind::from_data_type(description.data_type).unwrap(),
         });
 
-    let mut buffer = api::buffers::buffer_from_description(max_batch_size, descs);
-    buffer.set_num_rows(max_batch_size);
-    buffer
+    api::buffers::buffer_from_description(capacity, descs)
+}
+
+/// A writer of [`Chunk`] to an ODBC prepared statement.
+pub struct Writer<'a> {
+    fields: Vec<Field>,
+    buffer: api::buffers::ColumnarBuffer<api::buffers::AnyColumnBuffer>,
+    prepared: api::Prepared<'a>,
+}
+
+impl<'a> Writer<'a> {
+    /// Creates a new [`Writer`]
+    pub fn try_new(prepared: api::Prepared<'a>, fields: Vec<Field>) -> Result<Self> {
+        let buffer = buffer_from_description(infer_descriptions(&fields)?, 0);
+        Ok(Self {
+            fields,
+            buffer,
+            prepared,
+        })
+    }
+
+    /// Writes a chunk to the writter.
+    pub fn write<A: AsRef<dyn Array>>(&mut self, chunk: &Chunk<A>) -> Result<()> {
+        if chunk.len() > self.buffer.num_rows() {
+            // if the chunk is larger, we re-allocate new buffers to hold it
+            self.buffer = buffer_from_description(infer_descriptions(&self.fields)?, chunk.len());
+        }
+
+        self.buffer.set_num_rows(chunk.len());
+
+        // serialize (CPU-bounded)
+        for (i, column) in chunk.arrays().iter().enumerate() {
+            serialize(column.as_ref(), &mut self.buffer.column_mut(i))?;
+        }
+
+        // write (IO-bounded)
+        self.prepared.execute(&self.buffer)?;
+        Ok(())
+    }
 }
diff --git a/src/io/odbc/write/schema.rs b/src/io/odbc/write/schema.rs
index 46bf1d0e900..546c6d2b427 100644
--- a/src/io/odbc/write/schema.rs
+++ b/src/io/odbc/write/schema.rs
@@ -32,7 +32,7 @@ fn data_type_to(data_type: &DataType) -> Result<api::DataType> {
         DataType::Float64 => api::DataType::Float { precision: 53 },
         DataType::FixedSizeBinary(length) => api::DataType::Binary { length: *length },
         DataType::Binary | DataType::LargeBinary => api::DataType::Varbinary { length: 0 },
-        DataType::Utf8 | DataType::LargeUtf8 => api::DataType::LongVarchar { length: 0 },
+        DataType::Utf8 | DataType::LargeUtf8 => api::DataType::Varchar { length: 0 },
         other => return Err(ArrowError::nyi(format!("{other:?} to ODBC"))),
     })
 }

From 76eb0f89979285eddc7592a3ac74ca97321e8348 Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Thu, 3 Mar 2022 05:25:37 +0000
Subject: [PATCH 14/15] WIP
---
 Cargo.toml                      |   3 +-
 src/io/odbc/read/deserialize.rs | 106 +++++++++++++-------------------
 2 files changed, 43 insertions(+), 66 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index a8afde46f51..c6a9e189f35 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -88,8 +88,7 @@ strength_reduce = { version = "0.2", optional = true }
 multiversion = { version = "0.6.1", optional = true }
 
 # For support for odbc
-#odbc-api = { version = "0.34", optional = true }
-odbc-api = { git = "https://github.com/jorgecarleitao/odbc-api", branch= "expose_indicators", optional = true }
+odbc-api = { version = "0.35", optional = true }
 
 [dev-dependencies]
 criterion = "0.3"
diff --git a/src/io/odbc/read/deserialize.rs b/src/io/odbc/read/deserialize.rs
index 74db813d2c5..e55fc1a2598 100644
--- a/src/io/odbc/read/deserialize.rs
+++ b/src/io/odbc/read/deserialize.rs
@@ -1,3 +1,4 @@
+use odbc_api::buffers::{BinColumnIt, TextColumnIt};
 use odbc_api::Bit;
 
 use crate::array::{Array, BinaryArray, BooleanArray, PrimitiveArray, Utf8Array};
@@ -12,19 +13,9 @@ use super::super::api::buffers::AnyColumnView;
 /// This is CPU-bounded
 pub fn deserialize(column: AnyColumnView, data_type: DataType) -> Box<dyn Array> {
     match column {
-        AnyColumnView::Text(slice) => Box::new(utf8(
-            data_type,
-            slice.values(),
-            slice.lengths(),
-            slice.max_len(),
-        )) as _,
+        AnyColumnView::Text(iter) => Box::new(utf8(data_type, iter)) as _,
         AnyColumnView::WText(_) => todo!(),
-        AnyColumnView::Binary(slice) => Box::new(binary(
-            data_type,
-            slice.values(),
-            slice.lengths(),
-            slice.max_len(),
-        )) as _,
+        AnyColumnView::Binary(iter) => Box::new(binary(data_type, iter)) as _,
         AnyColumnView::Date(_) => todo!(),
         AnyColumnView::Time(_) => todo!(),
         AnyColumnView::Timestamp(_) => todo!(),
@@ -41,42 +32,44 @@ pub fn deserialize(column: AnyColumnView, data_type: DataType) -> Box<dyn Array> {
         AnyColumnView::NullableTimestamp(_) => todo!(),
         AnyColumnView::NullableF64(slice) => Box::new(primitive_optional(
             data_type,
-            slice.values(),
-            slice.indicators(),
+            slice.raw_values().0,
+            slice.raw_values().1,
         )) as _,
         AnyColumnView::NullableF32(slice) => Box::new(primitive_optional(
             data_type,
-            slice.values(),
-            slice.indicators(),
+            slice.raw_values().0,
+            slice.raw_values().1,
        )) as _,
         AnyColumnView::NullableI8(slice) => Box::new(primitive_optional(
             data_type,
-            slice.values(),
-            slice.indicators(),
+            slice.raw_values().0,
+            slice.raw_values().1,
         )) as _,
         AnyColumnView::NullableI16(slice) => Box::new(primitive_optional(
             data_type,
-            slice.values(),
-            slice.indicators(),
+            slice.raw_values().0,
+            slice.raw_values().1,
         )) as _,
         AnyColumnView::NullableI32(slice) => Box::new(primitive_optional(
             data_type,
-            slice.values(),
-            slice.indicators(),
+            slice.raw_values().0,
+            slice.raw_values().1,
         )) as _,
         AnyColumnView::NullableI64(slice) => Box::new(primitive_optional(
             data_type,
-            slice.values(),
-            slice.indicators(),
+            slice.raw_values().0,
+            slice.raw_values().1,
         )) as _,
         AnyColumnView::NullableU8(slice) => Box::new(primitive_optional(
             data_type,
-            slice.values(),
-            slice.indicators(),
+            slice.raw_values().0,
+            slice.raw_values().1,
         )) as _,
+        AnyColumnView::NullableBit(slice) => Box::new(bool_optional(
+            data_type,
+            slice.raw_values().0,
+            slice.raw_values().1,
+        )) as _,
-        AnyColumnView::NullableBit(slice) => {
-            Box::new(bool_optional(data_type, slice.values(), slice.indicators())) as _
-        }
     }
 }
@@ ... @@ fn bool_optional(data_type: DataType, values: &[Bit], indicators: &[isize]) -> BooleanArray {
     BooleanArray::from_data(data_type, values, validity)
 }
 
-fn binary_generic(
-    slice: &[u8],
-    lengths: &[isize],
-    max_length: usize,
-    null_terminator: usize,
+fn binary_generic<'a>(
+    iter: impl Iterator<Item = Option<&'a [u8]>>,
 ) -> (Buffer<i32>, Buffer<u8>, Option<Bitmap>) {
-    let mut validity = MutableBitmap::with_capacity(lengths.len());
+    let length = iter.size_hint().0;
+    let mut validity = MutableBitmap::with_capacity(length);
+    let mut values = Vec::<u8>::with_capacity(0);
 
-    println!("{:?}", lengths);
-    println!("{:?}", slice);
-    let mut offsets = Vec::with_capacity(lengths.len() + 1);
+    let mut offsets = Vec::with_capacity(length + 1);
     offsets.push(0i32);
-    let mut length = 0;
-    offsets.extend(lengths.iter().map(|&indicator| {
-        validity.push(indicator != -1);
-        length += if indicator > 0 { indicator as i32 } else { 0 };
-        length
-    }));
-    // the loop above ensures monotonicity
-    // this proves boundness
-    assert!((length as usize) < slice.len());
-
-    let mut values = Vec::<u8>::with_capacity(length as usize);
-    offsets.windows(2).enumerate().for_each(|(index, x)| {
-        let len = (x[1] - x[0]) as usize;
-        let offset = index * (max_length + null_terminator);
-        // this bound check is not necessary
-        values.extend_from_slice(&slice[offset..offset + len])
-    });
+
+    for item in iter {
+        if let Some(item) = item {
+            values.extend_from_slice(item);
+            validity.push(true);
+        } else {
+            validity.push(false);
+        }
+        offsets.push(values.len() as i32)
+    }
 
     (offsets.into(), values.into(), validity.into())
 }
 
-fn binary(
-    data_type: DataType,
-    slice: &[u8],
-    lengths: &[isize],
-    max_length: usize,
-) -> BinaryArray<i32> {
-    let (offsets, values, validity) = binary_generic(slice, lengths, max_length, 0);
+fn binary(data_type: DataType, iter: BinColumnIt) -> BinaryArray<i32> {
+    let (offsets, values, validity) = binary_generic(iter);
 
     // this O(N) check is not necessary
     BinaryArray::from_data(data_type, offsets, values, validity)
 }
 
-fn utf8(data_type: DataType, slice: &[u8], lengths: &[isize], max_length: usize) -> Utf8Array<i32> {
-    let (offsets, values, validity) = binary_generic(slice, lengths, max_length, 1);
+fn utf8(data_type: DataType, iter: TextColumnIt<u8>) -> Utf8Array<i32> {
+    let (offsets, values, validity) = binary_generic(iter);
 
     // this O(N) check is necessary for the utf8 validity
     Utf8Array::from_data(data_type, offsets, values, validity)

From d971e4090b25007b5eded16e86dd3788e3462bc6 Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Sat, 5 Mar 2022 18:32:15 +0000
Subject: [PATCH 15/15] Improved ODBC docs
---
 examples/io_odbc.rs            |  5 +++--
 guide/src/io/README.md         |  1 +
 guide/src/io/odbc.md           |  8 ++++++++
 src/io/odbc/write/mod.rs       | 15 ++++++++++++---
 src/io/odbc/write/schema.rs    |  2 +-
 src/io/odbc/write/serialize.rs | 10 ++++++----
 6 files changed, 31 insertions(+), 10 deletions(-)
 create mode 100644 guide/src/io/odbc.md

diff --git a/examples/io_odbc.rs b/examples/io_odbc.rs
index 996fa7130c6..9305fab6e24 100644
--- a/examples/io_odbc.rs
+++ b/examples/io_odbc.rs
@@ -1,4 +1,4 @@
-//! Example showing how to write to, and read from, an ODBC connector
+//! Demo of how to write to, and read from, an ODBC connector
 //!
 //! On Ubuntu, you need to run the following (to install the driver):
 //! ```bash
@@ -28,13 +28,14 @@ fn main() -> Result<()> {
     let query = "INSERT INTO example (c1, c2) VALUES (?, ?)";
     let prepared = connection.prepare(query).unwrap();
 
-    // first, initialize buffers from odbc-api
+    // secondly, we initialize buffers from odbc-api
     let fields = vec![
         // (for now) the types here must match the table's schema
         Field::new("unused", DataType::Int32, true),
         Field::new("unused", DataType::LargeUtf8, true),
     ];
 
+    // third, we initialize the writer
     let mut writer = write::Writer::try_new(prepared, fields)?;
diff --git a/guide/src/io/README.md b/guide/src/io/README.md
index f8e8bb8ca64..5a8cd477949 100644
--- a/guide/src/io/README.md
+++ b/guide/src/io/README.md
@@ -7,5 +7,6 @@ This crate offers optional features that enable interoperability with different
 * Parquet (`io_parquet`)
 * JSON and NDJSON (`io_json`)
 * Avro (`io_avro` and `io_avro_async`)
+* ODBC-compliant databases (`io_odbc`)
 
 In this section you can find a guide and examples for each one of them.
diff --git a/guide/src/io/odbc.md b/guide/src/io/odbc.md
new file mode 100644
index 00000000000..7e362daf7c6
--- /dev/null
+++ b/guide/src/io/odbc.md
@@ -0,0 +1,8 @@
+# ODBC
+
+When compiled with feature `io_odbc`, this crate can be used to read from, and write to
+any [ODBC](https://en.wikipedia.org/wiki/Open_Database_Connectivity) interface:
+
+```rust
+{{#include ../../../examples/io_odbc.rs}}
+```
diff --git a/src/io/odbc/write/mod.rs b/src/io/odbc/write/mod.rs
index 18d2ebdbdaf..245f2455bb8 100644
--- a/src/io/odbc/write/mod.rs
+++ b/src/io/odbc/write/mod.rs
@@ -9,6 +9,8 @@ pub use serialize::serialize;
 
 /// Creates a [`api::buffers::ColumnarBuffer`] from [`api::ColumnDescription`]s.
+///
+/// This is useful when separating the serialization (CPU-bounded) from the writing to the DB (IO-bounded).
 pub fn buffer_from_description(
     descriptions: Vec<api::ColumnDescription>,
     capacity: usize,
 
     api::buffers::buffer_from_description(capacity, descs)
 }
 
-/// A writer of [`Chunk`] to an ODBC prepared statement.
+/// A writer of [`Chunk`]s to an ODBC [`api::Prepared`] statement.
+/// # Implementation
+/// This struct mixes CPU-bounded and IO-bounded tasks and is not ideal
+/// for an `async` context.
 pub struct Writer<'a> {
     fields: Vec<Field>,
     buffer: api::buffers::ColumnarBuffer<api::buffers::AnyColumnBuffer>,
     prepared: api::Prepared<'a>,
 }
 
 impl<'a> Writer<'a> {
-    /// Creates a new [`Writer`]
+    /// Creates a new [`Writer`].
+    /// # Errors
+    /// Errors iff any of the types from [`Field`] is not supported.
     pub fn try_new(prepared: api::Prepared<'a>, fields: Vec<Field>) -> Result<Self> {
         let buffer = buffer_from_description(infer_descriptions(&fields)?, 0);
         Ok(Self {
@@ -41,7 +48,9 @@ impl<'a> Writer<'a> {
         })
     }
 
-    /// Writes a chunk to the writter.
+    /// Writes a chunk to the writer.
+    /// # Errors
+    /// Errors iff the execution of the statement fails.
     pub fn write<A: AsRef<dyn Array>>(&mut self, chunk: &Chunk<A>) -> Result<()> {
         if chunk.len() > self.buffer.num_rows() {
             // if the chunk is larger, we re-allocate new buffers to hold it
diff --git a/src/io/odbc/write/schema.rs b/src/io/odbc/write/schema.rs
index 546c6d2b427..9e4b61f704e 100644
--- a/src/io/odbc/write/schema.rs
+++ b/src/io/odbc/write/schema.rs
@@ -3,7 +3,7 @@ use super::super::api;
 use crate::datatypes::{DataType, Field};
 use crate::error::{ArrowError, Result};
 
-/// Infers the [`ColumnDescription`] from the fields
+/// Infers the [`api::ColumnDescription`] from the fields
 pub fn infer_descriptions(fields: &[Field]) -> Result<Vec<api::ColumnDescription>> {
     fields
         .iter()
diff --git a/src/io/odbc/write/serialize.rs b/src/io/odbc/write/serialize.rs
index f3a68fc1de2..3128ceb964b 100644
--- a/src/io/odbc/write/serialize.rs
+++ b/src/io/odbc/write/serialize.rs
@@ -121,12 +121,13 @@ fn bool(array: &BooleanArray, values: &mut [api::Bit]) {
 }
 
 fn bool_optional(array: &BooleanArray, values: &mut NullableSliceMut<api::Bit>) {
+    let (values, indicators) = values.raw_values();
     array
         .values()
         .iter()
-        .zip(values.values().iter_mut())
+        .zip(values.iter_mut())
         .for_each(|(from, to)| *to = api::Bit(from as u8));
-    write_validity(array.validity(), values.indicators());
+    write_validity(array.validity(), indicators);
 }
 
 fn primitive<T: NativeType>(array: &PrimitiveArray<T>, values: &mut [T]) {
     values.copy_from_slice(array.values())
 }
@@ -145,8 +146,9 @@ fn write_validity(validity: Option<&Bitmap>, indicators: &mut [isize]) {
 }
 
 fn primitive_optional<T: NativeType>(array: &PrimitiveArray<T>, values: &mut NullableSliceMut<T>) {
-    values.values().copy_from_slice(array.values());
-    write_validity(array.validity(), values.indicators());
+    let (values, indicators) = values.raw_values();
+    values.copy_from_slice(array.values());
+    write_validity(array.validity(), indicators);
 }
 
 fn fixed_binary(array: &FixedSizeBinaryArray, writer: &mut BinColumnWriter) {