fix(query): backport to 680 stick the created_by infos in parquet writer (#17223)

* fix(query): stick the created_by infos in parquet writer (#17220)

* fix(query): stick the created_by infos in parquet writer

* fix(query): stick the created_by infos in parquet writer

* fix(query): stick the created_by infos in parquet writer

* fix(query): stick the created_by infos in parquet writer

* fix(query): stick the created_by infos in parquet writer

* fix(query): fix register function working with nullable scalar (#17217)

* fix(query): fix register function working with nullable scalar

* fix(query): fix register function working with nullable scalar

* fix(query): increase pool

* Update 19_0005_fuzz_cte.sh

* Update mysql_source.rs

* fix(query): fix register function working with nullable scalar

* fix(query): rollback tests
sundy-li authored Jan 9, 2025
1 parent 5cd121f commit 749610a
Showing 20 changed files with 185 additions and 51 deletions.
25 changes: 9 additions & 16 deletions src/query/expression/src/register_vectorize.rs
@@ -283,8 +283,7 @@ pub fn passthrough_nullable_1_arg<I1: ArgType, O: ArgType>(

match out {
Value::Column(out) => Value::Column(NullableColumn::new(out, args_validity)),
Value::Scalar(out) if args_validity.get_bit(0) => Value::Scalar(Some(out)),
_ => Value::Scalar(None),
Value::Scalar(out) => Value::Scalar(Some(out)),
}
}
_ => Value::Scalar(None),
@@ -308,15 +307,15 @@ pub fn passthrough_nullable_2_arg<I1: ArgType, I2: ArgType, O: ArgType>(
if let Some(validity) = ctx.validity.as_ref() {
args_validity = &args_validity & validity;
}

ctx.validity = Some(args_validity.clone());
match (arg1.value(), arg2.value()) {
(Some(arg1), Some(arg2)) => {
let out = func(arg1, arg2, ctx);

match out {
Value::Column(out) => Value::Column(NullableColumn::new(out, args_validity)),
Value::Scalar(out) if args_validity.get_bit(0) => Value::Scalar(Some(out)),
_ => Value::Scalar(None),
Value::Scalar(out) => Value::Scalar(Some(out)),
}
}
_ => Value::Scalar(None),
@@ -352,8 +351,7 @@ pub fn passthrough_nullable_3_arg<I1: ArgType, I2: ArgType, I3: ArgType, O: ArgT

match out {
Value::Column(out) => Value::Column(NullableColumn::new(out, args_validity)),
Value::Scalar(out) if args_validity.get_bit(0) => Value::Scalar(Some(out)),
_ => Value::Scalar(None),
Value::Scalar(out) => Value::Scalar(Some(out)),
}
}
_ => Value::Scalar(None),
@@ -397,8 +395,7 @@ pub fn passthrough_nullable_4_arg<

match out {
Value::Column(out) => Value::Column(NullableColumn::new(out, args_validity)),
Value::Scalar(out) if args_validity.get_bit(0) => Value::Scalar(Some(out)),
_ => Value::Scalar(None),
Value::Scalar(out) => Value::Scalar(Some(out)),
}
}
_ => Value::Scalar(None),
@@ -427,8 +424,7 @@ pub fn combine_nullable_1_arg<I1: ArgType, O: ArgType>(
out.column,
&args_validity & &out.validity,
)),
Value::Scalar(out) if args_validity.get_bit(0) => Value::Scalar(out),
_ => Value::Scalar(None),
Value::Scalar(out) => Value::Scalar(out),
}
}
_ => Value::Scalar(None),
@@ -465,8 +461,7 @@ pub fn combine_nullable_2_arg<I1: ArgType, I2: ArgType, O: ArgType>(
out.column,
&args_validity & &out.validity,
)),
Value::Scalar(out) if args_validity.get_bit(0) => Value::Scalar(out),
_ => Value::Scalar(None),
Value::Scalar(out) => Value::Scalar(out),
}
}
_ => Value::Scalar(None),
@@ -505,8 +500,7 @@ pub fn combine_nullable_3_arg<I1: ArgType, I2: ArgType, I3: ArgType, O: ArgType>
out.column,
&args_validity & &out.validity,
)),
Value::Scalar(out) if args_validity.get_bit(0) => Value::Scalar(out),
_ => Value::Scalar(None),
Value::Scalar(out) => Value::Scalar(out),
}
}
_ => Value::Scalar(None),
@@ -552,8 +546,7 @@ pub fn combine_nullable_4_arg<I1: ArgType, I2: ArgType, I3: ArgType, I4: ArgType
out.column,
&args_validity & &out.validity,
)),
Value::Scalar(out) if args_validity.get_bit(0) => Value::Scalar(out),
_ => Value::Scalar(None),
Value::Scalar(out) => Value::Scalar(out),
}
}
_ => Value::Scalar(None),
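The hunks above drop the `args_validity.get_bit(0)` guard on the scalar arm of the `passthrough_nullable_*` and `combine_nullable_*` helpers. A plausible reading of the bug being fixed: when every argument is a non-NULL scalar but `ctx.validity` carries a per-row bitmap from the surrounding expression (as in the `if(eq(n,1), sum_sid + 1, 100)` test added below), checking only bit 0 could turn a valid scalar result into NULL. The sketch below is a toy model, not Databend's implementation; `Value`, `scalar_arm_old` and `scalar_arm_new` are invented names used only to contrast the two behaviours.

// Hypothetical, simplified model of the scalar arm; not Databend's real types.
#[derive(Debug, PartialEq)]
enum Value<T> {
    Scalar(Option<T>),
}

// Old behaviour (sketch): the scalar result survives only if row 0 is valid.
fn scalar_arm_old(out: i64, validity: &[bool]) -> Value<i64> {
    if validity[0] {
        Value::Scalar(Some(out))
    } else {
        Value::Scalar(None)
    }
}

// New behaviour (sketch): a scalar computed from non-NULL scalar inputs stays
// non-NULL; per-row validity is handled by the surrounding expression.
fn scalar_arm_new(out: i64, _validity: &[bool]) -> Value<i64> {
    Value::Scalar(Some(out))
}

fn main() {
    // Mirrors the new test below: two rows, the `sum_sid + 1` branch is selected
    // only for the second row, so the branch validity is [false, true].
    let validity = [false, true];
    let out = 2400 + 1;
    assert_eq!(scalar_arm_old(out, &validity), Value::Scalar(None)); // scalar wrongly nulled
    assert_eq!(scalar_arm_new(out, &validity), Value::Scalar(Some(2401))); // expected value
}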
40 changes: 40 additions & 0 deletions src/query/functions/tests/it/scalars/mod.rs
@@ -271,3 +271,43 @@ fn list_all_builtin_functions() {
fn check_ambiguity() {
BUILTIN_FUNCTIONS.check_ambiguity()
}

#[test]
fn test_if_function() -> Result<()> {
use databend_common_expression::types::*;
use databend_common_expression::FromData;
use databend_common_expression::Scalar;
let raw_expr = parser::parse_raw_expr("if(eq(n,1), sum_sid + 1,100)", &[
("n", UInt8Type::data_type()),
("sum_sid", Int32Type::data_type().wrap_nullable()),
]);
let expr = type_check::check(&raw_expr, &BUILTIN_FUNCTIONS)?;
let block = DataBlock::new(
vec![
BlockEntry {
data_type: UInt8Type::data_type(),
value: Value::Column(UInt8Type::from_data(vec![2_u8, 1])),
},
BlockEntry {
data_type: Int32Type::data_type().wrap_nullable(),
value: Value::Scalar(Scalar::Number(NumberScalar::Int32(2400_i32))),
},
],
2,
);
let func_ctx = FunctionContext::default();
let evaluator = Evaluator::new(&block, &func_ctx, &BUILTIN_FUNCTIONS);
let result = evaluator.run(&expr).unwrap();
let result = result
.as_column()
.unwrap()
.clone()
.as_nullable()
.unwrap()
.clone();

let bm = Bitmap::from_iter([true, true]);
assert_eq!(result.validity, bm);
assert_eq!(result.column, Int64Type::from_data(vec![100, 2401]));
Ok(())
}
@@ -71,19 +71,35 @@ pub struct ParquetFileWriter
const MAX_BUFFER_SIZE: usize = 64 * 1024 * 1024;
// this is number of rows, not size
const MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
const CREATE_BY_LEN: usize = 24; // "Databend 1.2.333-nightly".len();

fn create_writer(
arrow_schema: Arc<Schema>,
targe_file_size: Option<usize>,
) -> Result<ArrowWriter<Vec<u8>>> {
// example: 1.2.333-nightly
// tags may contain other items like `1.2.680-p2`, we will fill it with `1.2.680-p2.....`
let mut create_by = format!(
"Databend {}.{}.{}-{:.<7}",
DATABEND_SEMVER.major,
DATABEND_SEMVER.minor,
DATABEND_SEMVER.patch,
DATABEND_SEMVER.pre.as_str()
);

if create_by.len() != CREATE_BY_LEN {
create_by = format!("{:.<24}", create_by);
create_by.truncate(24);
}

let props = WriterProperties::builder()
.set_compression(TableCompression::Zstd.into())
.set_max_row_group_size(MAX_ROW_GROUP_SIZE)
.set_encoding(Encoding::PLAIN)
.set_dictionary_enabled(false)
.set_statistics_enabled(EnabledStatistics::Chunk)
.set_bloom_filter_enabled(false)
.set_created_by(format!("Databend {}", *DATABEND_SEMVER))
.set_created_by(create_by)
.build();
let buf_size = match targe_file_size {
Some(n) if n < MAX_BUFFER_SIZE => n,
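For reference, here is a standalone sketch of the `created_by` padding introduced above: the version is rendered as `Databend {major}.{minor}.{patch}-{pre}` with the pre-release tag dot-filled to 7 characters, and the result is then dot-padded or truncated to the fixed 24-byte `CREATE_BY_LEN`, so `1.2.333-nightly` and `1.2.680-p2` both yield a 24-character value. The helper name `created_by_for` is made up for illustration and the snippet does not touch Databend's `DATABEND_SEMVER`.

const CREATE_BY_LEN: usize = 24; // "Databend 1.2.333-nightly".len()

// Illustrative helper (not the actual Databend function): build the
// fixed-width created_by string from version components.
fn created_by_for(major: u64, minor: u64, patch: u64, pre: &str) -> String {
    // The pre-release tag is left-aligned and dot-filled to width 7, e.g. "p2" -> "p2.....".
    let mut created_by = format!("Databend {}.{}.{}-{:.<7}", major, minor, patch, pre);
    if created_by.len() != CREATE_BY_LEN {
        // Longer pre-release tags are clipped back to exactly 24 bytes.
        created_by = format!("{:.<24}", created_by);
        created_by.truncate(CREATE_BY_LEN);
    }
    created_by
}

fn main() {
    assert_eq!(created_by_for(1, 2, 333, "nightly"), "Databend 1.2.333-nightly");
    assert_eq!(created_by_for(1, 2, 680, "p2"), "Databend 1.2.680-p2.....");
    assert_eq!(created_by_for(1, 2, 680, "nightly.1").len(), CREATE_BY_LEN);
}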
61 changes: 60 additions & 1 deletion tests/sqllogictests/suites/query/cte/basic_r_cte.test
@@ -227,5 +227,64 @@ select cte1.a from cte1;
8
9


statement ok
create table train(
train_id varchar(8) not null ,
departure_station varchar(32) not null,
arrival_station varchar(32) not null,
seat_count int not null
);

statement ok
create table passenger(
passenger_id varchar(16) not null,
departure_station varchar(32) not null,
arrival_station varchar(32) not null
);

statement ok
create table city(city varchar(32));

statement ok
insert into city
with t as (select 1 n union select 2 union select 3 union select 4 union select 5)
,t1 as(select row_number()over() rn from t ,t t2,t t3)
select concat('城市',rn::varchar) city from t1 where rn<=5;

statement ok
insert into train
select concat('G',row_number()over()::varchar),c1.city,c2.city, n from city c1, city c2, (select 600 n union select 800 union select 1200 union select 1600) a ;

statement ok
insert into passenger
select concat('P',substr((100000000+row_number()over())::varchar,2)),c1.city,c2.city from city c1, city c2 ,city c3, city c4, city c5,
city c6, (select 1 n union select 2 union select 3 union select 4) c7,(select 1 n union select 2) c8;


query III
with
t0 as (
select
train_id,
seat_count,
sum(seat_count) over (
partition by departure_station, arrival_station order by train_id
) ::int sum_sid
from
train
)
select
sum(case when n=1 then sum_sid+1 else 0 end::int),
sum(sum_sid),
sum(seat_count)
from
t0,(select 1 n union all select 2);
----
261700 523200 210000

statement ok
use default;

statement ok
drop table t1;
drop database db;
@@ -26,7 +26,7 @@ remove @data/unload/parquet/null_if/
query
copy into @data/unload/parquet/null_if from string
----
3 56 379
3 56 387

statement ok
drop file format if exists parquet_null_if
@@ -10,7 +10,7 @@ remove @data/parquet/unload/uuid
query
copy into @data/parquet/unload/uuid/ from (select 1 as a) file_format = (type = parquet)
----
1 1 366
1 1 374

query error column id doesn't exist
copy into t_uuid from @data/parquet/unload/uuid file_format = (type = parquet) RETURN_FAILED_ONLY=TRUE
@@ -22,7 +22,7 @@
query
copy into @data/parquet/unload/uuid/ from (select 1 as a) file_format = (type = parquet)
----
1 1 366
1 1 374

statement ok
truncate table t_uuid
2 changes: 1 addition & 1 deletion tests/suites/0_stateless/05_hints/05_0001_set_var.result
@@ -23,5 +23,5 @@ America/Toronto
1
2022-02-02 03:00:00
2022-02-02 03:00:00
1 13 419
1 13 427
Asia/Shanghai
@@ -2,7 +2,7 @@
1
200
=== test stage ===
1 8 392
1 8 400
0
=== test udf ===
2
@@ -101,7 +101,7 @@ Error: APIError: QueryFailed: [1063]Permission denied: privilege READ is require
Error: APIError: QueryFailed: [1063]Permission denied: No privilege on database root_db for user b.
Error: APIError: QueryFailed: [1063]Permission denied: No privilege on table root_table for user b.
Error: APIError: QueryFailed: [1063]Permission denied: No privilege on table root_table for user b.
1 1 366
1 1 374
Error: APIError: QueryFailed: [1063]Permission denied: privilege [Select] is required on 'default'.'default'.'t1' for user 'b'@'%' with roles [public]
Error: APIError: QueryFailed: [1063]Permission denied: privilege [Read] is required on STAGE s3 for user 'b'@'%' with roles [public]. Note: Please ensure that your current role have the appropriate permissions to create a new Warehouse|Database|Table|UDF|Stage.
Error: APIError: QueryFailed: [1063]Permission denied: privilege [Select] is required on 'default'.'default'.'t' for user 'b'@'%' with roles [public]
1 change: 1 addition & 0 deletions tests/suites/0_stateless/19_fuzz/19_0005_fuzz_cte.result
@@ -0,0 +1 @@
OK
25 changes: 25 additions & 0 deletions tests/suites/0_stateless/19_fuzz/19_0005_fuzz_cte.sh
@@ -0,0 +1,25 @@
#!/usr/bin/env bash

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../../../shell_env.sh


times=256

echo "" > /tmp/fuzz_a.txt
echo "" > /tmp/fuzz_b.txt

for i in `seq 1 ${times}`;do
echo """with t0(sum_sid) as (select sum(number) over(partition by number order by number)
from numbers(3)) select n, if(n =1, sum_sid +1, 0) from t0, (select 1 n union all select 2) order by 1,2;
""" | $BENDSQL_CLIENT_CONNECT >> /tmp/fuzz_a.txt
done


for i in `seq 1 ${times}`;do
echo """with t0(sum_sid) as (select sum(number) over(partition by number order by number)
from numbers(3)) select n, if(n =1, sum_sid +1, 0) from t0, (select 1 n union all select 2) order by 1,2;
""" | $BENDSQL_CLIENT_CONNECT >> /tmp/fuzz_b.txt
done

diff /tmp/fuzz_a.txt /tmp/fuzz_b.txt && echo "OK"
@@ -1,6 +1,6 @@
1
1
2 10 391
2 10 399
expects .stats.write_progress.rows be 2
expects .error be null
2
@@ -9,6 +9,6 @@
2
2
20 160 160
20 530 810
20 530 818
2
20 160 160
@@ -1,4 +1,4 @@
>>>> create or replace connection c_00_0005 storage_type='s3' access_key_id = 'minioadmin' endpoint_url = 'http://127.0.0.1:9900' secret_access_key = 'minioadmin'
>>>> copy into 's3://testbucket/c_00_0005/ab de/f' connection=(connection_name='c_00_0005') from (select 1) detailed_output=true use_raw_path=true single=true overwrite=true
c_00_0005/ab de/f 366 1
c_00_0005/ab de/f 374 1
<<<<
2 changes: 1 addition & 1 deletion tests/suites/1_stateful/00_stage/00_0012_stage_priv.result
@@ -27,7 +27,7 @@ Error: APIError: QueryFailed: [1063]Permission denied: privilege [Write] is requ
Error: APIError: QueryFailed: [1063]Permission denied: privilege [Read] is required on STAGE presign_stage for user 'u1'@'%' with roles [public]. Note: Please ensure that your current role have the appropriate permissions to create a new Warehouse|Database|Table|UDF|Stage.
000
Error: APIError: QueryFailed: [1063]Permission denied: privilege [Write] is required on STAGE s3 for user 'u1'@'%' with roles [public]. Note: Please ensure that your current role have the appropriate permissions to create a new Warehouse|Database|Table|UDF|Stage.
1 1 366
1 1 374
Error: APIError: QueryFailed: [1063]Permission denied: privilege [Read] is required on STAGE s3 for user 'u1'@'%' with roles [public]. Note: Please ensure that your current role have the appropriate permissions to create a new Warehouse|Database|Table|UDF|Stage.
Error: APIError: QueryFailed: [1063]Permission denied: privilege READ is required on stage s3 for user 'u1'@'%'
Error: APIError: QueryFailed: [1063]Permission denied: privilege READ is required on stage s3 for user 'u1'@'%'
@@ -8,7 +8,7 @@
>>>> create stage my_stage url= 's3://testbucket/admin/tempdata/' connection = (connection_name='my_conn');
>>>> remove @my_stage;
>>>> copy into @my_stage/a.csv from my_table
3 13 393
3 13 401
>>>> select * from @my_stage order by a;
1
2