diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 8b6e8fa9775f0..d87e89c1cf65d 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -93,6 +93,7 @@ opendal = { workspace = true, features = [ "services-gcs", "services-memory", "services-s3", + "services-webhdfs", ] } openssl = "0.10" parking_lot = { workspace = true } diff --git a/src/connector/src/sink/file_sink/mod.rs b/src/connector/src/sink/file_sink/mod.rs index fe25df4a5d1eb..f32c6812f3c00 100644 --- a/src/connector/src/sink/file_sink/mod.rs +++ b/src/connector/src/sink/file_sink/mod.rs @@ -17,3 +17,4 @@ pub mod fs; pub mod gcs; pub mod opendal_sink; pub mod s3; +pub mod webhdfs; diff --git a/src/connector/src/sink/file_sink/opendal_sink.rs b/src/connector/src/sink/file_sink/opendal_sink.rs index 1f6ec2b635fe9..1fd461015b4ba 100644 --- a/src/connector/src/sink/file_sink/opendal_sink.rs +++ b/src/connector/src/sink/file_sink/opendal_sink.rs @@ -87,6 +87,7 @@ pub enum EngineType { S3, Fs, Azblob, + Webhdfs, } impl Sink for FileSink { diff --git a/src/connector/src/sink/file_sink/webhdfs.rs b/src/connector/src/sink/file_sink/webhdfs.rs new file mode 100644 index 0000000000000..b41a27e12db1f --- /dev/null +++ b/src/connector/src/sink/file_sink/webhdfs.rs @@ -0,0 +1,107 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +use std::collections::{BTreeMap, HashMap}; + +use anyhow::anyhow; +use opendal::layers::LoggingLayer; +use opendal::services::Webhdfs; +use opendal::Operator; +use serde::Deserialize; +use serde_with::serde_as; +use with_options::WithOptions; + +use super::opendal_sink::FileSink; +use crate::sink::file_sink::opendal_sink::OpendalSinkBackend; +use crate::sink::{Result, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; +use crate::source::UnknownFields; +#[derive(Deserialize, Debug, Clone, WithOptions)] +pub struct WebhdfsCommon { + #[serde(rename = "webhdfs.endpoint")] + pub endpoint: String, + /// The directory where the sink file is located. + #[serde(rename = "webhdfs.path")] + pub path: String, +} + +#[serde_as] +#[derive(Clone, Debug, Deserialize, WithOptions)] +pub struct WebhdfsConfig { + #[serde(flatten)] + pub common: WebhdfsCommon, + + pub r#type: String, // accept "append-only" + + #[serde(flatten)] + pub unknown_fields: HashMap, +} + +pub const WEBHDFS_SINK: &str = "webhdfs"; + +impl FileSink { + pub fn new_webhdfs_sink(config: WebhdfsConfig) -> Result { + // Create webhdfs backend builder. + let mut builder = Webhdfs::default(); + // Set the name node for hdfs. + builder.endpoint(&config.common.endpoint); + builder.root(&config.common.path); + + let operator: Operator = Operator::new(builder)? + .layer(LoggingLayer::default()) + .finish(); + + Ok(operator) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct WebhdfsSink; + +impl UnknownFields for WebhdfsConfig { + fn unknown_fields(&self) -> HashMap { + self.unknown_fields.clone() + } +} + +impl OpendalSinkBackend for WebhdfsSink { + type Properties = WebhdfsConfig; + + const SINK_NAME: &'static str = WEBHDFS_SINK; + + fn from_btreemap(btree_map: BTreeMap) -> Result { + let config = + serde_json::from_value::(serde_json::to_value(btree_map).unwrap()) + .map_err(|e| SinkError::Config(anyhow!(e)))?; + if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT { + return Err(SinkError::Config(anyhow!( + "`{}` must be {}, or {}", + SINK_TYPE_OPTION, + SINK_TYPE_APPEND_ONLY, + SINK_TYPE_UPSERT + ))); + } + Ok(config) + } + + fn new_operator(properties: WebhdfsConfig) -> Result { + FileSink::::new_webhdfs_sink(properties) + } + + fn get_path(properties: Self::Properties) -> String { + properties.common.path + } + + fn get_engine_type() -> super::opendal_sink::EngineType { + super::opendal_sink::EngineType::Webhdfs + } +} diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index b734551080296..dafbc856207a9 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -105,6 +105,7 @@ macro_rules! for_all_sinks { { Gcs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::gcs::GcsSink> }, { Azblob, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::azblob::AzblobSink>}, + { Webhdfs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::webhdfs::WebhdfsSink>}, { Fs, $crate::sink::file_sink::opendal_sink::FileSink }, { Snowflake, $crate::sink::snowflake::SnowflakeSink }, diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 6b661139e13e7..9d704d31942e9 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -1022,3 +1022,15 @@ StarrocksConfig: - name: r#type field_type: String required: true +WebhdfsConfig: + fields: + - name: webhdfs.endpoint + field_type: String + required: true + - name: webhdfs.path + field_type: String + comments: The directory where the sink file is located. + required: true + - name: r#type + field_type: String + required: true diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index f0c1ed074ed1e..7ef118891865e 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -874,6 +874,7 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock::SINK_NAME => hashmap!( Format::Plain => vec![Encode::Parquet], ), + FileSink::::SINK_NAME => hashmap!( + Format::Plain => vec![Encode::Parquet], + ), FileSink::::SINK_NAME => hashmap!( Format::Plain => vec![Encode::Parquet], ),