Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support to add random seed on random engine #15167

Merged
merged 5 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

154 changes: 153 additions & 1 deletion src/query/expression/src/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1163,7 +1163,6 @@ impl Column {
use rand::rngs::SmallRng;
use rand::Rng;
use rand::SeedableRng;

match ty {
DataType::Null => Column::Null { len },
DataType::EmptyArray => Column::EmptyArray { len },
Expand Down Expand Up @@ -1311,6 +1310,159 @@ impl Column {
}
}

pub fn random_with_seed(ty: &DataType, len: usize, seed: u64) -> Self {
blackstar-baba marked this conversation as resolved.
Show resolved Hide resolved
use rand::distributions::Alphanumeric;
use rand::rngs::SmallRng;
use rand::Rng;
use rand::SeedableRng;
// wo need to generate random value with seed
match ty {
DataType::Null => Column::Null { len },
DataType::EmptyArray => Column::EmptyArray { len },
DataType::EmptyMap => Column::EmptyMap { len },
DataType::Boolean => BooleanType::from_data(
(0..len)
.map(|_| SmallRng::seed_from_u64(seed).gen_bool(0.5))
.collect_vec(),
),
DataType::Binary => BinaryType::from_data(
(0..len)
.map(|_| {
let rng = SmallRng::seed_from_u64(seed);
rng.sample_iter(&Alphanumeric)
// randomly generate 5 characters.
.take(5)
.map(u8::from)
.collect::<Vec<_>>()
})
.collect_vec(),
),
DataType::String => StringType::from_data(
(0..len)
.map(|_| {
let rng = SmallRng::seed_from_u64(seed);
rng.sample_iter(&Alphanumeric)
// randomly generate 5 characters.
.take(5)
.map(char::from)
.collect::<String>()
})
.collect_vec(),
),
DataType::Number(num_ty) => {
with_number_mapped_type!(|NUM_TYPE| match num_ty {
NumberDataType::NUM_TYPE => {
NumberType::<NUM_TYPE>::from_data(
(0..len)
.map(|_| SmallRng::seed_from_u64(seed).gen::<NUM_TYPE>())
.collect_vec(),
)
}
})
}
DataType::Decimal(t) => match t {
DecimalDataType::Decimal128(size) => {
let values = (0..len)
.map(|_| i128::from(SmallRng::seed_from_u64(seed).gen::<i16>()))
.collect::<Vec<i128>>();
Column::Decimal(DecimalColumn::Decimal128(values.into(), *size))
}
DecimalDataType::Decimal256(size) => {
let values = (0..len)
.map(|_| i256::from(SmallRng::seed_from_u64(seed).gen::<i16>()))
.collect::<Vec<i256>>();
Column::Decimal(DecimalColumn::Decimal256(values.into(), *size))
}
},
DataType::Timestamp => TimestampType::from_data(
(0..len)
.map(|_| SmallRng::seed_from_u64(seed).gen_range(TIMESTAMP_MIN..=TIMESTAMP_MAX))
.collect::<Vec<i64>>(),
),
DataType::Date => DateType::from_data(
(0..len)
.map(|_| SmallRng::seed_from_u64(seed).gen_range(DATE_MIN..=DATE_MAX))
.collect::<Vec<i32>>(),
),
DataType::Nullable(ty) => Column::Nullable(Box::new(NullableColumn {
column: Column::random(ty, len),
validity: Bitmap::from(
(0..len)
.map(|_| SmallRng::seed_from_u64(seed).gen_bool(0.5))
.collect::<Vec<bool>>(),
),
})),
DataType::Array(inner_ty) => {
let mut inner_len = 0;
let mut offsets: Vec<u64> = Vec::with_capacity(len + 1);
offsets.push(0);
for _ in 0..len {
inner_len += SmallRng::seed_from_u64(seed).gen_range(0..=3);
offsets.push(inner_len);
}
Column::Array(Box::new(ArrayColumn {
values: Column::random_with_seed(inner_ty, inner_len as usize, seed),
offsets: offsets.into(),
}))
}
DataType::Map(inner_ty) => {
let mut inner_len = 0;
let mut offsets: Vec<u64> = Vec::with_capacity(len + 1);
offsets.push(0);
for _ in 0..len {
inner_len += SmallRng::seed_from_u64(seed).gen_range(0..=3);
offsets.push(inner_len);
}
Column::Map(Box::new(ArrayColumn {
values: Column::random_with_seed(inner_ty, inner_len as usize, seed),
offsets: offsets.into(),
}))
}
DataType::Bitmap => BitmapType::from_data(
(0..len)
.map(|_| {
let data: [u64; 4] = SmallRng::seed_from_u64(seed).gen();
let rb = RoaringTreemap::from_iter(data.iter());
let mut buf = vec![];
rb.serialize_into(&mut buf)
.expect("failed serialize roaring treemap");
buf
})
.collect_vec(),
),
DataType::Tuple(fields) => {
let fields = fields
.iter()
.map(|ty| Column::random_with_seed(ty, len, seed))
.collect::<Vec<_>>();
Column::Tuple(fields)
}
DataType::Variant => {
let mut data = Vec::with_capacity(len);
for _ in 0..len {
let val = jsonb::rand_value();
data.push(val.to_vec());
}
VariantType::from_data(data)
}
DataType::Geometry => {
let mut data = Vec::with_capacity(len);
(0..len).for_each(|_| {
let x = SmallRng::seed_from_u64(seed).gen::<f64>();
let y = SmallRng::seed_from_u64(seed).gen::<f64>();
let val = Point::new(x, y);
data.push(
Geometry::from(val)
.to_ewkb(CoordDimensions::xy(), None)
.unwrap(),
);
});
GeometryType::from_data(data)
}
DataType::Generic(_) => unreachable!(),
}
}

pub fn remove_nullable(&self) -> Self {
match self {
Column::Nullable(inner) => inner.column.clone(),
Expand Down
12 changes: 12 additions & 0 deletions src/query/service/src/interpreters/interpreter_table_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ use databend_storages_common_table_meta::table::OPT_KEY_CONNECTION_NAME;
use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID;
use databend_storages_common_table_meta::table::OPT_KEY_ENGINE;
use databend_storages_common_table_meta::table::OPT_KEY_LOCATION;
use databend_storages_common_table_meta::table::OPT_KEY_RANDOM_SEED;
use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION;
use databend_storages_common_table_meta::table::OPT_KEY_STORAGE_FORMAT;
use databend_storages_common_table_meta::table::OPT_KEY_STORAGE_PREFIX;
Expand Down Expand Up @@ -353,6 +354,8 @@ impl CreateTableInterpreter {
// check bloom_index_columns.
is_valid_bloom_index_columns(&table_meta.options, schema)?;
is_valid_change_tracking(&table_meta.options)?;
// check random seed
is_valid_random_seed(&table_meta.options)?;

for table_option in table_meta.options.iter() {
let key = table_option.0.to_lowercase();
Expand Down Expand Up @@ -471,6 +474,8 @@ pub static CREATE_TABLE_OPTIONS: LazyLock<HashSet<&'static str>> = LazyLock::new
r.insert(OPT_KEY_LOCATION);
r.insert(OPT_KEY_CONNECTION_NAME);

r.insert(OPT_KEY_RANDOM_SEED);

r.insert("transient");
r
});
Expand Down Expand Up @@ -533,3 +538,10 @@ pub fn is_valid_change_tracking(options: &BTreeMap<String, String>) -> Result<()
}
Ok(())
}

/// Validates the optional random-seed table option.
///
/// Succeeds when `OPT_KEY_RANDOM_SEED` is absent from `options`, or when its value
/// parses as a `u64`; the parsed number itself is discarded.
///
/// # Errors
///
/// Propagates the integer parse error when the option is present but not a valid `u64`.
pub fn is_valid_random_seed(options: &BTreeMap<String, String>) -> Result<()> {
    match options.get(OPT_KEY_RANDOM_SEED) {
        // No seed supplied: nothing to validate.
        None => Ok(()),
        Some(raw_seed) => {
            // Only the parse outcome matters here; the value is re-read where it is used.
            raw_seed.parse::<u64>()?;
            Ok(())
        }
    }
}
2 changes: 2 additions & 0 deletions src/query/storages/common/table_meta/src/table/table_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ pub const OPT_KEY_ENGINE_META: &str = "engine_meta";
///
/// If both OPT_KEY_SNAPSHOT_LOC and OPT_KEY_SNAPSHOT_LOCATION exist, the latter will be used
pub const OPT_KEY_LEGACY_SNAPSHOT_LOC: &str = "snapshot_loc";
/// Table option key used by the random engine: the seed for generating random data.
pub const OPT_KEY_RANDOM_SEED: &str = "seed";

/// Table option keys that reserved for internal usage only
/// - Users are not allowed to specified this option keys in DDL
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/random/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ databend-common-expression = { path = "../../expression" }
databend-common-meta-app = { path = "../../../meta/app" }
databend-common-pipeline-core = { path = "../../pipeline/core" }
databend-common-pipeline-sources = { path = "../../pipeline/sources" }
databend-storages-common-table-meta = { path = "../common/table_meta" }

async-backtrace = { workspace = true }
async-trait = { workspace = true }
Expand Down
38 changes: 29 additions & 9 deletions src/query/storages/random/src/random_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,22 @@ use databend_common_pipeline_core::Pipeline;
use databend_common_pipeline_core::SourcePipeBuilder;
use databend_common_pipeline_sources::SyncSource;
use databend_common_pipeline_sources::SyncSourcer;
use databend_storages_common_table_meta::table::OPT_KEY_RANDOM_SEED;

use crate::RandomPartInfo;

/// Table implementation for the random engine.
pub struct RandomTable {
// Table metadata this table was created from.
table_info: TableInfo,
// Optional seed parsed from the `OPT_KEY_RANDOM_SEED` table option; `None` when the option is not set.
seed: Option<u64>,
}

impl RandomTable {
/// Builds a random-engine table from `table_info`.
///
/// Reads the optional `OPT_KEY_RANDOM_SEED` table option and parses it as a `u64`;
/// when the option is absent the table is created without a seed.
///
/// # Errors
///
/// Returns an error if the seed option is present but is not a valid `u64`.
pub fn try_create(table_info: TableInfo) -> Result<Box<dyn Table>> {
    // `transpose` turns Option<Result<u64, _>> into Result<Option<u64>, _> so `?` can
    // surface a bad seed while a missing option simply stays `None`.
    let seed = table_info
        .meta
        .options
        .get(OPT_KEY_RANDOM_SEED)
        .map(|raw_seed| raw_seed.parse::<u64>())
        .transpose()?;
    Ok(Box::new(Self { table_info, seed }))
}

pub fn description() -> StorageDescription {
Expand Down Expand Up @@ -125,10 +131,11 @@ impl Table for RandomTable {
.iter()
.map(|f| {
let data_type: DataType = f.data_type().into();
BlockEntry::new(
data_type.clone(),
Value::Column(Column::random(&data_type, 1)),
)
let column = match self.seed {
None => Column::random(&data_type, 1),
Some(seed) => Column::random_with_seed(&data_type, 1, seed),
};
BlockEntry::new(data_type.clone(), Value::Column(column))
})
.collect::<Vec<_>>();
let block = DataBlock::new(columns, 1);
Expand Down Expand Up @@ -179,15 +186,21 @@ impl Table for RandomTable {
let parts = RandomPartInfo::from_part(&plan.parts.partitions[index])?;
builder.add_source(
output.clone(),
RandomSource::create(ctx.clone(), output, output_schema.clone(), parts.rows)?,
RandomSource::create(
ctx.clone(),
output,
output_schema.clone(),
parts.rows,
self.seed.clone(),
)?,
);
}

if plan.parts.is_empty() {
let output = OutputPort::create();
builder.add_source(
output.clone(),
RandomSource::create(ctx.clone(), output, output_schema, 0)?,
RandomSource::create(ctx.clone(), output, output_schema, 0, self.seed.clone())?,
);
}

Expand All @@ -200,6 +213,7 @@ struct RandomSource {
schema: TableSchemaRef,
/// how many rows are needed to generate
rows: usize,
seed: Option<u64>,
}

impl RandomSource {
Expand All @@ -208,8 +222,9 @@ impl RandomSource {
output: Arc<OutputPort>,
schema: TableSchemaRef,
rows: usize,
seed: Option<u64>,
) -> Result<ProcessorPtr> {
SyncSourcer::create(ctx, output, RandomSource { schema, rows })
SyncSourcer::create(ctx, output, RandomSource { schema, rows, seed })
}
}

Expand All @@ -228,7 +243,12 @@ impl SyncSource for RandomSource {
.iter()
.map(|f| {
let data_type = f.data_type().into();
let value = Value::Column(Column::random(&data_type, self.rows));
let value = match self.seed {
None => Value::Column(Column::random(&data_type, self.rows)),
Some(seed) => {
Value::Column(Column::random_with_seed(&data_type, self.rows, seed))
}
};
BlockEntry::new(data_type, value)
})
.collect();
Expand Down
Loading