From 731986e9f9723f51a31178b766c21766a7472192 Mon Sep 17 00:00:00 2001 From: coldWater Date: Thu, 16 Jan 2025 16:03:29 +0800 Subject: [PATCH 1/5] udf runtime pool --- .../src/pipelines/builders/builder_udf.rs | 9 +- .../processors/transforms/aggregator/mod.rs | 1 + .../transforms/aggregator/udaf_script.rs | 162 ++++----- .../pipelines/processors/transforms/mod.rs | 14 +- .../processors/transforms/runtime_pool.rs | 54 +++ .../transforms/transform_udf_script.rs | 317 +++++++++--------- 6 files changed, 299 insertions(+), 258 deletions(-) create mode 100644 src/query/service/src/pipelines/processors/transforms/runtime_pool.rs diff --git a/src/query/service/src/pipelines/builders/builder_udf.rs b/src/query/service/src/pipelines/builders/builder_udf.rs index 7045374042eaf..da9b35d5559bf 100644 --- a/src/query/service/src/pipelines/builders/builder_udf.rs +++ b/src/query/service/src/pipelines/builders/builder_udf.rs @@ -12,13 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::atomic::AtomicUsize; -use std::sync::Arc; - use databend_common_exception::Result; use databend_common_pipeline_transforms::processors::TransformPipelineHelper; use databend_common_sql::executor::physical_plans::Udf; -use databend_common_storages_fuse::TableContext; use crate::pipelines::processors::transforms::TransformUdfScript; use crate::pipelines::processors::transforms::TransformUdfServer; @@ -29,15 +25,12 @@ impl PipelineBuilder { self.build_pipeline(&udf.input)?; if udf.script_udf { - let index_seq = Arc::new(AtomicUsize::new(0)); - let runtime_num = self.ctx.get_settings().get_max_threads()? as usize; - let runtimes = TransformUdfScript::init_runtime(&udf.udf_funcs, runtime_num)?; + let runtimes = TransformUdfScript::init_runtime(&udf.udf_funcs)?; self.main_pipeline.try_add_transformer(|| { Ok(TransformUdfScript::new( self.func_ctx.clone(), udf.udf_funcs.clone(), runtimes.clone(), - index_seq.clone(), )) }) } else { diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs index a4dd56da1e1c3..89eaede89b97a 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs @@ -37,3 +37,4 @@ pub use udaf_script::*; pub use utils::*; pub use self::serde::*; +use super::runtime_pool; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/udaf_script.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/udaf_script.rs index 9cef5d96932dc..85ecbc850e41e 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/udaf_script.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/udaf_script.rs @@ -17,7 +17,6 @@ use std::fmt; use std::io::BufRead; use std::io::Cursor; use std::sync::Arc; -use std::sync::Mutex; use arrow_array::Array; use arrow_array::RecordBatch; @@ -40,8 +39,8 @@ use databend_common_functions::aggregates::AggregateFunction; use databend_common_sql::plans::UDFLanguage; use databend_common_sql::plans::UDFScriptCode; -#[cfg(feature = "python-udf")] -use super::super::python_udf::GLOBAL_PYTHON_RUNTIME; +use super::runtime_pool::Pool; +use super::runtime_pool::RuntimeBuilder; pub struct AggregateUdfScript { display_name: String, @@ -138,6 +137,15 @@ impl AggregateFunction for AggregateUdfScript { builder.append_column(&result); Ok(()) } + + fn need_manual_drop_state(&self) -> bool { + true + } + + unsafe fn drop_state(&self, place: StateAddr) { + let state = place.get::(); + std::ptr::drop_in_place(state); + } } impl fmt::Display for AggregateUdfScript { @@ -244,10 +252,10 @@ pub fn create_udaf_script_function( let UDFScriptCode { language, code, .. } = code; let runtime = match language { UDFLanguage::JavaScript => { - let pool = JsRuntimePool::new( + let builder = JsRuntimeBuilder { name, - String::from_utf8(code.to_vec())?, - ArrowType::Struct( + code: String::from_utf8(code.to_vec())?, + state_type: ArrowType::Struct( state_fields .iter() .map(|f| f.into()) @@ -255,8 +263,8 @@ pub fn create_udaf_script_function( .into(), ), output_type, - ); - UDAFRuntime::JavaScript(pool) + }; + UDAFRuntime::JavaScript(JsRuntimePool::new(builder)) } UDFLanguage::WebAssembly => unimplemented!(), #[cfg(not(feature = "python-udf"))] @@ -267,22 +275,19 @@ pub fn create_udaf_script_function( } #[cfg(feature = "python-udf")] UDFLanguage::Python => { - let mut runtime = GLOBAL_PYTHON_RUNTIME.write(); - let code = String::from_utf8(code.to_vec())?; - runtime.add_aggregate( - &name, - ArrowType::Struct( + let builder = python_pool::PyRuntimeBuilder { + name, + code: String::from_utf8(code.to_vec())?, + state_type: ArrowType::Struct( state_fields .iter() .map(|f| f.into()) .collect::>() .into(), ), - ArrowType::from(&output_type), - arrow_udf_python::CallMode::CalledOnNullInput, - &code, - )?; - UDAFRuntime::Python(PythonInfo { name, output_type }) + output_type, + }; + UDAFRuntime::Python(Pool::new(builder)) } }; let init_state = runtime @@ -297,27 +302,17 @@ pub fn create_udaf_script_function( })) } -struct JsRuntimePool { +struct JsRuntimeBuilder { name: String, code: String, state_type: ArrowType, output_type: DataType, - - runtimes: Mutex>, } -impl JsRuntimePool { - fn new(name: String, code: String, state_type: ArrowType, output_type: DataType) -> Self { - Self { - name, - code, - state_type, - output_type, - runtimes: Mutex::new(vec![]), - } - } +impl RuntimeBuilder for JsRuntimeBuilder { + type Error = ErrorCode; - fn create(&self) -> Result { + fn build(&self) -> std::result::Result { let mut runtime = match arrow_udf_js::Runtime::new() { Ok(runtime) => runtime, Err(e) => { @@ -344,23 +339,41 @@ impl JsRuntimePool { Ok(runtime) } +} - fn call(&self, op: F) -> anyhow::Result - where F: FnOnce(&arrow_udf_js::Runtime) -> anyhow::Result { - let mut runtimes = self.runtimes.lock().unwrap(); - let runtime = match runtimes.pop() { - Some(runtime) => runtime, - None => self.create()?, - }; - drop(runtimes); +type JsRuntimePool = Pool; + +#[cfg(feature = "python-udf")] +mod python_pool { + use super::*; - let result = op(&runtime)?; + pub(super) struct PyRuntimeBuilder { + pub name: String, + pub code: String, + pub state_type: ArrowType, + pub output_type: DataType, + } - let mut runtimes = self.runtimes.lock().unwrap(); - runtimes.push(runtime); + impl RuntimeBuilder for PyRuntimeBuilder { + type Error = ErrorCode; - Ok(result) + fn build(&self) -> std::result::Result { + let mut runtime = arrow_udf_python::Builder::default() + .sandboxed(true) + .build()?; + let output_type: ArrowType = (&self.output_type).into(); + runtime.add_aggregate( + &self.name, + self.state_type.clone(), + output_type, + arrow_udf_python::CallMode::CalledOnNullInput, + &self.code, + )?; + Ok(runtime) + } } + + pub type PyRuntimePool = Pool; } enum UDAFRuntime { @@ -368,41 +381,36 @@ enum UDAFRuntime { #[expect(unused)] WebAssembly, #[cfg(feature = "python-udf")] - Python(PythonInfo), -} - -#[cfg(feature = "python-udf")] -struct PythonInfo { - name: String, - output_type: DataType, + Python(python_pool::PyRuntimePool), } impl UDAFRuntime { fn name(&self) -> &str { match self { - UDAFRuntime::JavaScript(pool) => &pool.name, + UDAFRuntime::JavaScript(pool) => &pool.builder.name, #[cfg(feature = "python-udf")] - UDAFRuntime::Python(info) => &info.name, + UDAFRuntime::Python(info) => &info.builder.name, _ => unimplemented!(), } } fn return_type(&self) -> DataType { match self { - UDAFRuntime::JavaScript(pool) => pool.output_type.clone(), + UDAFRuntime::JavaScript(pool) => pool.builder.output_type.clone(), #[cfg(feature = "python-udf")] - UDAFRuntime::Python(info) => info.output_type.clone(), + UDAFRuntime::Python(info) => info.builder.output_type.clone(), _ => unimplemented!(), } } fn create_state(&self) -> anyhow::Result { let state = match self { - UDAFRuntime::JavaScript(pool) => pool.call(|runtime| runtime.create_state(&pool.name)), + UDAFRuntime::JavaScript(pool) => { + pool.call(|runtime| runtime.create_state(&pool.builder.name)) + } #[cfg(feature = "python-udf")] - UDAFRuntime::Python(info) => { - let runtime = GLOBAL_PYTHON_RUNTIME.read(); - runtime.create_state(&info.name) + UDAFRuntime::Python(pool) => { + pool.call(|runtime| runtime.create_state(&pool.builder.name)) } _ => unimplemented!(), }?; @@ -412,12 +420,11 @@ impl UDAFRuntime { fn accumulate(&self, state: &UdfAggState, input: &RecordBatch) -> anyhow::Result { let state = match self { UDAFRuntime::JavaScript(pool) => { - pool.call(|runtime| runtime.accumulate(&pool.name, &state.0, input)) + pool.call(|runtime| runtime.accumulate(&pool.builder.name, &state.0, input)) } #[cfg(feature = "python-udf")] - UDAFRuntime::Python(info) => { - let runtime = GLOBAL_PYTHON_RUNTIME.read(); - runtime.accumulate(&info.name, &state.0, input) + UDAFRuntime::Python(pool) => { + pool.call(|runtime| runtime.accumulate(&pool.builder.name, &state.0, input)) } _ => unimplemented!(), }?; @@ -426,11 +433,12 @@ impl UDAFRuntime { fn merge(&self, states: &Arc) -> anyhow::Result { let state = match self { - UDAFRuntime::JavaScript(pool) => pool.call(|runtime| runtime.merge(&pool.name, states)), + UDAFRuntime::JavaScript(pool) => { + pool.call(|runtime| runtime.merge(&pool.builder.name, states)) + } #[cfg(feature = "python-udf")] - UDAFRuntime::Python(info) => { - let runtime = GLOBAL_PYTHON_RUNTIME.read(); - runtime.merge(&info.name, states) + UDAFRuntime::Python(pool) => { + pool.call(|runtime| runtime.merge(&pool.builder.name, states)) } _ => unimplemented!(), }?; @@ -440,12 +448,11 @@ impl UDAFRuntime { fn finish(&self, state: &UdfAggState) -> anyhow::Result> { match self { UDAFRuntime::JavaScript(pool) => { - pool.call(|runtime| runtime.finish(&pool.name, &state.0)) + pool.call(|runtime| runtime.finish(&pool.builder.name, &state.0)) } #[cfg(feature = "python-udf")] - UDAFRuntime::Python(info) => { - let runtime = GLOBAL_PYTHON_RUNTIME.read(); - runtime.finish(&info.name, &state.0) + UDAFRuntime::Python(pool) => { + pool.call(|runtime| runtime.finish(&pool.builder.name, &state.0)) } _ => unimplemented!(), } @@ -495,9 +502,9 @@ mod tests { Field::new("sum", ArrowType::Int64, false), Field::new("weight", ArrowType::Int64, false), ]; - let pool = JsRuntimePool::new( - agg_name.clone(), - r#" + let builder = JsRuntimeBuilder { + name: agg_name.clone(), + code: r#" export function create_state() { return {sum: 0, weight: 0}; } @@ -521,9 +528,10 @@ export function finish(state) { } "# .to_string(), - ArrowType::Struct(fields.clone().into()), - Float32Type::data_type(), - ); + state_type: ArrowType::Struct(fields.clone().into()), + output_type: Float32Type::data_type(), + }; + let pool = JsRuntimePool::new(builder); let state = pool.call(|runtime| runtime.create_state(&agg_name))?; diff --git a/src/query/service/src/pipelines/processors/transforms/mod.rs b/src/query/service/src/pipelines/processors/transforms/mod.rs index 55078470553b9..2e76f346a0e40 100644 --- a/src/query/service/src/pipelines/processors/transforms/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/mod.rs @@ -15,6 +15,7 @@ pub mod aggregator; mod hash_join; pub(crate) mod range_join; +mod runtime_pool; mod transform_add_computed_columns; mod transform_add_const_columns; mod transform_add_internal_columns; @@ -66,16 +67,3 @@ pub use transform_stream_sort_spill::*; pub use transform_udf_script::TransformUdfScript; pub use transform_udf_server::TransformUdfServer; pub use window::*; - -#[cfg(feature = "python-udf")] -mod python_udf { - use std::sync::Arc; - use std::sync::LazyLock; - - use arrow_udf_python::Runtime; - use parking_lot::RwLock; - - /// python runtime should be only initialized once by gil lock, see: https://github.com/python/cpython/blob/main/Python/pystate.c - pub static GLOBAL_PYTHON_RUNTIME: LazyLock>> = - LazyLock::new(|| Arc::new(RwLock::new(Runtime::new().unwrap()))); -} diff --git a/src/query/service/src/pipelines/processors/transforms/runtime_pool.rs b/src/query/service/src/pipelines/processors/transforms/runtime_pool.rs new file mode 100644 index 0000000000000..94c689f75f47e --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/runtime_pool.rs @@ -0,0 +1,54 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Mutex; + +pub trait RuntimeBuilder { + type Error; + fn build(&self) -> Result; +} + +pub struct Pool> { + pub builder: B, + runtimes: Mutex>, +} + +impl> Pool { + pub fn new(builder: B) -> Self { + Self { + builder, + runtimes: Mutex::new(vec![]), + } + } + + pub fn call(&self, op: F) -> Result + where + F: FnOnce(&R) -> Result, + E: From, + { + let mut runtimes = self.runtimes.lock().unwrap(); + let runtime = match runtimes.pop() { + Some(runtime) => runtime, + None => self.builder.build()?, + }; + drop(runtimes); + + let result = op(&runtime)?; + + let mut runtimes = self.runtimes.lock().unwrap(); + runtimes.push(runtime); + + Ok(result) + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs index 9c499496806f8..05ae25841e709 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs @@ -15,15 +15,15 @@ use std::collections::btree_map::Entry; use std::collections::BTreeMap; use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering; use std::sync::Arc; use arrow_array::RecordBatch; -use arrow_schema::Schema; +use arrow_schema::DataType as ArrowType; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::converts::arrow::ARROW_EXT_TYPE_VARIANT; use databend_common_expression::converts::arrow::EXTENSION_KEY; +use databend_common_expression::types::DataType; use databend_common_expression::variant_transform::contains_variant; use databend_common_expression::variant_transform::transform_variant; use databend_common_expression::BlockEntry; @@ -36,47 +36,58 @@ use databend_common_sql::executor::physical_plans::UdfFunctionDesc; use databend_common_sql::plans::UDFLanguage; use databend_common_sql::plans::UDFScriptCode; use databend_common_sql::plans::UDFType; -use parking_lot::RwLock; -#[cfg(feature = "python-udf")] -use super::python_udf::GLOBAL_PYTHON_RUNTIME; +use super::runtime_pool::Pool; +use super::runtime_pool::RuntimeBuilder; pub enum ScriptRuntime { - JavaScript(Vec>>), - WebAssembly(Arc>), + JavaScript(JsRuntimePool), + WebAssembly(arrow_udf_wasm::Runtime), #[cfg(feature = "python-udf")] - Python, + Python(python_pool::PyRuntimePool), } impl ScriptRuntime { - pub fn try_create(lang: UDFLanguage, code: Option<&[u8]>, runtime_num: usize) -> Result { - match lang { + pub fn try_create(func: &UdfFunctionDesc) -> Result { + let UDFType::Script(UDFScriptCode { language, code, .. }) = &func.udf_type else { + unreachable!() + }; + match language { UDFLanguage::JavaScript => { - // Create multiple runtimes to execute in parallel to avoid blocking caused by js udf runtime locks. - let runtimes = (0..runtime_num) - .map(|_| { - arrow_udf_js::Runtime::new() - .map(|mut runtime| { - runtime - .converter_mut() - .set_arrow_extension_key(EXTENSION_KEY); - runtime - .converter_mut() - .set_json_extension_name(ARROW_EXT_TYPE_VARIANT); - Arc::new(RwLock::new(runtime)) - }) - .map_err(|err| { - ErrorCode::UDFRuntimeError(format!( - "Cannot create js runtime: {err}", - )) - }) - }) - .collect::>>>>()?; - Ok(Self::JavaScript(runtimes)) + let builder = JsRuntimeBuilder { + name: func.name.clone(), + handler: func.func_name.clone(), + code: String::from_utf8(code.to_vec())?, + output_type: func.data_type.as_ref().clone(), + counter: Default::default(), + }; + Ok(Self::JavaScript(JsRuntimePool::new(builder))) + } + UDFLanguage::WebAssembly => { + let start = std::time::Instant::now(); + let runtime = arrow_udf_wasm::Runtime::new(code).map_err(|err| { + ErrorCode::UDFRuntimeError(format!( + "Failed to create WASM runtime for module: {err}" + )) + })?; + log::info!( + "Init WebAssembly UDF runtime for {:?} took: {:?}", + func.name, + start.elapsed() + ); + Ok(ScriptRuntime::WebAssembly(runtime)) } - UDFLanguage::WebAssembly => Self::create_wasm_runtime(code), #[cfg(feature = "python-udf")] - UDFLanguage::Python => Ok(Self::Python), + UDFLanguage::Python => { + let builder = PyRuntimeBuilder { + name: func.name.clone(), + handler: func.func_name.clone(), + code: String::from_utf8(code.to_vec())?, + output_type: func.data_type.as_ref().clone(), + counter: Default::default(), + }; + Ok(Self::Python(python_pool::PyRuntimePool::new(builder))) + } #[cfg(not(feature = "python-udf"))] UDFLanguage::Python => Err(ErrorCode::EnterpriseFeatureNotEnable( "Failed to create python script udf", @@ -84,89 +95,30 @@ impl ScriptRuntime { } } - fn create_wasm_runtime(code_blob: Option<&[u8]>) -> Result { - let decoded_code_blob = code_blob - .ok_or_else(|| ErrorCode::UDFDataError("WASM module not provided".to_string()))?; - - let runtime = arrow_udf_wasm::Runtime::new(decoded_code_blob).map_err(|err| { - ErrorCode::UDFRuntimeError(format!("Failed to create WASM runtime for module: {err}")) - })?; - - Ok(ScriptRuntime::WebAssembly(Arc::new(RwLock::new(runtime)))) - } - - pub fn add_function_with_handler(&self, func: &UdfFunctionDesc, code: &[u8]) -> Result<()> { - let tmp_schema = - DataSchema::new(vec![DataField::new("tmp", func.data_type.as_ref().clone())]); - let arrow_schema = Schema::from(&tmp_schema); - - match self { - ScriptRuntime::JavaScript(runtimes) => { - let code = std::str::from_utf8(code)?; - for runtime in runtimes { - let mut runtime = runtime.write(); - runtime.add_function_with_handler( - &func.name, - // we pass the field instead of the data type because arrow-udf-js - // now takes the field as an argument here so that it can get any - // metadata associated with the field - arrow_schema.field(0).clone(), - arrow_udf_js::CallMode::ReturnNullOnNullInput, - code, - &func.func_name, - )?; - } - } - #[cfg(feature = "python-udf")] - ScriptRuntime::Python => { - let code: &str = std::str::from_utf8(code)?; - let mut runtime = GLOBAL_PYTHON_RUNTIME.write(); - runtime.add_function_with_handler( - &func.name, - arrow_schema.field(0).data_type().clone(), - arrow_udf_python::CallMode::ReturnNullOnNullInput, - code, - &func.func_name, - )?; - } - // Ignore the execution for WASM context - ScriptRuntime::WebAssembly(_) => {} - } - - Ok(()) - } - pub fn handle_execution( &self, func: &UdfFunctionDesc, input_batch: &RecordBatch, - index: usize, ) -> Result { let result_batch = match self { - ScriptRuntime::JavaScript(runtimes) => { - // Choose a js runtime in order to avoid blocking - let idx = index % runtimes.len(); - let runtime = &runtimes[idx]; - let runtime = runtime.read(); - runtime.call(&func.name, input_batch).map_err(|err| { + ScriptRuntime::JavaScript(pool) => pool + .call(|runtime| runtime.call(&func.name, input_batch)) + .map_err(|err| { ErrorCode::UDFRuntimeError(format!( "JavaScript UDF {:?} execution failed: {err}", func.name )) - })? - } + })?, #[cfg(feature = "python-udf")] - ScriptRuntime::Python => { - let runtime = GLOBAL_PYTHON_RUNTIME.read(); - runtime.call(&func.name, input_batch).map_err(|err| { + ScriptRuntime::Python(pool) => pool + .call(|runtime| runtime.call(&func.name, input_batch)) + .map_err(|err| { ErrorCode::UDFRuntimeError(format!( "Python UDF {:?} execution failed: {err}", func.name )) - })? - } + })?, ScriptRuntime::WebAssembly(runtime) => { - let runtime = runtime.read(); runtime.call(&func.func_name, input_batch).map_err(|err| { ErrorCode::UDFRuntimeError(format!( "WASM UDF {:?} execution failed: {err}", @@ -179,25 +131,110 @@ impl ScriptRuntime { } } +pub struct JsRuntimeBuilder { + name: String, + handler: String, + code: String, + output_type: DataType, + + counter: AtomicUsize, +} + +impl RuntimeBuilder for JsRuntimeBuilder { + type Error = ErrorCode; + + fn build(&self) -> Result { + let start = std::time::Instant::now(); + let mut runtime = arrow_udf_js::Runtime::new() + .map_err(|e| ErrorCode::UDFDataError(format!("Cannot create js runtime: {e}")))?; + + let output_type: ArrowType = (&self.output_type).into(); + runtime.add_function_with_handler( + &self.name, + output_type, + arrow_udf_js::CallMode::ReturnNullOnNullInput, + &self.code, + &self.handler, + )?; + + let converter = runtime.converter_mut(); + converter.set_arrow_extension_key(EXTENSION_KEY); + converter.set_json_extension_name(ARROW_EXT_TYPE_VARIANT); + + log::info!( + "Init JavaScript UDF runtime for {:?} #{} took: {:?}", + self.name, + self.counter + .fetch_add(1, std::sync::atomic::Ordering::SeqCst), + start.elapsed() + ); + + Ok(runtime) + } +} + +type JsRuntimePool = Pool; + +#[cfg(feature = "python-udf")] +struct PyRuntimeBuilder { + name: String, + handler: String, + code: String, + output_type: DataType, + + counter: AtomicUsize, +} + +#[cfg(feature = "python-udf")] +mod python_pool { + use super::*; + + impl RuntimeBuilder for PyRuntimeBuilder { + type Error = ErrorCode; + + fn build(&self) -> Result { + let start = std::time::Instant::now(); + let mut runtime = arrow_udf_python::Builder::default() + .sandboxed(true) + .build()?; + let output_type: ArrowType = (&self.output_type).into(); + runtime.add_function_with_handler( + &self.name, + output_type, + arrow_udf_python::CallMode::CalledOnNullInput, + &self.code, + &self.handler, + )?; + + log::info!( + "Init Python UDF runtime for {:?} #{} took: {:?}", + self.name, + self.counter + .fetch_add(1, std::sync::atomic::Ordering::SeqCst), + start.elapsed() + ); + + Ok(runtime) + } + } + + pub type PyRuntimePool = Pool; +} + pub struct TransformUdfScript { funcs: Vec, script_runtimes: BTreeMap>, - index_seq: Arc, } -unsafe impl Send for TransformUdfScript {} - impl TransformUdfScript { pub fn new( _func_ctx: FunctionContext, funcs: Vec, script_runtimes: BTreeMap>, - index_seq: Arc, ) -> Self { Self { funcs, script_runtimes, - index_seq, } } } @@ -212,79 +249,39 @@ impl Transform for TransformUdfScript { return Ok(data_block); } - let index = self.index_seq.fetch_add(1, Ordering::SeqCst); for func in &self.funcs { let num_rows = data_block.num_rows(); let block_entries = self.prepare_block_entries(func, &data_block)?; let input_batch = self.create_input_batch(block_entries, num_rows)?; - let runtime_key = Self::get_runtime_key(func)?; - - if let Some(runtime) = self.script_runtimes.get(&runtime_key) { - let result_batch = runtime.handle_execution(func, &input_batch, index)?; - self.update_datablock(func, result_batch, &mut data_block)?; - } else { - return Err(ErrorCode::UDFDataError(format!( - "Failed to find runtime for function {:?} with key: {:?}", - func.name, runtime_key - ))); - } + let runtime = self.script_runtimes.get(&func.name).unwrap(); + let result_batch = runtime.handle_execution(func, &input_batch)?; + self.update_datablock(func, result_batch, &mut data_block)?; } Ok(data_block) } } impl TransformUdfScript { - fn get_runtime_key(func: &UdfFunctionDesc) -> Result { - let (lang, func_name) = match &func.udf_type { - UDFType::Script(UDFScriptCode { language: lang, .. }) => (lang, &func.func_name), - _ => { - return Err(ErrorCode::UDFDataError(format!( - "Unsupported UDFType variant for function {:?}", - func.name - ))); - } - }; - - let runtime_key = format!("{}-{}", lang, func_name.trim()); - Ok(runtime_key) - } - - pub fn init_runtime( - funcs: &[UdfFunctionDesc], - runtime_num: usize, - ) -> Result>> { + pub fn init_runtime(funcs: &[UdfFunctionDesc]) -> Result>> { let mut script_runtimes: BTreeMap> = BTreeMap::new(); - let start = std::time::Instant::now(); for func in funcs { - let (&lang, code_opt) = match &func.udf_type { - UDFType::Script(UDFScriptCode { language, code, .. }) => { - (language, Some(code.as_ref().as_ref())) - } + let code = match &func.udf_type { + UDFType::Script(code) => code, _ => continue, }; - let runtime_key = Self::get_runtime_key(func)?; - let runtime = match script_runtimes.entry(runtime_key.clone()) { - Entry::Occupied(entry) => entry.into_mut().clone(), - Entry::Vacant(entry) => { - let new_runtime = ScriptRuntime::try_create(lang, code_opt, runtime_num) - .map(Arc::new) - .map_err(|err| { - ErrorCode::UDFDataError(format!( - "Failed to create UDF runtime for language {lang:?} with error: {err}", - )) - })?; - entry.insert(new_runtime).clone() - } + if let Entry::Vacant(entry) = script_runtimes.entry(func.name.clone()) { + let runtime = ScriptRuntime::try_create(func).map_err(|err| { + ErrorCode::UDFDataError(format!( + "Failed to create UDF runtime for language {:?} with error: {err}", + code.language + )) + })?; + entry.insert(Arc::new(runtime)); }; - - if let UDFType::Script(UDFScriptCode { code, .. }) = &func.udf_type { - runtime.add_function_with_handler(func, code.as_ref().as_ref())?; - } } - log::info!("Init UDF runtimes took: {:?}", start.elapsed()); Ok(script_runtimes) } @@ -378,7 +375,7 @@ impl TransformUdfScript { if col.data_type != func.data_type.as_ref().clone() { return Err(ErrorCode::UDFDataError(format!( - "Function '{}' returned column with data type {:?} but expected {:?}", + "Function {:?} returned column with data type {:?} but expected {:?}", func.name, col.data_type, func.data_type ))); } From 3bcb14c20185e4b64abc1c01bb9d2c3d9d6440ab Mon Sep 17 00:00:00 2001 From: coldWater Date: Thu, 16 Jan 2025 16:59:50 +0800 Subject: [PATCH 2/5] fix --- .../src/pipelines/processors/transforms/transform_udf_script.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs index 05ae25841e709..053936fa2130a 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs @@ -176,7 +176,7 @@ impl RuntimeBuilder for JsRuntimeBuilder { type JsRuntimePool = Pool; #[cfg(feature = "python-udf")] -struct PyRuntimeBuilder { +pub struct PyRuntimeBuilder { name: String, handler: String, code: String, From db3ec12a36cd6b6d21ff4441afd19a40e0fba789 Mon Sep 17 00:00:00 2001 From: coldWater Date: Thu, 16 Jan 2025 18:46:53 +0800 Subject: [PATCH 3/5] fix --- .../transforms/transform_udf_script.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs index 053936fa2130a..554af421d81be 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs @@ -148,19 +148,24 @@ impl RuntimeBuilder for JsRuntimeBuilder { let mut runtime = arrow_udf_js::Runtime::new() .map_err(|e| ErrorCode::UDFDataError(format!("Cannot create js runtime: {e}")))?; - let output_type: ArrowType = (&self.output_type).into(); + let converter = runtime.converter_mut(); + converter.set_arrow_extension_key(EXTENSION_KEY); + converter.set_json_extension_name(ARROW_EXT_TYPE_VARIANT); + + // we pass the field instead of the data type because arrow-udf-js + // now takes the field as an argument here so that it can get any + // metadata associated with the field + let tmp_schema = DataSchema::new(vec![DataField::new("tmp", self.output_type.clone())]); + let return_type = arrow_schema::Schema::from(&tmp_schema).field(0).clone(); + runtime.add_function_with_handler( &self.name, - output_type, + return_type, arrow_udf_js::CallMode::ReturnNullOnNullInput, &self.code, &self.handler, )?; - let converter = runtime.converter_mut(); - converter.set_arrow_extension_key(EXTENSION_KEY); - converter.set_json_extension_name(ARROW_EXT_TYPE_VARIANT); - log::info!( "Init JavaScript UDF runtime for {:?} #{} took: {:?}", self.name, From b8ca9bd4d1301be24e5f40b526222d3fe696211c Mon Sep 17 00:00:00 2001 From: coldWater Date: Thu, 16 Jan 2025 23:36:23 +0800 Subject: [PATCH 4/5] fix --- .../transforms/aggregator/udaf_script.rs | 14 +++++++++---- .../processors/transforms/runtime_pool.rs | 4 ++-- .../transforms/transform_udf_script.rs | 20 +++++++++---------- 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/udaf_script.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/udaf_script.rs index 85ecbc850e41e..0b324b6c1ef63 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/udaf_script.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/udaf_script.rs @@ -326,12 +326,14 @@ impl RuntimeBuilder for JsRuntimeBuilder { converter.set_arrow_extension_key(EXTENSION_KEY); converter.set_json_extension_name(ARROW_EXT_TYPE_VARIANT); - let output_type: ArrowType = (&self.output_type).into(); runtime .add_aggregate( &self.name, self.state_type.clone(), - output_type, + // we pass the field instead of the data type because arrow-udf-js + // now takes the field as an argument here so that it can get any + // metadata associated with the field + arrow_field_from_data_type(&self.name, self.output_type.clone()), arrow_udf_js::CallMode::CalledOnNullInput, &self.code, ) @@ -341,6 +343,11 @@ impl RuntimeBuilder for JsRuntimeBuilder { } } +fn arrow_field_from_data_type(name: &str, dt: DataType) -> arrow_schema::Field { + let field = DataField::new(name, dt); + (&field).into() +} + type JsRuntimePool = Pool; #[cfg(feature = "python-udf")] @@ -361,11 +368,10 @@ mod python_pool { let mut runtime = arrow_udf_python::Builder::default() .sandboxed(true) .build()?; - let output_type: ArrowType = (&self.output_type).into(); runtime.add_aggregate( &self.name, self.state_type.clone(), - output_type, + arrow_field_from_data_type(&self.name, self.output_type.clone()), arrow_udf_python::CallMode::CalledOnNullInput, &self.code, )?; diff --git a/src/query/service/src/pipelines/processors/transforms/runtime_pool.rs b/src/query/service/src/pipelines/processors/transforms/runtime_pool.rs index 94c689f75f47e..5a3fc8437b3a4 100644 --- a/src/query/service/src/pipelines/processors/transforms/runtime_pool.rs +++ b/src/query/service/src/pipelines/processors/transforms/runtime_pool.rs @@ -44,11 +44,11 @@ impl> Pool { }; drop(runtimes); - let result = op(&runtime)?; + let result = op(&runtime); let mut runtimes = self.runtimes.lock().unwrap(); runtimes.push(runtime); - Ok(result) + result } } diff --git a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs index 554af421d81be..1447e95897693 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs @@ -18,7 +18,6 @@ use std::sync::atomic::AtomicUsize; use std::sync::Arc; use arrow_array::RecordBatch; -use arrow_schema::DataType as ArrowType; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::converts::arrow::ARROW_EXT_TYPE_VARIANT; @@ -152,15 +151,12 @@ impl RuntimeBuilder for JsRuntimeBuilder { converter.set_arrow_extension_key(EXTENSION_KEY); converter.set_json_extension_name(ARROW_EXT_TYPE_VARIANT); - // we pass the field instead of the data type because arrow-udf-js - // now takes the field as an argument here so that it can get any - // metadata associated with the field - let tmp_schema = DataSchema::new(vec![DataField::new("tmp", self.output_type.clone())]); - let return_type = arrow_schema::Schema::from(&tmp_schema).field(0).clone(); - runtime.add_function_with_handler( &self.name, - return_type, + // we pass the field instead of the data type because arrow-udf-js + // now takes the field as an argument here so that it can get any + // metadata associated with the field + arrow_field_from_data_type(&self.name, self.output_type.clone()), arrow_udf_js::CallMode::ReturnNullOnNullInput, &self.code, &self.handler, @@ -178,6 +174,11 @@ impl RuntimeBuilder for JsRuntimeBuilder { } } +fn arrow_field_from_data_type(name: &str, dt: DataType) -> arrow_schema::Field { + let field = DataField::new(name, dt); + (&field).into() +} + type JsRuntimePool = Pool; #[cfg(feature = "python-udf")] @@ -202,10 +203,9 @@ mod python_pool { let mut runtime = arrow_udf_python::Builder::default() .sandboxed(true) .build()?; - let output_type: ArrowType = (&self.output_type).into(); runtime.add_function_with_handler( &self.name, - output_type, + arrow_field_from_data_type(&self.name, self.output_type.clone()), arrow_udf_python::CallMode::CalledOnNullInput, &self.code, &self.handler, From f26b627c185624bf4956591865ec147518c3a381 Mon Sep 17 00:00:00 2001 From: coldWater Date: Fri, 17 Jan 2025 15:16:25 +0800 Subject: [PATCH 5/5] test --- .../20_0023_udf_script_parallelism.result | 3 +++ .../20_0023_udf_script_parallelism.sh | 19 +++++++++++++++++++ ...able.result => 07_0000_amend_table.result} | 0 ..._amend_table.sh => 07_0000_amend_table.sh} | 0 .../08_udf_python/08_0000_parallelism.result | 3 +++ .../5_ee/08_udf_python/08_0000_parallelism.sh | 19 +++++++++++++++++++ 6 files changed, 44 insertions(+) create mode 100755 tests/suites/0_stateless/20+_others/20_0023_udf_script_parallelism.result create mode 100755 tests/suites/0_stateless/20+_others/20_0023_udf_script_parallelism.sh rename tests/suites/5_ee/07_failsafe/{08_0000_amend_table.result => 07_0000_amend_table.result} (100%) rename tests/suites/5_ee/07_failsafe/{08_0000_amend_table.sh => 07_0000_amend_table.sh} (100%) create mode 100755 tests/suites/5_ee/08_udf_python/08_0000_parallelism.result create mode 100755 tests/suites/5_ee/08_udf_python/08_0000_parallelism.sh diff --git a/tests/suites/0_stateless/20+_others/20_0023_udf_script_parallelism.result b/tests/suites/0_stateless/20+_others/20_0023_udf_script_parallelism.result new file mode 100755 index 0000000000000..26fa101a44a09 --- /dev/null +++ b/tests/suites/0_stateless/20+_others/20_0023_udf_script_parallelism.result @@ -0,0 +1,3 @@ +1000000 +2000000 +100000000 diff --git a/tests/suites/0_stateless/20+_others/20_0023_udf_script_parallelism.sh b/tests/suites/0_stateless/20+_others/20_0023_udf_script_parallelism.sh new file mode 100755 index 0000000000000..a9303b2a6287d --- /dev/null +++ b/tests/suites/0_stateless/20+_others/20_0023_udf_script_parallelism.sh @@ -0,0 +1,19 @@ +#!/usr/bin/env bash +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. "$CURDIR"/../../../shell_env.sh + +$BENDSQL_CLIENT_CONNECT --query=""" +CREATE OR REPLACE FUNCTION js_wait_test ( INT64 ) RETURNS INT64 LANGUAGE javascript HANDLER = 'wait' AS \$\$ +export function wait(n) { + let i = 0; + while (i < n * 1000000) { i++ } + return i; +} +\$\$; +""" + +$BENDSQL_CLIENT_CONNECT --query='select js_wait_test(100)' & +job_pid=$! +$BENDSQL_CLIENT_CONNECT --query='select js_wait_test(1)' +$BENDSQL_CLIENT_CONNECT --query='select js_wait_test(2)' +wait $job_pid diff --git a/tests/suites/5_ee/07_failsafe/08_0000_amend_table.result b/tests/suites/5_ee/07_failsafe/07_0000_amend_table.result similarity index 100% rename from tests/suites/5_ee/07_failsafe/08_0000_amend_table.result rename to tests/suites/5_ee/07_failsafe/07_0000_amend_table.result diff --git a/tests/suites/5_ee/07_failsafe/08_0000_amend_table.sh b/tests/suites/5_ee/07_failsafe/07_0000_amend_table.sh similarity index 100% rename from tests/suites/5_ee/07_failsafe/08_0000_amend_table.sh rename to tests/suites/5_ee/07_failsafe/07_0000_amend_table.sh diff --git a/tests/suites/5_ee/08_udf_python/08_0000_parallelism.result b/tests/suites/5_ee/08_udf_python/08_0000_parallelism.result new file mode 100755 index 0000000000000..040e114b163a0 --- /dev/null +++ b/tests/suites/5_ee/08_udf_python/08_0000_parallelism.result @@ -0,0 +1,3 @@ +1 +2 +100 diff --git a/tests/suites/5_ee/08_udf_python/08_0000_parallelism.sh b/tests/suites/5_ee/08_udf_python/08_0000_parallelism.sh new file mode 100755 index 0000000000000..52121fd75b44f --- /dev/null +++ b/tests/suites/5_ee/08_udf_python/08_0000_parallelism.sh @@ -0,0 +1,19 @@ +#!/usr/bin/env bash +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. "$CURDIR"/../../../shell_env.sh + +$BENDSQL_CLIENT_CONNECT --query=""" +CREATE OR REPLACE FUNCTION python_sleep ( INT64 ) RETURNS INT64 LANGUAGE python HANDLER = 'sleep' AS \$\$ +import time + +def sleep(n): + time.sleep(n/100) + return n +\$\$; +""" + +$BENDSQL_CLIENT_CONNECT --query='select python_sleep(100)' & +job_pid=$! +$BENDSQL_CLIENT_CONNECT --query='select python_sleep(1)' +$BENDSQL_CLIENT_CONNECT --query='select python_sleep(2)' +wait $job_pid