feat: multi-line json streaming #2729
@@ -1,97 +1,88 @@
 use std::pin::Pin;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use datafusion::arrow::datatypes::{Schema, SchemaRef};
 use datafusion::arrow::json::ReaderBuilder;
 use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::error::DataFusionError;
+use datafusion::error::{DataFusionError, Result as DFResult};
 use datafusion::execution::TaskContext;
 use datafusion::physical_plan::streaming::PartitionStream;
 use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
-use futures::{Stream, StreamExt};
+use futures::channel::mpsc;
+use futures::{SinkExt, Stream, StreamExt, TryStreamExt};
+// use json_stream::JsonStream;
 use object_store::{ObjectMeta, ObjectStore};
-use serde_json::{Map, Value};
+use serde_json::{Deserializer, Map, Value};
 
-use crate::json::errors::JsonError;
-use crate::json::table::push_unwind_json_values;
+use crate::json::errors::{JsonError, Result};
 
-pub type SendableCheckedRecordBatchStream =
-    Pin<Box<dyn Stream<Item = Result<RecordBatch, DataFusionError>> + Send>>;
-
-pub struct JsonStream {
+pub struct JsonRecordBatchStream {
     schema: Arc<Schema>,
-    stream: SendableCheckedRecordBatchStream,
+    // this is the same as a sendable recordbatch stream, but declared
+    // separately so we can have isomorphic values using adapters.
+    stream: Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>,
Review thread on the `stream` field:
- I don't know what this means.
- Ok, so if the type of this is [...] I think the comments made things less clear.
- It might have been from not importing the [...]
 }
 
-impl Stream for JsonStream {
-    type Item = Result<RecordBatch, DataFusionError>;
+impl Stream for JsonRecordBatchStream {
+    type Item = DFResult<RecordBatch>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         self.stream.poll_next_unpin(cx)
     }
 }
 
-impl RecordBatchStream for JsonStream {
+impl RecordBatchStream for JsonRecordBatchStream {
     fn schema(&self) -> SchemaRef {
         self.schema.clone()
     }
 }
 
-pub struct JsonPartitionStream {
+/// WrappedPartition wraps a vector of serde_json documents as a
+/// partition of one of DataFusion's streaming tables.
+pub struct WrappedPartition {
     schema: Arc<Schema>,
-    stream: Mutex<Option<SendableCheckedRecordBatchStream>>,
+    stream: Vec<Map<String, Value>>,
 }
 
-impl PartitionStream for JsonPartitionStream {
+impl PartitionStream for WrappedPartition {
     fn schema(&self) -> &SchemaRef {
         &self.schema
     }
 
     fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
-        let partition = self
-            .stream
-            .lock()
-            .unwrap()
-            .take()
-            .expect("stream can only be used once")
-            .boxed();
-
-        Box::pin(JsonStream {
-            schema: self.schema.clone(),
-            stream: partition,
-        })
+        Box::pin(JsonStreamHandler::new(
+            self.schema.clone(),
+            futures::stream::iter(self.stream.clone().into_iter().map(Ok)).boxed(),
+        ))
     }
 }
 
-impl JsonPartitionStream {
+impl WrappedPartition {
     pub fn new(schema: Arc<Schema>, chunk: Vec<Map<String, Value>>) -> Self {
-        let stream_schema = schema.clone();
-        let stream = futures::stream::iter(chunk)
-            .chunks(1000)
-            .map(move |objs| {
-                let mut decoder = ReaderBuilder::new(stream_schema.to_owned()).build_decoder()?;
-                decoder
-                    .serialize(&objs)
-                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
-                Ok(decoder.flush()?.unwrap())
-            })
-            .boxed();
-
         Self {
             schema: schema.clone(),
-            stream: Mutex::new(Some(stream)),
+            stream: chunk,
         }
     }
 }
 
-pub(crate) struct LazyJsonPartitionStream {
+/// ObjectStorePartition holds a reference to the object store and the
+/// object metadata and represents a partition that is read only when
+/// the partition is executed.
+pub(crate) struct ObjectStorePartition {
     schema: Arc<Schema>,
     store: Arc<dyn ObjectStore>,
     obj: ObjectMeta,
 }
 
-impl PartitionStream for LazyJsonPartitionStream {
+impl ObjectStorePartition {
+    pub fn new(schema: Arc<Schema>, store: Arc<dyn ObjectStore>, obj: ObjectMeta) -> Self {
+        Self { schema, store, obj }
+    }
+}
+
+impl PartitionStream for ObjectStorePartition {
     fn schema(&self) -> &SchemaRef {
         &self.schema
     }
 
@@ -101,45 +92,147 @@ impl PartitionStream for LazyJsonPartitionStream {
         let store = self.store.clone();
         let obj = self.obj.clone();
 
-        Box::pin(JsonStream {
+        Box::pin(JsonRecordBatchStream {
             schema: self.schema.clone(),
             stream: futures::stream::once(async move {
-                futures::stream::iter(match Self::build(stream_schema, store, obj).await {
-                    Ok(batches) => batches,
-                    Err(e) => vec![Err(DataFusionError::External(Box::new(e)))],
-                })
+                match JsonStreamHandler::setup_read_stream(store, obj).await {
+                    Ok(st) => JsonStreamHandler::new(stream_schema, st),
+                    Err(e) => JsonStreamHandler::wrap_error(e),
+                }
             })
             .flatten()
             .boxed(),
         })
     }
 }
 
-impl LazyJsonPartitionStream {
-    pub fn new(schema: Arc<Schema>, store: Arc<dyn ObjectStore>, obj: ObjectMeta) -> Self {
-        Self { schema, store, obj }
+/// JsonObjectStream represents a sequence of "json documents" in an
+/// intermediate format produced by serde_json.
+type JsonObjectStream = Pin<Box<dyn Stream<Item = Result<Map<String, Value>>> + Send>>;
+
+/// JsonStreamHandler is the basis of all stream handling, converting
+/// streams of serde_json objects to RecordBatches, including from
+/// object store and from iterators of values.
+///
+/// These are used by the PartitionStream implementations (above)
+/// which interface into the table function and providers.
+struct JsonStreamHandler {
+    schema: Arc<Schema>,
+    buf: Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>,
+    worker: tokio::task::JoinHandle<DFResult<()>>,
Review thread on the `worker` field:
- Why is this needed? Why can't we pull and decode directly from the stream?
- Huh, I just refactored it without this and it seemed to work. smh
(A sketch of that direct-decode alternative appears after the diff.)
-    }
+}
 
+impl RecordBatchStream for JsonStreamHandler {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+impl Stream for JsonStreamHandler {
+    type Item = DFResult<RecordBatch>;
 
-    async fn build(
-        schema: Arc<Schema>,
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        match self.buf.poll_next_unpin(cx) {
+            Poll::Pending => Poll::Pending,
+            Poll::Ready(None) => {
+                if self.worker.is_finished() {
+                    Poll::Pending
+                } else {
+                    Poll::Ready(None)
+                }
+            }
+            Poll::Ready(Some(val)) => Poll::Ready(Some(val)),
+        }
+    }
+}
+
+impl JsonStreamHandler {
+    fn new(schema: Arc<Schema>, stream: JsonObjectStream) -> Self {
+        let stream_schema = schema.clone();
+        let (mut sink, buf) = mpsc::channel(2048);
+
+        Self {
+            schema,
+            buf: buf
+                .chunks(1024)
+                .map(move |chunk| {
+                    let mut decoder =
+                        ReaderBuilder::new(stream_schema.to_owned()).build_decoder()?;
+                    decoder
+                        .serialize(&chunk)
+                        .map_err(|e| DataFusionError::External(Box::new(e)))?;
+                    Ok(decoder.flush()?.unwrap())
+                })
+                .boxed(),
+            worker: tokio::spawn(async move {
+                let mut stream = stream;
+                while let Some(row) = stream.next().await {
+                    sink.send(row?).await.map_err(JsonError::from)?;
+                }
+                Ok(())
+            }),
+        }
+    }
+
+    fn wrap_error(err: JsonError) -> Self {
+        Self {
+            schema: Arc::new(Schema::empty()),
+            buf: futures::stream::empty().boxed(),
+            worker: tokio::task::spawn_local(async move { Err(err.into()) }),
+        }
+    }
+
+    async fn setup_read_stream(
         store: Arc<dyn ObjectStore>,
         obj: ObjectMeta,
-    ) -> Result<Vec<Result<RecordBatch, DataFusionError>>, JsonError> {
-        let mut data = Vec::new();
-        push_unwind_json_values(
-            &mut data,
-            serde_json::from_slice::<Value>(&store.get(&obj.location).await?.bytes().await?),
-        )?;
-
-        Ok(data
-            .chunks(1000)
-            .map(|chunk| {
-                let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder()?;
-                decoder
-                    .serialize(chunk)
-                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
-                Ok(decoder.flush()?.unwrap())
-            })
-            .collect())
+    ) -> Result<JsonObjectStream> {
+        // Ok(JsonStream::<Value, _>::new(
+        //     store
+        //         .get(&obj.location)
+        //         .await?
+        //         .into_stream()
+        //         .map_err(JsonError::ObjectStore),
+        // )
+        // .flat_map(Self::unwind_json_value)
+        // .boxed())
+        Ok(Box::pin(
+            futures::stream::iter(
                Deserializer::from_reader(std::io::BufReader::new(std::io::Cursor::new(
+                    store.get(&obj.location).await?.bytes().await?.to_vec(),
+                )))
+                .into_iter(),
+            )
+            .map_err(JsonError::SerdeJson)
+            .flat_map(Self::unwind_json_value),
+        ))
     }
+
+    fn unwind_json_value(input: Result<Value>) -> JsonObjectStream {
+        futures::stream::iter(match input {
+            Ok(value) => match value {
+                Value::Array(vals) => {
+                    let mut out = Vec::with_capacity(vals.len());
+                    for v in vals {
+                        match v {
+                            Value::Object(doc) => out.push(Ok(doc)),
+                            Value::Null => out.push(Ok(Map::new())),
+                            _ => out.push(Err(JsonError::UnspportedType(
+                                "only objects and arrays of objects are supported",
+                            ))),
+                        }
+                    }
+                    out
+                }
+                Value::Object(doc) => vec![Ok(doc)],
+                Value::Null => vec![Ok(Map::new())],
+                _ => {
+                    vec![Err(JsonError::UnspportedType(
+                        "only objects and arrays of objects are supported",
+                    ))]
+                }
+            },
+            Err(e) => vec![Err(e)],
+        })
+        .boxed()
+    }
 }