Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support to add random seed on random engine #15167

Merged
merged 5 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

61 changes: 31 additions & 30 deletions src/query/expression/src/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1158,25 +1158,29 @@ impl Column {
}
}

pub fn random(ty: &DataType, len: usize) -> Self {
pub fn random(ty: &DataType, len: usize, seed: Option<u64>) -> Self {
use rand::distributions::Alphanumeric;
use rand::rngs::SmallRng;
use rand::Rng;
use rand::SeedableRng;

let mut rng = match seed {
None => SmallRng::from_entropy(),
Some(seed) => SmallRng::seed_from_u64(seed),
};
match ty {
DataType::Null => Column::Null { len },
DataType::EmptyArray => Column::EmptyArray { len },
DataType::EmptyMap => Column::EmptyMap { len },
DataType::Boolean => BooleanType::from_data(
(0..len)
.map(|_| SmallRng::from_entropy().gen_bool(0.5))
.collect_vec(),
),
DataType::Boolean => {
BooleanType::from_data((0..len).map(|_| rng.gen_bool(0.5)).collect_vec())
}
DataType::Binary => BinaryType::from_data(
(0..len)
.map(|_| {
let rng = SmallRng::from_entropy();
let rng = match seed {
None => SmallRng::from_entropy(),
Some(seed) => SmallRng::seed_from_u64(seed),
};
rng.sample_iter(&Alphanumeric)
// randomly generate 5 characters.
.take(5)
Expand All @@ -1188,7 +1192,10 @@ impl Column {
DataType::String => StringType::from_data(
(0..len)
.map(|_| {
let rng = SmallRng::from_entropy();
let rng = match seed {
None => SmallRng::from_entropy(),
Some(seed) => SmallRng::seed_from_u64(seed),
};
rng.sample_iter(&Alphanumeric)
// randomly generate 5 characters.
.take(5)
Expand All @@ -1201,55 +1208,49 @@ impl Column {
with_number_mapped_type!(|NUM_TYPE| match num_ty {
NumberDataType::NUM_TYPE => {
NumberType::<NUM_TYPE>::from_data(
(0..len)
.map(|_| SmallRng::from_entropy().gen::<NUM_TYPE>())
.collect_vec(),
(0..len).map(|_| rng.gen::<NUM_TYPE>()).collect_vec(),
)
}
})
}
DataType::Decimal(t) => match t {
DecimalDataType::Decimal128(size) => {
let values = (0..len)
.map(|_| i128::from(SmallRng::from_entropy().gen::<i16>()))
.map(|_| i128::from(rng.gen::<i16>()))
.collect::<Vec<i128>>();
Column::Decimal(DecimalColumn::Decimal128(values.into(), *size))
}
DecimalDataType::Decimal256(size) => {
let values = (0..len)
.map(|_| i256::from(SmallRng::from_entropy().gen::<i16>()))
.map(|_| i256::from(rng.gen::<i16>()))
.collect::<Vec<i256>>();
Column::Decimal(DecimalColumn::Decimal256(values.into(), *size))
}
},
DataType::Timestamp => TimestampType::from_data(
(0..len)
.map(|_| SmallRng::from_entropy().gen_range(TIMESTAMP_MIN..=TIMESTAMP_MAX))
.map(|_| rng.gen_range(TIMESTAMP_MIN..=TIMESTAMP_MAX))
.collect::<Vec<i64>>(),
),
DataType::Date => DateType::from_data(
(0..len)
.map(|_| SmallRng::from_entropy().gen_range(DATE_MIN..=DATE_MAX))
.map(|_| rng.gen_range(DATE_MIN..=DATE_MAX))
.collect::<Vec<i32>>(),
),
DataType::Nullable(ty) => Column::Nullable(Box::new(NullableColumn {
column: Column::random(ty, len),
validity: Bitmap::from(
(0..len)
.map(|_| SmallRng::from_entropy().gen_bool(0.5))
.collect::<Vec<bool>>(),
),
column: Column::random(ty, len, seed),
validity: Bitmap::from((0..len).map(|_| rng.gen_bool(0.5)).collect::<Vec<bool>>()),
})),
DataType::Array(inner_ty) => {
let mut inner_len = 0;
let mut offsets: Vec<u64> = Vec::with_capacity(len + 1);
offsets.push(0);
for _ in 0..len {
inner_len += SmallRng::from_entropy().gen_range(0..=3);
inner_len += rng.gen_range(0..=3);
offsets.push(inner_len);
}
Column::Array(Box::new(ArrayColumn {
values: Column::random(inner_ty, inner_len as usize),
values: Column::random(inner_ty, inner_len as usize, seed),
offsets: offsets.into(),
}))
}
Expand All @@ -1258,18 +1259,18 @@ impl Column {
let mut offsets: Vec<u64> = Vec::with_capacity(len + 1);
offsets.push(0);
for _ in 0..len {
inner_len += SmallRng::from_entropy().gen_range(0..=3);
inner_len += rng.gen_range(0..=3);
offsets.push(inner_len);
}
Column::Map(Box::new(ArrayColumn {
values: Column::random(inner_ty, inner_len as usize),
values: Column::random(inner_ty, inner_len as usize, seed),
offsets: offsets.into(),
}))
}
DataType::Bitmap => BitmapType::from_data(
(0..len)
.map(|_| {
let data: [u64; 4] = SmallRng::from_entropy().gen();
let data: [u64; 4] = rng.gen();
let rb = RoaringTreemap::from_iter(data.iter());
let mut buf = vec![];
rb.serialize_into(&mut buf)
Expand All @@ -1281,7 +1282,7 @@ impl Column {
DataType::Tuple(fields) => {
let fields = fields
.iter()
.map(|ty| Column::random(ty, len))
.map(|ty| Column::random(ty, len, seed))
.collect::<Vec<_>>();
Column::Tuple(fields)
}
Expand All @@ -1296,8 +1297,8 @@ impl Column {
DataType::Geometry => {
let mut data = Vec::with_capacity(len);
(0..len).for_each(|_| {
let x = SmallRng::from_entropy().gen::<f64>();
let y = SmallRng::from_entropy().gen::<f64>();
let x = rng.gen::<f64>();
let y = rng.gen::<f64>();
let val = Point::new(x, y);
data.push(
Geometry::from(val)
Expand Down
4 changes: 2 additions & 2 deletions src/query/expression/tests/it/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ pub fn test_take_and_filter_and_concat() -> databend_common_exception::Result<()
let slice_end = rng.gen_range(slice_start..len);
let slice_len = slice_end - slice_start;

let mut filter = Column::random(&DataType::Boolean, len)
let mut filter = Column::random(&DataType::Boolean, len, None)
.into_boolean()
.unwrap();
filter.slice(slice_start, slice_len);
Expand Down Expand Up @@ -405,7 +405,7 @@ pub fn test_filters() -> databend_common_exception::Result<()> {
let end = (start + rng.gen_range(1..num_rows)).min(c.num_rows() - a.num_rows());

// random filter
let mut f = Column::random(&DataType::Boolean, num_rows * blocks)
let mut f = Column::random(&DataType::Boolean, num_rows * blocks, None)
.into_boolean()
.unwrap();
f.slice(start, end - start);
Expand Down
4 changes: 2 additions & 2 deletions src/query/expression/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ fn rand_block_for_all_types(num_rows: usize) -> DataBlock {
let types = get_all_test_data_types();
let mut columns = Vec::with_capacity(types.len());
for data_type in types.iter() {
columns.push(Column::random(data_type, num_rows));
columns.push(Column::random(data_type, num_rows, None));
}

let block = DataBlock::new_from_columns(columns);
Expand All @@ -53,7 +53,7 @@ fn rand_block_for_simple_types(num_rows: usize) -> DataBlock {
let types = get_simple_types();
let mut columns = Vec::with_capacity(types.len());
for data_type in types.iter() {
columns.push(Column::random(data_type, num_rows));
columns.push(Column::random(data_type, num_rows, None));
}

let block = DataBlock::new_from_columns(columns);
Expand Down
12 changes: 12 additions & 0 deletions src/query/service/src/interpreters/interpreter_table_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ use databend_storages_common_table_meta::table::OPT_KEY_CONNECTION_NAME;
use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID;
use databend_storages_common_table_meta::table::OPT_KEY_ENGINE;
use databend_storages_common_table_meta::table::OPT_KEY_LOCATION;
use databend_storages_common_table_meta::table::OPT_KEY_RANDOM_SEED;
use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION;
use databend_storages_common_table_meta::table::OPT_KEY_STORAGE_FORMAT;
use databend_storages_common_table_meta::table::OPT_KEY_STORAGE_PREFIX;
Expand Down Expand Up @@ -353,6 +354,8 @@ impl CreateTableInterpreter {
// check bloom_index_columns.
is_valid_bloom_index_columns(&table_meta.options, schema)?;
is_valid_change_tracking(&table_meta.options)?;
// check random seed
is_valid_random_seed(&table_meta.options)?;

for table_option in table_meta.options.iter() {
let key = table_option.0.to_lowercase();
Expand Down Expand Up @@ -471,6 +474,8 @@ pub static CREATE_TABLE_OPTIONS: LazyLock<HashSet<&'static str>> = LazyLock::new
r.insert(OPT_KEY_LOCATION);
r.insert(OPT_KEY_CONNECTION_NAME);

r.insert(OPT_KEY_RANDOM_SEED);

r.insert("transient");
r
});
Expand Down Expand Up @@ -533,3 +538,10 @@ pub fn is_valid_change_tracking(options: &BTreeMap<String, String>) -> Result<()
}
Ok(())
}

pub fn is_valid_random_seed(options: &BTreeMap<String, String>) -> Result<()> {
if let Some(value) = options.get(OPT_KEY_RANDOM_SEED) {
value.parse::<u64>()?;
}
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub fn test_filter_executor() -> databend_common_exception::Result<()> {

// 1. Generate a random `DataBlock`.
let columns = (0..num_columns)
.map(|_| Column::random(data_type, num_rows))
.map(|_| Column::random(data_type, num_rows, None))
.collect_vec();
let block = DataBlock::new_from_columns(columns);
block.check_valid()?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,10 @@ fn replace_const_to_const_and_column_ref(
}
} else {
// Replace the child to `Constant`
let scalar = Column::random(data_type, 1).index(0).unwrap().to_owned();
let scalar = Column::random(data_type, 1, None)
.index(0)
.unwrap()
.to_owned();
Expr::Constant {
span: None,
scalar,
Expand Down
2 changes: 2 additions & 0 deletions src/query/storages/common/table_meta/src/table/table_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ pub const OPT_KEY_ENGINE_META: &str = "engine_meta";
///
/// If both OPT_KEY_SNAPSHOT_LOC and OPT_KEY_SNAPSHOT_LOCATION exist, the latter will be used
pub const OPT_KEY_LEGACY_SNAPSHOT_LOC: &str = "snapshot_loc";
// the following are used in for random engine
pub const OPT_KEY_RANDOM_SEED: &str = "seed";

/// Table option keys that reserved for internal usage only
/// - Users are not allowed to specified this option keys in DDL
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/random/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ databend-common-expression = { path = "../../expression" }
databend-common-meta-app = { path = "../../../meta/app" }
databend-common-pipeline-core = { path = "../../pipeline/core" }
databend-common-pipeline-sources = { path = "../../pipeline/sources" }
databend-storages-common-table-meta = { path = "../common/table_meta" }

async-backtrace = { workspace = true }
async-trait = { workspace = true }
Expand Down
30 changes: 21 additions & 9 deletions src/query/storages/random/src/random_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,22 @@ use databend_common_pipeline_core::Pipeline;
use databend_common_pipeline_core::SourcePipeBuilder;
use databend_common_pipeline_sources::SyncSource;
use databend_common_pipeline_sources::SyncSourcer;
use databend_storages_common_table_meta::table::OPT_KEY_RANDOM_SEED;

use crate::RandomPartInfo;

pub struct RandomTable {
table_info: TableInfo,
seed: Option<u64>,
}

impl RandomTable {
pub fn try_create(table_info: TableInfo) -> Result<Box<dyn Table>> {
Ok(Box::new(Self { table_info }))
let seed = match table_info.meta.options.get(OPT_KEY_RANDOM_SEED) {
None => None,
Some(seed_str) => Some(seed_str.parse::<u64>()?),
};
Ok(Box::new(Self { table_info, seed }))
}

pub fn description() -> StorageDescription {
Expand Down Expand Up @@ -125,10 +131,8 @@ impl Table for RandomTable {
.iter()
.map(|f| {
let data_type: DataType = f.data_type().into();
BlockEntry::new(
data_type.clone(),
Value::Column(Column::random(&data_type, 1)),
)
let column = Column::random(&data_type, 1, self.seed);
BlockEntry::new(data_type.clone(), Value::Column(column))
})
.collect::<Vec<_>>();
let block = DataBlock::new(columns, 1);
Expand Down Expand Up @@ -179,15 +183,21 @@ impl Table for RandomTable {
let parts = RandomPartInfo::from_part(&plan.parts.partitions[index])?;
builder.add_source(
output.clone(),
RandomSource::create(ctx.clone(), output, output_schema.clone(), parts.rows)?,
RandomSource::create(
ctx.clone(),
output,
output_schema.clone(),
parts.rows,
self.seed,
)?,
);
}

if plan.parts.is_empty() {
let output = OutputPort::create();
builder.add_source(
output.clone(),
RandomSource::create(ctx.clone(), output, output_schema, 0)?,
RandomSource::create(ctx.clone(), output, output_schema, 0, self.seed)?,
);
}

Expand All @@ -200,6 +210,7 @@ struct RandomSource {
schema: TableSchemaRef,
/// how many rows are needed to generate
rows: usize,
seed: Option<u64>,
}

impl RandomSource {
Expand All @@ -208,8 +219,9 @@ impl RandomSource {
output: Arc<OutputPort>,
schema: TableSchemaRef,
rows: usize,
seed: Option<u64>,
) -> Result<ProcessorPtr> {
SyncSourcer::create(ctx, output, RandomSource { schema, rows })
SyncSourcer::create(ctx, output, RandomSource { schema, rows, seed })
}
}

Expand All @@ -228,7 +240,7 @@ impl SyncSource for RandomSource {
.iter()
.map(|f| {
let data_type = f.data_type().into();
let value = Value::Column(Column::random(&data_type, self.rows));
let value = Value::Column(Column::random(&data_type, self.rows, self.seed));
BlockEntry::new(data_type, value)
})
.collect();
Expand Down
2 changes: 1 addition & 1 deletion src/tests/sqlsmith/src/sql_gen/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ impl<'a, R: Rng + 'a> SqlGenerator<'a, R> {
fn gen_columns(&mut self, data_types: &[DataType], row_count: usize) -> Vec<Column> {
data_types
.iter()
.map(|ty| Column::random(ty, row_count))
.map(|ty| Column::random(ty, row_count, None))
.collect()
}
}
Loading