Skip to content

Commit

Permalink
fix(sqlsmith): completely cover all exprs (#3737)
Browse files Browse the repository at this point in the history
  • Loading branch information
neverchanje authored Jul 8, 2022
1 parent a777217 commit 7a86c4d
Show file tree
Hide file tree
Showing 13 changed files with 120 additions and 31 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 11 additions & 1 deletion e2e_test/batch/types/interval.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,14 @@ t
query T
SELECT interval '1' day - interval '12' hour = interval '12' hour;
----
t
t

query T
SELECT 1.5 * INTERVAL '3 mins';
----
00:04:30

query T
SELECT INTERVAL '3 mins' * 1.5;
----
00:04:30
12 changes: 11 additions & 1 deletion e2e_test/batch/types/time.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,14 @@ values(extract(hour from timestamp '2001-02-16 20:38:40'));
query TTTTT
select timestamp '2001-03-16 23:38:45' - timestamp '2001-02-16 20:38:40';
----
28 days 03:00:05
28 days 03:00:05

query T
select TIME '19:46:41' <= TIME '11:33:43';
----
f

query T
select TIME '19:46:41' >= TIME '11:33:43';
----
t
1 change: 1 addition & 0 deletions src/expr/src/expr/template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ macro_rules! for_all_cmp_variants {
{ float64, decimal, float64, $general_f },
{ timestamp, timestamp, timestamp, $general_f },
{ interval, interval, interval, $general_f },
{ time, time, time, $general_f },
{ date, date, date, $general_f },
{ boolean, boolean, boolean, $general_f },
{ timestamp, date, timestamp, $general_f },
Expand Down
8 changes: 6 additions & 2 deletions src/frontend/src/expr/type_inference/func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,9 +468,13 @@ fn build_type_derive_map() -> FuncSigMap {
E::IsDistinctFrom,
];
build_binary_cmp_funcs(&mut map, cmp_exprs, &num_types);
build_binary_cmp_funcs(&mut map, cmp_exprs, &[T::Struct, T::List]);
build_binary_cmp_funcs(&mut map, cmp_exprs, &[T::Struct]);
build_binary_cmp_funcs(&mut map, cmp_exprs, &[T::List]);
build_binary_cmp_funcs(&mut map, cmp_exprs, &[T::Date, T::Timestamp, T::Timestampz]);
build_binary_cmp_funcs(&mut map, cmp_exprs, &[T::Time, T::Interval]);
// TODO: add support for time-interval comparison
// build_binary_cmp_funcs(&mut map, cmp_exprs, &[T::Time, T::Interval]);
build_binary_cmp_funcs(&mut map, cmp_exprs, &[T::Time]);
build_binary_cmp_funcs(&mut map, cmp_exprs, &[T::Interval]);
for e in cmp_exprs {
for t in [T::Boolean, T::Varchar] {
map.insert(*e, vec![t, t], T::Boolean);
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/handler/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,14 @@ async fn distribute_execute(

let plan = root.gen_batch_query_plan()?;

info!(
tracing::trace!(
"Generated distributed plan: {:?}",
plan.explain_to_string()?
);

let plan_fragmenter = BatchPlanFragmenter::new(session.env().worker_node_manager_ref());
let query = plan_fragmenter.split(plan)?;
info!("Generated query after plan fragmenter: {:?}", &query);
tracing::trace!("Generated query after plan fragmenter: {:?}", &query);
(query, pg_descs)
};

Expand Down Expand Up @@ -137,7 +137,7 @@ fn local_execute(

let plan_fragmenter = BatchPlanFragmenter::new(session.env().worker_node_manager_ref());
let query = plan_fragmenter.split(plan)?;
info!("Generated query after plan fragmenter: {:?}", &query);
tracing::trace!("Generated query after plan fragmenter: {:?}", &query);
(query, pg_descs)
};

Expand Down
22 changes: 13 additions & 9 deletions src/frontend/src/scheduler/distributed/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,10 @@ impl QueryExecution {
.await
.map_err(|e| anyhow!("Starting query execution failed: {:?}", e))??;

info!(
tracing::trace!(
"Received root stage query result fetcher: {:?}, query id: {:?}",
root_stage, self.query.query_id
root_stage,
self.query.query_id
);

*state = QueryState::Running {
Expand Down Expand Up @@ -211,17 +212,19 @@ impl QueryRunner {
let leaf_stages = self.query.leaf_stages();
for stage_id in &leaf_stages {
// TODO: We should not return error here, we should abort query.
info!(
tracing::trace!(
"Starting query stage: {:?}-{:?}",
self.query.query_id, stage_id
self.query.query_id,
stage_id
);
self.stage_executions[stage_id].start().await.map_err(|e| {
error!("Failed to start stage: {}, reason: {:?}", stage_id, e);
e
})?;
info!(
tracing::trace!(
"Query stage {:?}-{:?} started.",
self.query.query_id, stage_id
self.query.query_id,
stage_id
);
}
let mut stages_with_table_scan = self.query.stages_with_table_scan();
Expand All @@ -230,17 +233,18 @@ impl QueryRunner {
while let Some(msg) = self.msg_receiver.recv().await {
match msg {
Stage(Scheduled(stage_id)) => {
info!(
tracing::trace!(
"Query stage {:?}-{:?} scheduled.",
self.query.query_id, stage_id
self.query.query_id,
stage_id
);
self.scheduled_stages_count += 1;
stages_with_table_scan.remove(&stage_id);
if stages_with_table_scan.is_empty() {
// We can be sure here that all the Hummock iterators have been created,
// thus they all successfully pinned a HummockVersion.
// So we can now unpin their epoch.
info!("Query {:?} has scheduled all of its stages that have table scan (iterator creation).", self.query.query_id);
tracing::trace!("Query {:?} has scheduled all of its stages that have table scan (iterator creation).", self.query.query_id);
self.hummock_snapshot_manager
.unpin_snapshot(self.epoch, self.query.query_id())
.await?;
Expand Down
7 changes: 4 additions & 3 deletions src/frontend/src/scheduler/distributed/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use tokio::spawn;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tracing::{error, info};
use tracing::error;
use uuid::Uuid;
use StageEvent::Failed;

Expand Down Expand Up @@ -188,9 +188,10 @@ impl StageExecution {
_ => {
// This is possible since we notify the query runner of stage schedule events, and it may
// receive multiple events and try to start the same stage multiple times.
info!(
tracing::trace!(
"Staged {:?}-{:?} already started, skipping.",
&self.stage.query_id, &self.stage.id
&self.stage.query_id,
&self.stage.id
);
Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions src/tests/sqlsmith/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ anyhow = { version = "1", features = ["backtrace"] }
chrono = "0.4"
clap = { version = "3", features = ["derive"] }
env_logger = { version = "0.9" }
itertools = "0.10"
lazy_static = "1"
log = "0.4"
madsim = "=0.2.0-alpha.3"
Expand Down
11 changes: 10 additions & 1 deletion src/tests/sqlsmith/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,14 @@ In the second mode, it will test the entire query handling end-to-end. We provid

```sh
cd risingwave
./target/debug/sqlsmith --testdata ./src/tests/sqlsmith/tests/testdata
./target/debug/sqlsmith test --testdata ./src/tests/sqlsmith/tests/testdata
```

Additionally, if you want to check whether a function or operator signature has been defined incorrectly,
you can print the function table:

```sh
./target/debug/sqlsmith print-function-table > ft.txt
```

Then check `ft.txt`, which will contain all the function signatures.
46 changes: 41 additions & 5 deletions src/tests/sqlsmith/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@ use std::time::Duration;
use clap::Parser as ClapParser;
use risingwave_sqlparser::ast::Statement;
use risingwave_sqlparser::parser::Parser;
use risingwave_sqlsmith::{sql_gen, Table};
use risingwave_sqlsmith::{print_function_table, sql_gen, Table};
use tokio_postgres::NoTls;

// Top-level command-line arguments: the only thing parsed at this level is which
// subcommand to run.
// NOTE: plain `//` comments are used here on purpose, since clap's derive picks up `///`
// doc comments as help text.
#[derive(ClapParser, Debug, Clone)]
#[clap(about, version, author)]
struct Opt {
// The subcommand selected on the command line (one of `Commands`).
#[clap(subcommand)]
command: Commands,
}

#[derive(clap::Args, Clone, Debug)]
struct TestOptions {
/// The database server host.
#[clap(short, long, default_value = "localhost")]
#[clap(long, default_value = "localhost")]
host: String,

/// The database server port.
Expand All @@ -47,9 +53,23 @@ struct Opt {
/// Path to the testing data files.
#[clap(short, long)]
testdata: String,

/// The number of test cases to generate.
#[clap(long, default_value = "1000")]
count: usize,
}

// The subcommands accepted by the sqlsmith binary.
// NOTE: the `///` comments on the variants are left untouched because clap's derive
// uses them as the subcommands' help text.
#[derive(clap::Subcommand, Clone, Debug)]
enum Commands {
/// Prints the currently supported function/operator table.
// Explicitly named `print-function-table` on the command line; takes no options.
#[clap(name = "print-function-table")]
PrintFunctionTable,

/// Run testing.
// Carries the options for a test run in `TestOptions`.
Test(TestOptions),
}

async fn create_tables(opt: &Opt, client: &tokio_postgres::Client) -> Vec<Table> {
async fn create_tables(opt: &TestOptions, client: &tokio_postgres::Client) -> Vec<Table> {
log::info!("Preparing tables...");

let sql = std::fs::read_to_string(format!("{}/tpch.sql", opt.testdata)).unwrap();
Expand All @@ -72,11 +92,26 @@ async fn create_tables(opt: &Opt, client: &tokio_postgres::Client) -> Vec<Table>
.collect()
}

/// Drops the tables that were created for the test run.
///
/// Reads `drop_tpch.sql` from the directory given by `opt.testdata` and executes it one line
/// at a time, so the file is expected to hold exactly one statement per line. Blank and
/// whitespace-only lines are ignored.
///
/// # Panics
///
/// Panics if the file cannot be read or if any statement fails to execute; the panic message
/// names the offending file or statement.
async fn drop_tables(opt: &TestOptions, client: &tokio_postgres::Client) {
    log::info!("Cleaning tables...");
    let path = format!("{}/drop_tpch.sql", opt.testdata);
    let sql = std::fs::read_to_string(&path)
        .unwrap_or_else(|e| panic!("Failed to read {}: {}", path, e));
    // Skip blank or whitespace-only lines so that an empty statement is never sent to the
    // server.
    for stmt in sql.lines().map(str::trim).filter(|line| !line.is_empty()) {
        client
            .execute(stmt, &[])
            .await
            .unwrap_or_else(|e| panic!("Failed to execute statement: {}\n{}", e, stmt));
    }
}

#[tokio::main(flavor = "multi_thread", worker_threads = 5)]
async fn main() {
env_logger::init();

let opt = Opt::parse();
let opt = match opt.command {
Commands::PrintFunctionTable => {
println!("{}", print_function_table());
return;
}
Commands::Test(test_opts) => test_opts,
};
let (client, connection) = tokio_postgres::Config::new()
.host(&opt.host)
.port(opt.port)
Expand All @@ -96,13 +131,14 @@ async fn main() {
let tables = create_tables(&opt, &client).await;

let mut rng = rand::thread_rng();

for _ in 0..100 {
for _ in 0..opt.count {
let sql = sql_gen(&mut rng, tables.clone());
log::info!("Executing: {}", sql);
let _ = client
.query(sql.as_str(), &[])
.await
.unwrap_or_else(|e| panic!("Failed to execute query: {}\n{}", e, sql));
}

drop_tables(&opt, &client).await;
}
17 changes: 17 additions & 0 deletions src/tests/sqlsmith/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::collections::HashMap;

use itertools::Itertools;
use rand::seq::SliceRandom;
use rand::Rng;
use risingwave_frontend::expr::{func_sigs, DataTypeName, ExprType, FuncSign};
Expand Down Expand Up @@ -222,3 +223,19 @@ fn make_bin_op(func: ExprType, exprs: &[Expr]) -> Option<Expr> {
/// Builds the expression for a `NULL` value, i.e. `Expr::Value(Value::Null)`.
pub(crate) fn sql_null() -> Expr {
Expr::Value(Value::Null)
}

/// Renders every signature yielded by `func_sigs()` as text, one signature per line.
///
/// Each line has the form `Func(ArgType, ArgType, ...) -> RetType`, where the function, the
/// argument types and the return type are all written with their `Debug` representation.
/// Lines are separated by `\n`, with no trailing newline.
pub fn print_function_table() -> String {
    func_sigs()
        .map(|sig| {
            // Comma-separated list of the argument types of this signature.
            let args = sig
                .inputs_type
                .iter()
                .map(|ty| format!("{:?}", ty))
                .join(", ");
            format!("{:?}({}) -> {:?}", sig.func, args, sig.ret_type)
        })
        .join("\n")
}
7 changes: 1 addition & 6 deletions src/tests/sqlsmith/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use risingwave_sqlparser::ast::{
};

mod expr;
pub use expr::print_function_table;
mod relation;
mod scalar;

Expand Down Expand Up @@ -130,7 +131,6 @@ impl<'a, R: Rng> SqlGenerator<'a, R> {
fn gen_select_stmt(&mut self) -> (Select, Vec<Column>) {
// Generate random tables/relations first so that select items can refer to them.
let from = self.gen_from();
let rel_num = from.len();
let (select_list, schema) = self.gen_select_list();
let select = Select {
distinct: false,
Expand All @@ -141,11 +141,6 @@ impl<'a, R: Rng> SqlGenerator<'a, R> {
group_by: self.gen_group_by(),
having: self.gen_having(),
};
// The relations used in the inner query can not be used in the outer query.
(0..rel_num).for_each(|_| {
let rel = self.bound_relations.pop();
assert!(rel.is_some());
});
(select, schema)
}

Expand Down

0 comments on commit 7a86c4d

Please sign in to comment.