diff --git a/integration-testing/src/bin/arrow-json-integration-test.rs b/integration-testing/src/bin/arrow-json-integration-test.rs
index 180896a9418..beb0cb34714 100644
--- a/integration-testing/src/bin/arrow-json-integration-test.rs
+++ b/integration-testing/src/bin/arrow-json-integration-test.rs
@@ -91,10 +91,17 @@ fn arrow_to_json(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()>
     let metadata = read::read_file_metadata(&mut arrow_file)?;
     let reader = read::FileReader::new(arrow_file, metadata.clone(), None);
 
+    let names = metadata
+        .schema
+        .fields
+        .iter()
+        .map(|f| f.name())
+        .collect::<Vec<_>>();
+
     let schema = json_write::serialize_schema(&metadata.schema, &metadata.ipc_schema.fields);
 
     let batches = reader
-        .map(|batch| Ok(json_write::from_record_batch(&batch?)))
+        .map(|batch| Ok(json_write::serialize_columns(&batch?, &names)))
         .collect::<Result<Vec<_>>>()?;
 
     let arrow_json = ArrowJson {
@@ -121,10 +128,10 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> {
     let mut arrow_file = File::open(arrow_name)?;
     let metadata = read::read_file_metadata(&mut arrow_file)?;
     let reader = read::FileReader::new(arrow_file, metadata, None);
-    let arrow_schema = reader.schema().as_ref().to_owned();
+    let arrow_schema = reader.schema();
 
     // compare schemas
-    if json_file.schema != arrow_schema {
+    if &json_file.schema != arrow_schema {
         return Err(ArrowError::InvalidArgumentError(format!(
             "Schemas do not match. JSON: {:?}. Arrow: {:?}",
             json_file.schema, arrow_schema
diff --git a/integration-testing/src/flight_client_scenarios/integration_test.rs b/integration-testing/src/flight_client_scenarios/integration_test.rs
index 461659069dc..b402dc0798c 100644
--- a/integration-testing/src/flight_client_scenarios/integration_test.rs
+++ b/integration-testing/src/flight_client_scenarios/integration_test.rs
@@ -18,6 +18,8 @@ use crate::{read_json_file, ArrowFile};
 
 use arrow2::{
+    array::Array,
+    columns::Columns,
     datatypes::*,
     io::ipc::{
         read::{self, Dictionaries},
@@ -27,7 +29,6 @@ use arrow2::{
         flight::{self, deserialize_batch, serialize_batch},
         ipc::IpcField,
     },
-    record_batch::RecordBatch,
 };
 use arrow_format::flight::data::{
     flight_descriptor::DescriptorType, FlightData, FlightDescriptor, Location, Ticket,
@@ -45,6 +46,8 @@ type Result<T = (), E = Error> = std::result::Result<T, E>;
 
 type Client = FlightServiceClient<tonic::transport::Channel>;
 
+type Chunk = Columns<Arc<dyn Array>>;
+
 pub async fn run_scenario(host: &str, port: &str, path: &str) -> Result {
     let url = format!("http://{}:{}", host, port);
@@ -75,7 +78,7 @@ pub async fn run_scenario(host: &str, port: &str, path: &str) -> Result {
         batches.clone(),
     )
     .await?;
-    verify_data(client, descriptor, schema, &ipc_schema, &batches).await?;
+    verify_data(client, descriptor, &schema, &ipc_schema, &batches).await?;
 
     Ok(())
 }
@@ -85,7 +88,7 @@ async fn upload_data(
     schema: &Schema,
     fields: &[IpcField],
     descriptor: FlightDescriptor,
-    original_data: Vec<RecordBatch>,
+    original_data: Vec<Chunk>,
 ) -> Result {
     let (mut upload_tx, upload_rx) = mpsc::channel(10);
@@ -140,7 +143,7 @@ async fn send_batch(
     upload_tx: &mut mpsc::Sender<FlightData>,
     metadata: &[u8],
-    batch: &RecordBatch,
+    batch: &Chunk,
     fields: &[IpcField],
     options: &write::WriteOptions,
 ) -> Result {
@@ -159,9 +162,9 @@ async fn verify_data(
     mut client: Client,
     descriptor: FlightDescriptor,
-    expected_schema: SchemaRef,
+    expected_schema: &Schema,
     ipc_schema: &IpcSchema,
-    expected_data: &[RecordBatch],
+    expected_data: &[Chunk],
 ) -> Result {
     let resp = client.get_flight_info(Request::new(descriptor)).await?;
     let info = resp.into_inner();
@@ -184,7 +187,7 @@ async fn verify_data(
                 location,
                 ticket.clone(),
                 expected_data,
-                expected_schema.clone(),
+                expected_schema,
                 ipc_schema,
             )
             .await?;
@@ -197,8 +200,8 @@ async fn verify_data(
 async fn consume_flight_location(
     location: Location,
     ticket: Ticket,
-    expected_data: &[RecordBatch],
-    schema: SchemaRef,
+    expected_data: &[Chunk],
+    schema: &Schema,
     ipc_schema: &IpcSchema,
 ) -> Result {
     let mut location = location;
@@ -231,21 +234,20 @@ async fn consume_flight_location(
         let metadata = counter.to_string().into_bytes();
         assert_eq!(metadata, data.app_metadata);
 
-        let actual_batch = deserialize_batch(&data, schema.clone(), ipc_schema, &dictionaries)
+        let actual_batch = deserialize_batch(&data, schema.fields(), ipc_schema, &dictionaries)
            .expect("Unable to convert flight data to Arrow batch");
 
-        assert_eq!(expected_batch.schema(), actual_batch.schema());
-        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
-        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
-        let schema = expected_batch.schema();
-        for i in 0..expected_batch.num_columns() {
+        assert_eq!(expected_batch.columns().len(), actual_batch.columns().len());
+        assert_eq!(expected_batch.len(), actual_batch.len());
+        for (i, (expected, actual)) in expected_batch
+            .columns()
+            .iter()
+            .zip(actual_batch.columns().iter())
+            .enumerate()
+        {
             let field = schema.field(i);
             let field_name = field.name();
-
-            let expected_data = expected_batch.column(i);
-            let actual_data = actual_batch.column(i);
-
-            assert_eq!(expected_data, actual_data, "Data for field {}", field_name);
+            assert_eq!(expected, actual, "Data for field {}", field_name);
         }
     }
diff --git a/integration-testing/src/flight_server_scenarios/integration_test.rs b/integration-testing/src/flight_server_scenarios/integration_test.rs
index 82e99f503d7..685e40ec53d 100644
--- a/integration-testing/src/flight_server_scenarios/integration_test.rs
+++ b/integration-testing/src/flight_server_scenarios/integration_test.rs
@@ -19,6 +19,8 @@ use std::collections::HashMap;
 use std::pin::Pin;
 use std::sync::Arc;
 
+use arrow2::array::Array;
+use arrow2::columns::Columns;
 use arrow2::io::flight::{deserialize_schemas, serialize_batch, serialize_schema};
 use arrow2::io::ipc::read::Dictionaries;
 use arrow2::io::ipc::IpcSchema;
@@ -28,9 +30,7 @@ use arrow_format::flight::service::flight_service_server::*;
 use arrow_format::ipc::Message::{root_as_message, Message, MessageHeader};
 use arrow_format::ipc::Schema as ArrowSchema;
 
-use arrow2::{
-    datatypes::*, io::flight::serialize_schema_to_info, io::ipc, record_batch::RecordBatch,
-};
+use arrow2::{datatypes::*, io::flight::serialize_schema_to_info, io::ipc};
 
 use futures::{channel::mpsc, sink::SinkExt, Stream, StreamExt};
 use tokio::sync::Mutex;
@@ -62,7 +62,7 @@ pub async fn scenario_setup(port: &str) -> Result {
 struct IntegrationDataset {
     schema: Schema,
     ipc_schema: IpcSchema,
-    chunks: Vec<RecordBatch>,
+    chunks: Vec<Columns<Arc<dyn Array>>>,
 }
 
 #[derive(Clone, Default)]
@@ -173,7 +173,7 @@ impl FlightService for FlightServiceImpl {
 
         let endpoint = self.endpoint_from_path(&path[0]);
 
-        let total_records: usize = flight.chunks.iter().map(|chunk| chunk.num_rows()).sum();
+        let total_records: usize = flight.chunks.iter().map(|chunk| chunk.len()).sum();
 
         let schema = serialize_schema_to_info(&flight.schema, &flight.ipc_schema.fields)
            .expect(
@@ -218,7 +218,6 @@ impl FlightService for FlightServiceImpl {
         let (schema, ipc_schema) = deserialize_schemas(&flight_data.data_header)
             .map_err(|e| Status::invalid_argument(format!("Invalid schema: {:?}", e)))?;
-        let schema_ref = Arc::new(schema.clone());
 
         let (response_tx, response_rx) = mpsc::channel(10);
@@ -228,11 +227,10 @@ impl FlightService for FlightServiceImpl {
             let mut error_tx = response_tx.clone();
             if let Err(e) = save_uploaded_chunks(
                 uploaded_chunks,
-                schema_ref,
+                schema,
                 ipc_schema,
                 input_stream,
                 response_tx,
-                schema,
                 key,
             )
             .await
@@ -280,10 +278,10 @@ async fn send_app_metadata(
 async fn record_batch_from_message(
     message: Message<'_>,
     data_body: &[u8],
-    schema_ref: Arc<Schema>,
+    fields: &[Field],
     ipc_schema: &IpcSchema,
     dictionaries: &mut Dictionaries,
-) -> Result<RecordBatch, Status> {
+) -> Result<Columns<Arc<dyn Array>>, Status> {
     let ipc_batch = message
         .header_as_record_batch()
         .ok_or_else(|| Status::internal("Could not parse message header as record batch"))?;
@@ -292,7 +290,7 @@ async fn record_batch_from_message(
 
     let arrow_batch_result = ipc::read::read_record_batch(
         ipc_batch,
-        schema_ref,
+        fields,
         ipc_schema,
         None,
         dictionaries,
@@ -326,11 +324,10 @@ async fn dictionary_from_message(
 async fn save_uploaded_chunks(
     uploaded_chunks: Arc<Mutex<HashMap<String, IntegrationDataset>>>,
-    schema_ref: Arc<Schema>,
+    schema: Schema,
     ipc_schema: IpcSchema,
     mut input_stream: Streaming<FlightData>,
     mut response_tx: mpsc::Sender<Result<PutResult, Status>>,
-    schema: Schema,
     key: String,
 ) -> Result<(), Status> {
     let mut chunks = vec![];
@@ -354,7 +351,7 @@ async fn save_uploaded_chunks(
                     let batch = record_batch_from_message(
                         message,
                         &data.data_body,
-                        schema_ref.clone(),
+                        schema.fields(),
                         &ipc_schema,
                         &mut dictionaries,
                     )
                     .await?;
@@ -366,7 +363,7 @@ async fn save_uploaded_chunks(
                     dictionary_from_message(
                         message,
                         &data.data_body,
-                        schema_ref.fields(),
+                        schema.fields(),
                         &ipc_schema,
                         &mut dictionaries,
                     )
diff --git a/integration-testing/src/lib.rs b/integration-testing/src/lib.rs
index 6da3ff17081..89646e371ef 100644
--- a/integration-testing/src/lib.rs
+++ b/integration-testing/src/lib.rs
@@ -17,17 +17,19 @@
 //! Common code used in the integration test binaries
 
+use arrow2::array::Array;
 use arrow2::io::ipc::IpcField;
 use serde_json::Value;
 
+use arrow2::columns::Columns;
 use arrow2::datatypes::*;
 use arrow2::error::Result;
 use arrow2::io::json_integration::{read, ArrowJsonBatch, ArrowJsonDictionaryBatch};
-use arrow2::record_batch::RecordBatch;
 
 use std::collections::HashMap;
 use std::fs::File;
 use std::io::BufReader;
+use std::sync::Arc;
 
 /// The expected username for the basic auth integration test.
 pub const AUTH_USERNAME: &str = "arrow";
@@ -43,7 +45,7 @@ pub struct ArrowFile {
     // we can evolve this into a concrete Arrow type
     // this is temporarily not being read from
     pub _dictionaries: HashMap<String, ArrowJsonDictionaryBatch>,
-    pub batches: Vec<RecordBatch>,
+    pub batches: Vec<Columns<Arc<dyn Array>>>,
 }
 
 pub fn read_json_file(json_name: &str) -> Result<ArrowFile> {
@@ -66,12 +68,15 @@ pub fn read_json_file(json_name: &str) -> Result<ArrowFile> {
         }
     }
 
-    let mut batches = vec![];
-    for b in arrow_json["batches"].as_array().unwrap() {
-        let json_batch: ArrowJsonBatch = serde_json::from_value(b.clone()).unwrap();
-        let batch = read::to_record_batch(&schema, &fields, &json_batch, &dictionaries)?;
-        batches.push(batch);
-    }
+    let batches = arrow_json["batches"]
+        .as_array()
+        .unwrap()
+        .iter()
+        .map(|b| {
+            let json_batch: ArrowJsonBatch = serde_json::from_value(b.clone()).unwrap();
+            read::deserialize_columns(&schema, &fields, &json_batch, &dictionaries)
+        })
+        .collect::<Result<_>>()?;
     Ok(ArrowFile {
         schema,
         fields,
diff --git a/src/io/flight/mod.rs b/src/io/flight/mod.rs
index be74b11c6e1..e509c773c60 100644
--- a/src/io/flight/mod.rs
+++ b/src/io/flight/mod.rs
@@ -15,7 +15,7 @@ use crate::{
 
 use super::ipc::{IpcField, IpcSchema};
 
-/// Serializes a [`Columns`] to a vector of [`FlightData`] representing the serialized dictionaries
+/// Serializes [`Columns`] to a vector of [`FlightData`] representing the serialized dictionaries
 /// and a [`FlightData`] representing the batch.
 pub fn serialize_batch(
     columns: &Columns<Arc<dyn Array>>,
@@ -102,7 +102,7 @@ pub fn deserialize_schemas(bytes: &[u8]) -> Result<(Schema, IpcSchema)> {
 /// Deserializes [`FlightData`] to [`Columns`].
 pub fn deserialize_batch(
     data: &FlightData,
-    schema: Arc<Schema>,
+    fields: &[Field],
     ipc_schema: &IpcSchema,
     dictionaries: &read::Dictionaries,
 ) -> Result<Columns<Arc<dyn Array>>> {
@@ -123,7 +123,7 @@
         .map(|batch| {
             read::read_record_batch(
                 batch,
-                schema.clone(),
+                fields,
                 ipc_schema,
                 None,
                 dictionaries,
diff --git a/src/io/ipc/read/common.rs b/src/io/ipc/read/common.rs
index 251640f9df5..ffd71561b41 100644
--- a/src/io/ipc/read/common.rs
+++ b/src/io/ipc/read/common.rs
@@ -1,4 +1,4 @@
-use std::collections::{HashMap, VecDeque};
+use std::collections::VecDeque;
 use std::io::{Read, Seek};
 use std::sync::Arc;
 
@@ -7,7 +7,7 @@ use arrow_format::ipc::Schema::MetadataVersion;
 
 use crate::array::*;
 use crate::columns::Columns;
-use crate::datatypes::{DataType, Field, Schema};
+use crate::datatypes::{DataType, Field};
 use crate::error::{ArrowError, Result};
 use crate::io::ipc::{IpcField, IpcSchema};
 
@@ -80,7 +80,7 @@ impl<'a, A, I: Iterator<Item = A>> Iterator for ProjectionIter<'a, A, I> {
 #[allow(clippy::too_many_arguments)]
 pub fn read_record_batch<R: Read + Seek>(
     batch: ipc::Message::RecordBatch,
-    schema: Arc<Schema>,
+    fields: &[Field],
     ipc_schema: &IpcSchema,
     projection: Option<&[usize]>,
     dictionaries: &Dictionaries,
@@ -88,7 +88,7 @@ pub fn read_record_batch<R: Read + Seek>(
     reader: &mut R,
     block_offset: u64,
 ) -> Result<Columns<Arc<dyn Array>>> {
-    assert_eq!(schema.fields().len(), ipc_schema.fields.len());
+    assert_eq!(fields.len(), ipc_schema.fields.len());
     let buffers = batch.buffers().ok_or_else(|| {
         ArrowError::OutOfSpec("Unable to get buffers from IPC RecordBatch".to_string())
     })?;
@@ -100,10 +100,8 @@ pub fn read_record_batch<R: Read + Seek>(
     let mut field_nodes = field_nodes.iter().collect::<VecDeque<_>>();
 
     let columns = if let Some(projection) = projection {
-        let projection = ProjectionIter::new(
-            projection,
-            schema.fields().iter().zip(ipc_schema.fields.iter()),
-        );
+        let projection =
+            ProjectionIter::new(projection, fields.iter().zip(ipc_schema.fields.iter()));
 
         projection
             .map(|maybe_field| match maybe_field {
@@ -127,8 +125,7 @@ pub fn read_record_batch<R: Read + Seek>(
             .flatten()
             .collect::<Result<Vec<_>>>()?
     } else {
-        schema
-            .fields()
+        fields
             .iter()
             .zip(ipc_schema.fields.iter())
             .map(|(field, ipc_field)| {
@@ -228,19 +225,15 @@ pub fn read_dictionary<R: Read + Seek>(
     let dictionary_values: ArrayRef = match first_field.data_type() {
         DataType::Dictionary(_, ref value_type, _) => {
             // Make a fake schema for the dictionary batch.
-            let schema = Arc::new(Schema {
-                fields: vec![Field::new("", value_type.as_ref().clone(), false)],
-                metadata: HashMap::new(),
-            });
+            let fields = vec![Field::new("", value_type.as_ref().clone(), false)];
             let ipc_schema = IpcSchema {
                 fields: vec![first_ipc_field.clone()],
                 is_little_endian: ipc_schema.is_little_endian,
             };
-            assert_eq!(ipc_schema.fields.len(), schema.fields().len());
             // Read a single column
             let columns = read_record_batch(
                 batch.data().unwrap(),
-                schema,
+                &fields,
                 &ipc_schema,
                 None,
                 dictionaries,
diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs
index 4aa7b994e8b..ba2c9274137 100644
--- a/src/io/ipc/read/reader.rs
+++ b/src/io/ipc/read/reader.rs
@@ -19,7 +19,7 @@ use super::Dictionaries;
 #[derive(Debug, Clone)]
 pub struct FileMetadata {
     /// The schema that is read from the file footer
-    pub schema: Arc<Schema>,
+    pub schema: Schema,
 
     /// The files' [`IpcSchema`]
     pub ipc_schema: IpcSchema,
@@ -36,19 +36,12 @@ pub struct FileMetadata {
     version: ipc::Schema::MetadataVersion,
 }
 
-impl FileMetadata {
-    /// Returns the schema.
-    pub fn schema(&self) -> &Arc<Schema> {
-        &self.schema
-    }
-}
-
 /// Arrow File reader
 pub struct FileReader<R: Read + Seek> {
     reader: R,
     metadata: FileMetadata,
     current_block: usize,
-    projection: Option<(Vec<usize>, Arc<Schema>)>,
+    projection: Option<(Vec<usize>, Schema)>,
     buffer: Vec<u8>,
 }
 
@@ -164,7 +157,6 @@ pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata> {
         read_record_batch(
             batch,
-            metadata.schema.clone(),
+            metadata.schema.fields(),
             &metadata.ipc_schema,
             projection,
             &metadata.dictionaries,
@@ -257,12 +249,12 @@ impl<R: Read + Seek> FileReader<R> {
             });
         }
         let projection = projection.map(|projection| {
-            let fields = metadata.schema().fields();
+            let fields = metadata.schema.fields();
             let fields = projection.iter().map(|x| fields[*x].clone()).collect();
-            let schema = Arc::new(Schema {
+            let schema = Schema {
                 fields,
-                metadata: metadata.schema().metadata().clone(),
-            });
+                metadata: metadata.schema.metadata().clone(),
+            };
             (projection, schema)
         });
         Self {
@@ -275,7 +267,7 @@ impl<R: Read + Seek> FileReader<R> {
     }
 
     /// Return the schema of the file
-    pub fn schema(&self) -> &Arc<Schema> {
+    pub fn schema(&self) -> &Schema {
         self.projection
             .as_ref()
             .map(|x| &x.1)
diff --git a/src/io/ipc/read/stream.rs b/src/io/ipc/read/stream.rs
index 4b8983e6c36..17e70d89382 100644
--- a/src/io/ipc/read/stream.rs
+++ b/src/io/ipc/read/stream.rs
@@ -18,7 +18,7 @@ use super::Dictionaries;
 #[derive(Debug, Clone)]
 pub struct StreamMetadata {
     /// The schema that is read from the stream's first message
-    pub schema: Arc<Schema>,
+    pub schema: Schema,
 
     pub version: MetadataVersion,
@@ -51,7 +51,6 @@ pub fn read_stream_metadata<R: Read>(reader: &mut R) -> Result<StreamMetadata> {
         .header_as_schema()
         .ok_or_else(|| ArrowError::OutOfSpec("Unable to read IPC message as schema".to_string()))?;
     let (schema, ipc_schema) = fb_to_schema(ipc_schema);
-    let schema = Arc::new(schema);
 
     Ok(StreamMetadata {
         schema,
@@ -156,7 +155,7 @@ fn read_next<R: Read>(
             read_record_batch(
                 batch,
-                metadata.schema.clone(),
+                metadata.schema.fields(),
                 &metadata.ipc_schema,
                 None,
                 dictionaries,
diff --git a/src/io/json_integration/write/array.rs b/src/io/json_integration/write/array.rs
index edb01823e3f..ebca2058ac8 100644
--- a/src/io/json_integration/write/array.rs
+++ b/src/io/json_integration/write/array.rs
@@ -9,7 +9,10 @@ use crate::{
 use super::super::{ArrowJsonBatch, ArrowJsonColumn};
 
 /// Serializes a [`Columns`] to [`ArrowJsonBatch`].
-pub fn serialize_columns(columns: &Columns<Arc<dyn Array>>, names: &[&str]) -> ArrowJsonBatch {
+pub fn serialize_columns<A: ToString>(
+    columns: &Columns<Arc<dyn Array>>,
+    names: &[A],
+) -> ArrowJsonBatch {
     let count = columns.len();
 
     let columns = columns
diff --git a/tests/it/io/ipc/read/file.rs b/tests/it/io/ipc/read/file.rs
index 4578a4b0e02..9331b2e673d 100644
--- a/tests/it/io/ipc/read/file.rs
+++ b/tests/it/io/ipc/read/file.rs
@@ -18,7 +18,7 @@ fn test_file(version: &str, file_name: &str) -> Result<()> {
     let metadata = read_file_metadata(&mut file)?;
     let reader = FileReader::new(file, metadata, None);
 
-    assert_eq!(&schema, reader.schema().as_ref());
+    assert_eq!(&schema, reader.schema());
 
     batches.iter().zip(reader).try_for_each(|(lhs, rhs)| {
         assert_eq!(lhs, &rhs?);
diff --git a/tests/it/io/ipc/read/stream.rs b/tests/it/io/ipc/read/stream.rs
index 3c3fcf095d6..18e7ecd0756 100644
--- a/tests/it/io/ipc/read/stream.rs
+++ b/tests/it/io/ipc/read/stream.rs
@@ -18,7 +18,7 @@ fn test_file(version: &str, file_name: &str) -> Result<()> {
 
     // read expected JSON output
     let (schema, ipc_fields, batches) = read_gzip_json(version, file_name)?;
-    assert_eq!(&schema, reader.metadata().schema.as_ref());
+    assert_eq!(&schema, &reader.metadata().schema);
     assert_eq!(&ipc_fields, &reader.metadata().ipc_schema.fields);
 
     batches
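Note: the following is not part of the diff. It is a minimal sketch of how the reworked IPC file-reading API fits together after this change, based only on the calls visible above (`read_file_metadata`, `FileReader::new`, `schema.fields()`, `chunk.len()`, `chunk.columns()`); the file path and the `main` wrapper are illustrative placeholders.

```rust
use std::fs::File;

use arrow2::error::Result;
use arrow2::io::ipc::read::{read_file_metadata, FileReader};

fn main() -> Result<()> {
    // Placeholder path: any Arrow IPC file would do here.
    let mut file = File::open("data.arrow")?;

    // `FileMetadata::schema` is now a plain `Schema`, not an `Arc<Schema>`.
    let metadata = read_file_metadata(&mut file)?;
    println!("fields: {:?}", metadata.schema.fields());

    // The reader yields `Columns<Arc<dyn Array>>` ("chunks") instead of `RecordBatch`.
    let reader = FileReader::new(file, metadata, None);
    for maybe_chunk in reader {
        let chunk = maybe_chunk?;
        println!("rows: {}, columns: {}", chunk.len(), chunk.columns().len());
    }
    Ok(())
}
```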