feat(batch): introduce delete executor in compute node (#883)
* refactor insertion

Signed-off-by: Bugen Zhao <[email protected]>

* add delete executor

Signed-off-by: Bugen Zhao <[email protected]>

* move schema test utils

Signed-off-by: Bugen Zhao <[email protected]>

* format

Signed-off-by: Bugen Zhao <[email protected]>

* add tests

Signed-off-by: Bugen Zhao <[email protected]>

* add integration tests

Signed-off-by: Bugen Zhao <[email protected]>

* add docs

Signed-off-by: Bugen Zhao <[email protected]>
BugenZhao authored Mar 14, 2022
1 parent 98d4d2b commit 7be7027
Showing 11 changed files with 405 additions and 89 deletions.
5 changes: 5 additions & 0 deletions proto/plan.proto
@@ -99,6 +99,10 @@ message InsertNode {
repeated int32 column_ids = 2;
}

message DeleteNode {
TableRefId table_source_ref_id = 1;
}

message ValuesNode {
message ExprTuple {
repeated expr.ExprNode cells = 1;
@@ -258,6 +262,7 @@ message PlanNode {
repeated PlanNode children = 1;
oneof node_body {
InsertNode insert = 2;
DeleteNode delete = 3;
ProjectNode project = 4;
CreateTableNode create_table = 5;
DropTableNode drop_table = 6;
233 changes: 233 additions & 0 deletions rust/batch/src/executor/delete.rs
@@ -0,0 +1,233 @@
use std::sync::Arc;

use futures::future::try_join_all;
use risingwave_common::array::column::Column;
use risingwave_common::array::{ArrayBuilder, DataChunk, Op, PrimitiveArrayBuilder, StreamChunk};
use risingwave_common::catalog::{Field, Schema, TableId};
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::types::DataType;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_source::SourceManagerRef;

use super::BoxedExecutor;
use crate::executor::{BoxedExecutorBuilder, Executor, ExecutorBuilder};

/// [`DeleteExecutor`] implements table deletion with values from its child executor.
pub struct DeleteExecutor {
/// Target table id.
table_id: TableId,
source_manager: SourceManagerRef,

child: BoxedExecutor,
executed: bool,
schema: Schema,
identity: String,
}

impl DeleteExecutor {
pub fn new(table_id: TableId, source_manager: SourceManagerRef, child: BoxedExecutor) -> Self {
Self {
table_id,
source_manager,
child,
executed: false,
// TODO: support `RETURNING`
schema: Schema {
fields: vec![Field::unnamed(DataType::Int64)],
},
identity: "DeleteExecutor".to_string(),
}
}
}

#[async_trait::async_trait]
impl Executor for DeleteExecutor {
async fn open(&mut self) -> Result<()> {
self.child.open().await?;
info!("Delete executor");
Ok(())
}

async fn next(&mut self) -> Result<Option<DataChunk>> {
if self.executed {
return Ok(None);
}

let source_desc = self.source_manager.get_source(&self.table_id)?;
let source = source_desc.source.as_table_v2();

let mut notifiers = Vec::new();

while let Some(child_chunk) = self.child.next().await? {
let len = child_chunk.cardinality();
assert!(child_chunk.visibility().is_none());

let chunk = StreamChunk::from_parts(vec![Op::Delete; len], child_chunk);

let notifier = source.write_chunk(chunk)?;
notifiers.push(notifier);
}

// Wait for all chunks to be taken / written.
let rows_deleted = try_join_all(notifiers)
.await
.map_err(|_| {
RwError::from(ErrorCode::InternalError(
"failed to wait chunks to be written".to_owned(),
))
})?
.into_iter()
.sum::<usize>();

// create ret value
{
let mut array_builder = PrimitiveArrayBuilder::<i64>::new(1)?;
array_builder.append(Some(rows_deleted as i64))?;

let array = array_builder.finish()?;
let ret_chunk = DataChunk::builder()
.columns(vec![Column::new(Arc::new(array.into()))])
.build();

self.executed = true;
Ok(Some(ret_chunk))
}
}

async fn close(&mut self) -> Result<()> {
self.child.close().await?;
info!("Cleaning delete executor.");
Ok(())
}

fn schema(&self) -> &Schema {
&self.schema
}

fn identity(&self) -> &str {
&self.identity
}
}

impl BoxedExecutorBuilder for DeleteExecutor {
fn new_boxed_executor(source: &ExecutorBuilder) -> Result<BoxedExecutor> {
let delete_node = try_match_expand!(
source.plan_node().get_node_body().unwrap(),
NodeBody::Delete
)?;

let table_id = TableId::from(&delete_node.table_source_ref_id);

let proto_child = source.plan_node.get_children().get(0).ok_or_else(|| {
RwError::from(ErrorCode::InternalError(String::from(
"Child interpreting error",
)))
})?;
let child = source.clone_for_plan(proto_child).build()?;

Ok(Box::new(Self::new(
table_id,
source.global_batch_env().source_manager_ref(),
child,
)))
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use risingwave_common::array::{Array, I64Array};
use risingwave_common::catalog::{schema_test_utils, ColumnDesc, ColumnId};
use risingwave_common::column_nonnull;
use risingwave_source::{
MemSourceManager, Source, SourceManager, StreamSourceReader, TableV2ReaderContext,
};

use super::*;
use crate::executor::test_utils::MockExecutor;
use crate::*;

#[tokio::test]
async fn test_delete_executor() -> Result<()> {
let source_manager = Arc::new(MemSourceManager::new());

// Schema for mock executor.
let schema = schema_test_utils::ii();
let mut mock_executor = MockExecutor::new(schema.clone());

// Schema of the table
let schema = schema_test_utils::ii();

let table_columns: Vec<_> = schema
.fields
.iter()
.enumerate()
.map(|(i, f)| ColumnDesc {
data_type: f.data_type.clone(),
column_id: ColumnId::from(i as i32), // use column index as column id
name: f.name.clone(),
})
.collect();

let col1 = column_nonnull! { I64Array, [1, 3, 5, 7, 9] };
let col2 = column_nonnull! { I64Array, [2, 4, 6, 8, 10] };
let data_chunk: DataChunk = DataChunk::builder().columns(vec![col1, col2]).build();
mock_executor.add(data_chunk.clone());

// Create the table.
let table_id = TableId::new(0);
source_manager.create_table_source_v2(&table_id, table_columns.to_vec())?;

// Create reader
let source_desc = source_manager.get_source(&table_id)?;
let source = source_desc.source.as_table_v2();
let mut reader = source.stream_reader(TableV2ReaderContext, vec![0.into(), 1.into()])?;

// Delete
let mut delete_executor =
DeleteExecutor::new(table_id, source_manager.clone(), Box::new(mock_executor));
let handle = tokio::spawn(async move {
delete_executor.open().await.unwrap();
let result = delete_executor.next().await.unwrap().unwrap();
delete_executor.close().await.unwrap();
assert_eq!(
result
.column_at(0)
.array()
.as_int64()
.iter()
.collect::<Vec<_>>(),
vec![Some(5)] // deleted rows
);
});

// Read
reader.open().await?;
let chunk = reader.next().await?;

assert_eq!(chunk.ops().to_vec(), vec![Op::Delete; 5]);

assert_eq!(
chunk.columns()[0]
.array()
.as_int64()
.iter()
.collect::<Vec<_>>(),
vec![Some(1), Some(3), Some(5), Some(7), Some(9)]
);

assert_eq!(
chunk.columns()[1]
.array()
.as_int64()
.iter()
.collect::<Vec<_>>(),
vec![Some(2), Some(4), Some(6), Some(8), Some(10)]
);

handle.await.unwrap();

Ok(())
}
}
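
Both the new DeleteExecutor and the refactored InsertExecutor in the next file follow the same write-then-await pattern: every write_chunk call hands back a notifier, the executor collects them, awaits them all with try_join_all, and sums the per-chunk row counts into the single value it returns. A minimal, self-contained sketch of that pattern, using tokio::sync::oneshot receivers as a stand-in for the source's notifier type (an assumption made for illustration), looks like this:

use futures::future::try_join_all;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    // Each "write" hands back a receiver that resolves to the number of rows
    // consumed, standing in here for the notifier returned by `write_chunk`.
    let mut notifiers = Vec::new();
    for rows in [5usize, 3, 2] {
        let (tx, rx) = oneshot::channel();
        // Stand-in for the source side that eventually acknowledges the chunk.
        tokio::spawn(async move {
            let _ = tx.send(rows);
        });
        notifiers.push(rx);
    }

    // Wait for every chunk to be taken, then sum the per-chunk row counts.
    let rows_affected: usize = try_join_all(notifiers)
        .await
        .expect("a notifier was dropped before acknowledging")
        .into_iter()
        .sum();
    assert_eq!(rows_affected, 10);
}

Summing what the notifiers report (rather than counting rows locally, as the old InsertExecutor did with rows_inserted) means the returned count reflects what the table source actually accepted.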
50 changes: 20 additions & 30 deletions rust/batch/src/executor/insert.rs
@@ -15,9 +15,9 @@ use risingwave_source::SourceManagerRef;
use super::BoxedExecutor;
use crate::executor::{BoxedExecutorBuilder, Executor, ExecutorBuilder};

/// `InsertExecutor` implements table insertion with values from its child executor.
/// [`InsertExecutor`] implements table insertion with values from its child executor.
pub struct InsertExecutor {
/// target table id
/// Target table id.
table_id: TableId,
source_manager: SourceManagerRef,
worker_id: u32,
@@ -57,7 +57,6 @@ impl Executor for InsertExecutor {
Ok(())
}

// TODO: refactor this function since we only have `TableV2` now.
async fn next(&mut self) -> Result<Option<DataChunk>> {
if self.executed {
return Ok(None);
@@ -67,10 +66,9 @@
let source = source_desc.source.as_table_v2();

let mut notifiers = Vec::new();
let mut rows_inserted = 0;

while let Some(child_chunk) = self.child.next().await? {
let len = child_chunk.capacity();
let len = child_chunk.cardinality();
assert!(child_chunk.visibility().is_none());

// add row-id column as first column
@@ -84,24 +82,26 @@
let rowid_column = once(Column::new(Arc::new(ArrayImpl::from(
builder.finish().unwrap(),
))));
let child_columns = child_chunk.columns().iter().map(|c| c.to_owned());
let child_columns = child_chunk.into_parts().0.into_iter();

// put row id column to the last to match the behavior of mview
let columns = child_columns.chain(rowid_column).collect();
let chunk = StreamChunk::new(vec![Op::Insert; len], columns, None);

let notifier = source.write_chunk(chunk)?;

notifiers.push(notifier);
rows_inserted += len;
}

// Wait for all chunks to be taken / written.
try_join_all(notifiers).await.map_err(|_| {
RwError::from(ErrorCode::InternalError(
"failed to wait chunks to be written".to_owned(),
))
})?;
let rows_inserted = try_join_all(notifiers)
.await
.map_err(|_| {
RwError::from(ErrorCode::InternalError(
"failed to wait chunks to be written".to_owned(),
))
})?
.into_iter()
.sum::<usize>();

// create ret value
{
@@ -164,7 +164,7 @@ mod tests {
use std::sync::Arc;

use risingwave_common::array::{Array, I64Array};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
use risingwave_common::catalog::{schema_test_utils, ColumnDesc, ColumnId};
use risingwave_common::column_nonnull;
use risingwave_common::types::DataType;
use risingwave_source::{
@@ -178,27 +178,16 @@
use crate::*;

#[tokio::test]
async fn test_insert_executor_for_table_v2() -> Result<()> {
async fn test_insert_executor() -> Result<()> {
let source_manager = Arc::new(MemSourceManager::new());
let store = MemoryStateStore::new();

// Schema for mock executor.
let schema = Schema {
fields: vec![
Field::unnamed(DataType::Int64),
Field::unnamed(DataType::Int64),
],
};
let schema = schema_test_utils::ii();
let mut mock_executor = MockExecutor::new(schema.clone());

// Schema of first table
let schema = Schema {
fields: vec![
Field::unnamed(DataType::Decimal),
Field::unnamed(DataType::Decimal),
Field::unnamed(DataType::Decimal),
],
};
// Schema of the table
let schema = schema_test_utils::iii();

let table_columns: Vec<_> = schema
.fields
@@ -216,7 +205,7 @@
let data_chunk: DataChunk = DataChunk::builder().columns(vec![col1, col2]).build();
mock_executor.add(data_chunk.clone());

// Create the first table.
// Create the table.
let table_id = TableId::new(0);
source_manager.create_table_source_v2(&table_id, table_columns.to_vec())?;

@@ -268,6 +257,7 @@
vec![Some(2), Some(4), Some(6), Some(8), Some(10)]
);

// Row id column
assert_eq!(
chunk.columns()[2]
.array()
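
The tests above drive both executors through the same open, next-until-None, close lifecycle. A stripped-down sketch of that calling pattern, with a hypothetical SimpleExecutor trait standing in for the crate's Executor trait and the DataChunk result reduced to a plain row count (both assumptions for illustration only), is:

use async_trait::async_trait;

// Hypothetical stand-in for the batch `Executor` trait: same open/next/close
// shape, but the chunk type is reduced to a plain affected-row count.
#[async_trait]
trait SimpleExecutor {
    async fn open(&mut self) -> Result<(), String>;
    async fn next(&mut self) -> Result<Option<usize>, String>;
    async fn close(&mut self) -> Result<(), String>;
}

// Mimics Insert/Delete: yields the affected-row count once, then None.
struct OneShotCount {
    count: Option<usize>,
}

#[async_trait]
impl SimpleExecutor for OneShotCount {
    async fn open(&mut self) -> Result<(), String> {
        Ok(())
    }
    async fn next(&mut self) -> Result<Option<usize>, String> {
        Ok(self.count.take())
    }
    async fn close(&mut self) -> Result<(), String> {
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let mut exec = OneShotCount { count: Some(5) };
    exec.open().await?;
    while let Some(rows) = exec.next().await? {
        println!("rows affected: {rows}");
    }
    exec.close().await?;
    Ok(())
}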
