fix(query): backport to 680 stick the created_by infos in parquet writer #17223

Merged · 3 commits · Jan 9, 2025
25 changes: 9 additions & 16 deletions src/query/expression/src/register_vectorize.rs
@@ -283,8 +283,7 @@ pub fn passthrough_nullable_1_arg<I1: ArgType, O: ArgType>(

                match out {
                    Value::Column(out) => Value::Column(NullableColumn::new(out, args_validity)),
-                   Value::Scalar(out) if args_validity.get_bit(0) => Value::Scalar(Some(out)),
-                   _ => Value::Scalar(None),
+                   Value::Scalar(out) => Value::Scalar(Some(out)),
                }
            }
            _ => Value::Scalar(None),
@@ -308,15 +307,15 @@ pub fn passthrough_nullable_2_arg<I1: ArgType, I2: ArgType, O: ArgType>(
        if let Some(validity) = ctx.validity.as_ref() {
            args_validity = &args_validity & validity;
        }

+       ctx.validity = Some(args_validity.clone());
        match (arg1.value(), arg2.value()) {
            (Some(arg1), Some(arg2)) => {
                let out = func(arg1, arg2, ctx);

                match out {
                    Value::Column(out) => Value::Column(NullableColumn::new(out, args_validity)),
-                   Value::Scalar(out) if args_validity.get_bit(0) => Value::Scalar(Some(out)),
-                   _ => Value::Scalar(None),
+                   Value::Scalar(out) => Value::Scalar(Some(out)),
                }
            }
            _ => Value::Scalar(None),
@@ -352,8 +351,7 @@ pub fn passthrough_nullable_3_arg<I1: ArgType, I2: ArgType, I3: ArgType, O: ArgType>(

                match out {
                    Value::Column(out) => Value::Column(NullableColumn::new(out, args_validity)),
-                   Value::Scalar(out) if args_validity.get_bit(0) => Value::Scalar(Some(out)),
-                   _ => Value::Scalar(None),
+                   Value::Scalar(out) => Value::Scalar(Some(out)),
                }
            }
            _ => Value::Scalar(None),
@@ -397,8 +395,7 @@ pub fn passthrough_nullable_4_arg<

                match out {
                    Value::Column(out) => Value::Column(NullableColumn::new(out, args_validity)),
-                   Value::Scalar(out) if args_validity.get_bit(0) => Value::Scalar(Some(out)),
-                   _ => Value::Scalar(None),
+                   Value::Scalar(out) => Value::Scalar(Some(out)),
                }
            }
            _ => Value::Scalar(None),
@@ -427,8 +424,7 @@ pub fn combine_nullable_1_arg<I1: ArgType, O: ArgType>(
                        out.column,
                        &args_validity & &out.validity,
                    )),
-                   Value::Scalar(out) if args_validity.get_bit(0) => Value::Scalar(out),
-                   _ => Value::Scalar(None),
+                   Value::Scalar(out) => Value::Scalar(out),
                }
            }
            _ => Value::Scalar(None),
@@ -465,8 +461,7 @@ pub fn combine_nullable_2_arg<I1: ArgType, I2: ArgType, O: ArgType>(
                        out.column,
                        &args_validity & &out.validity,
                    )),
-                   Value::Scalar(out) if args_validity.get_bit(0) => Value::Scalar(out),
-                   _ => Value::Scalar(None),
+                   Value::Scalar(out) => Value::Scalar(out),
                }
            }
            _ => Value::Scalar(None),
@@ -505,8 +500,7 @@ pub fn combine_nullable_3_arg<I1: ArgType, I2: ArgType, I3: ArgType, O: ArgType>(
                        out.column,
                        &args_validity & &out.validity,
                    )),
-                   Value::Scalar(out) if args_validity.get_bit(0) => Value::Scalar(out),
-                   _ => Value::Scalar(None),
+                   Value::Scalar(out) => Value::Scalar(out),
                }
            }
            _ => Value::Scalar(None),
@@ -552,8 +546,7 @@ pub fn combine_nullable_4_arg<I1: ArgType, I2: ArgType, I3: ArgType, I4: ArgType, O: ArgType>(
                        out.column,
                        &args_validity & &out.validity,
                    )),
-                   Value::Scalar(out) if args_validity.get_bit(0) => Value::Scalar(out),
-                   _ => Value::Scalar(None),
+                   Value::Scalar(out) => Value::Scalar(out),
                }
            }
            _ => Value::Scalar(None),
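
Note on the change above: all eight wrappers, passthrough_nullable_1_arg through combine_nullable_4_arg, drop the same guard. A scalar produced inside the (Some(..), ..) arm comes from non-null scalar arguments, so it is now returned as valid unconditionally instead of being gated on bit 0 of args_validity, a bitmap that describes column rows rather than the scalar itself. A minimal standalone model of the old and new behavior follows; the simplified Value type is hypothetical, not the real databend_common_expression API:

// Minimal model of the fix; this `Value` is a stand-in, not the real type.
#[derive(Debug, PartialEq)]
enum Value {
    Scalar(Option<i64>),
}

// Old: even a scalar computed from non-null scalar arguments was gated on
// bit 0 of the argument validity bitmap.
fn wrap_scalar_old(out: i64, validity_bit0: bool) -> Value {
    if validity_bit0 {
        Value::Scalar(Some(out))
    } else {
        Value::Scalar(None) // a valid result silently became NULL
    }
}

// New: the surrounding (Some(..), ..) match arm already guarantees the
// scalar arguments are non-null, so the output scalar is always valid.
fn wrap_scalar_new(out: i64) -> Value {
    Value::Scalar(Some(out))
}

fn main() {
    // With if(n = 1, sum_sid + 1, 100) and a non-null scalar sum_sid = 2400,
    // an unset bit 0 (e.g. a NULL in the first row of a sibling column) used
    // to turn 2401 into NULL:
    assert_eq!(wrap_scalar_old(2401, false), Value::Scalar(None));
    assert_eq!(wrap_scalar_new(2401), Value::Scalar(Some(2401)));
}

The one extra added line, ctx.validity = Some(args_validity.clone()) in passthrough_nullable_2_arg, hands the combined argument validity to the wrapped function, so nested evaluation sees the same nullability the wrapper computed.
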
40 changes: 40 additions & 0 deletions src/query/functions/tests/it/scalars/mod.rs
@@ -271,3 +271,43 @@ fn list_all_builtin_functions() {
fn check_ambiguity() {
    BUILTIN_FUNCTIONS.check_ambiguity()
}

#[test]
fn test_if_function() -> Result<()> {
    use databend_common_expression::types::*;
    use databend_common_expression::FromData;
    use databend_common_expression::Scalar;
    let raw_expr = parser::parse_raw_expr("if(eq(n,1), sum_sid + 1,100)", &[
        ("n", UInt8Type::data_type()),
        ("sum_sid", Int32Type::data_type().wrap_nullable()),
    ]);
    let expr = type_check::check(&raw_expr, &BUILTIN_FUNCTIONS)?;
    let block = DataBlock::new(
        vec![
            BlockEntry {
                data_type: UInt8Type::data_type(),
                value: Value::Column(UInt8Type::from_data(vec![2_u8, 1])),
            },
            BlockEntry {
                data_type: Int32Type::data_type().wrap_nullable(),
                value: Value::Scalar(Scalar::Number(NumberScalar::Int32(2400_i32))),
            },
        ],
        2,
    );
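    // The block carries a two-row UInt8 column n = [2, 1] and a non-null
    // Int32 scalar sum_sid = 2400: row one takes the else branch (100), row
    // two evaluates sum_sid + 1 = 2401, and neither result should be masked
    // to NULL by the validity bitmap.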
    let func_ctx = FunctionContext::default();
    let evaluator = Evaluator::new(&block, &func_ctx, &BUILTIN_FUNCTIONS);
    let result = evaluator.run(&expr).unwrap();
    let result = result
        .as_column()
        .unwrap()
        .clone()
        .as_nullable()
        .unwrap()
        .clone();

    let bm = Bitmap::from_iter([true, true]);
    assert_eq!(result.validity, bm);
    assert_eq!(result.column, Int64Type::from_data(vec![100, 2401]));
    Ok(())
}
@@ -71,19 +71,35 @@ pub struct ParquetFileWriter {
const MAX_BUFFER_SIZE: usize = 64 * 1024 * 1024;
// this is number of rows, not size
const MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
+const CREATE_BY_LEN: usize = 24; // "Databend 1.2.333-nightly".len();

fn create_writer(
    arrow_schema: Arc<Schema>,
    targe_file_size: Option<usize>,
) -> Result<ArrowWriter<Vec<u8>>> {
+   // example: 1.2.333-nightly
+   // tags may carry other suffixes, e.g. `1.2.680-p2`; those are dot-padded to `1.2.680-p2.....`
+   let mut create_by = format!(
+       "Databend {}.{}.{}-{:.<7}",
+       DATABEND_SEMVER.major,
+       DATABEND_SEMVER.minor,
+       DATABEND_SEMVER.patch,
+       DATABEND_SEMVER.pre.as_str()
+   );
+
+   if create_by.len() != CREATE_BY_LEN {
+       create_by = format!("{:.<24}", create_by);
+       create_by.truncate(24);
+   }
+
    let props = WriterProperties::builder()
        .set_compression(TableCompression::Zstd.into())
        .set_max_row_group_size(MAX_ROW_GROUP_SIZE)
        .set_encoding(Encoding::PLAIN)
        .set_dictionary_enabled(false)
        .set_statistics_enabled(EnabledStatistics::Chunk)
        .set_bloom_filter_enabled(false)
-       .set_created_by(format!("Databend {}", *DATABEND_SEMVER))
+       .set_created_by(create_by)
        .build();
    let buf_size = match targe_file_size {
        Some(n) if n < MAX_BUFFER_SIZE => n,
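
Note on the padding above: per the CREATE_BY_LEN comment, the goal is a created_by string that is always exactly 24 bytes, so the parquet footer, and with it the file sizes asserted in the tests below, stops drifting between release tags such as -nightly and -p2. A std-only sketch of the rule, with made-up version numbers:

const CREATE_BY_LEN: usize = 24; // "Databend 1.2.333-nightly".len()

fn created_by(major: u64, minor: u64, patch: u64, pre: &str) -> String {
    // Dot-pad the pre-release tag to 7 chars; the common case then lands on
    // exactly 24 bytes. Anything else is dot-padded and truncated back to 24.
    let mut s = format!("Databend {major}.{minor}.{patch}-{pre:.<7}");
    if s.len() != CREATE_BY_LEN {
        s = format!("{s:.<24}");
        s.truncate(CREATE_BY_LEN);
    }
    s
}

fn main() {
    assert_eq!(created_by(1, 2, 333, "nightly"), "Databend 1.2.333-nightly");
    assert_eq!(created_by(1, 2, 680, "p2"), "Databend 1.2.680-p2.....");
    assert_eq!(created_by(1, 2, 680, "p2-long-tag").len(), CREATE_BY_LEN);
}

With the old format!("Databend {}", *DATABEND_SEMVER) the string length tracked the release tag, which is why the expected sizes in the test updates below change by a constant amount once in this backport.
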
61 changes: 60 additions & 1 deletion tests/sqllogictests/suites/query/cte/basic_r_cte.test
@@ -227,5 +227,64 @@ select cte1.a from cte1;
8
9


statement ok
create table train(
train_id varchar(8) not null ,
departure_station varchar(32) not null,
arrival_station varchar(32) not null,
seat_count int not null
);

statement ok
create table passenger(
passenger_id varchar(16) not null,
departure_station varchar(32) not null,
arrival_station varchar(32) not null
);

statement ok
create table city(city varchar(32));

statement ok
insert into city
with t as (select 1 n union select 2 union select 3 union select 4 union select 5)
,t1 as(select row_number()over() rn from t ,t t2,t t3)
select concat('city',rn::varchar) city from t1 where rn<=5;

statement ok
insert into train
select concat('G',row_number()over()::varchar),c1.city,c2.city, n from city c1, city c2, (select 600 n union select 800 union select 1200 union select 1600) a ;

statement ok
insert into passenger
select concat('P',substr((100000000+row_number()over())::varchar,2)),c1.city,c2.city from city c1, city c2 ,city c3, city c4, city c5,
city c6, (select 1 n union select 2 union select 3 union select 4) c7,(select 1 n union select 2) c8;


query III
with
t0 as (
select
train_id,
seat_count,
sum(seat_count) over (
partition by departure_station, arrival_station order by train_id
) ::int sum_sid
from
train
)
select
sum(case when n=1 then sum_sid+1 else 0 end::int),
sum(sum_sid),
sum(seat_count)
from
t0,(select 1 n union all select 2);
----
261700 523200 210000

statement ok
use default;

statement ok
drop table t1;
drop database db;
@@ -26,7 +26,7 @@ remove @data/unload/parquet/null_if/
query
copy into @data/unload/parquet/null_if from string
----
-3 56 379
+3 56 387

statement ok
drop file format if exists parquet_null_if
@@ -10,7 +10,7 @@ remove @data/parquet/unload/uuid
query
copy into @data/parquet/unload/uuid/ from (select 1 as a) file_format = (type = parquet)
----
-1 1 366
+1 1 374

query error column id doesn't exist
copy into t_uuid from @data/parquet/unload/uuid file_format = (type = parquet) RETURN_FAILED_ONLY=TRUE
@@ -22,7 +22,7 @@ select * from t_uuid
query
copy into @data/parquet/unload/uuid/ from (select 1 as a) file_format = (type = parquet)
----
-1 1 366
+1 1 374

statement ok
truncate table t_uuid
2 changes: 1 addition & 1 deletion tests/suites/0_stateless/05_hints/05_0001_set_var.result
@@ -23,5 +23,5 @@ America/Toronto
1
2022-02-02 03:00:00
2022-02-02 03:00:00
-1 13 419
+1 13 427
Asia/Shanghai
@@ -2,7 +2,7 @@
1
200
=== test stage ===
-1 8 392
+1 8 400
0
=== test udf ===
2
@@ -101,7 +101,7 @@ Error: APIError: QueryFailed: [1063]Permission denied: privilege READ is require
Error: APIError: QueryFailed: [1063]Permission denied: No privilege on database root_db for user b.
Error: APIError: QueryFailed: [1063]Permission denied: No privilege on table root_table for user b.
Error: APIError: QueryFailed: [1063]Permission denied: No privilege on table root_table for user b.
-1 1 366
+1 1 374
Error: APIError: QueryFailed: [1063]Permission denied: privilege [Select] is required on 'default'.'default'.'t1' for user 'b'@'%' with roles [public]
Error: APIError: QueryFailed: [1063]Permission denied: privilege [Read] is required on STAGE s3 for user 'b'@'%' with roles [public]. Note: Please ensure that your current role have the appropriate permissions to create a new Warehouse|Database|Table|UDF|Stage.
Error: APIError: QueryFailed: [1063]Permission denied: privilege [Select] is required on 'default'.'default'.'t' for user 'b'@'%' with roles [public]
1 change: 1 addition & 0 deletions tests/suites/0_stateless/19_fuzz/19_0005_fuzz_cte.result
@@ -0,0 +1 @@
OK
25 changes: 25 additions & 0 deletions tests/suites/0_stateless/19_fuzz/19_0005_fuzz_cte.sh
@@ -0,0 +1,25 @@
#!/usr/bin/env bash

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../../../shell_env.sh


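# Run the same query in two passes of ${times} iterations each and diff the
# two outputs: the if() over a nullable window-sum scalar must come out
# identical on every run.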
times=256

echo "" > /tmp/fuzz_a.txt
echo "" > /tmp/fuzz_b.txt

for i in `seq 1 ${times}`;do
echo """with t0(sum_sid) as (select sum(number) over(partition by number order by number)
from numbers(3)) select n, if(n =1, sum_sid +1, 0) from t0, (select 1 n union all select 2) order by 1,2;
""" | $BENDSQL_CLIENT_CONNECT >> /tmp/fuzz_a.txt
done


for i in `seq 1 ${times}`;do
echo """with t0(sum_sid) as (select sum(number) over(partition by number order by number)
from numbers(3)) select n, if(n =1, sum_sid +1, 0) from t0, (select 1 n union all select 2) order by 1,2;
""" | $BENDSQL_CLIENT_CONNECT >> /tmp/fuzz_b.txt
done

diff /tmp/fuzz_a.txt /tmp/fuzz_b.txt && echo "OK"
@@ -1,6 +1,6 @@
1
1
-2 10 391
+2 10 399
expects .stats.write_progress.rows be 2
expects .error be null
2
@@ -9,6 +9,6 @@
2
2
20 160 160
-20 530 810
+20 530 818
2
20 160 160
@@ -1,4 +1,4 @@
>>>> create or replace connection c_00_0005 storage_type='s3' access_key_id = 'minioadmin' endpoint_url = 'http://127.0.0.1:9900' secret_access_key = 'minioadmin'
>>>> copy into 's3://testbucket/c_00_0005/ab de/f' connection=(connection_name='c_00_0005') from (select 1) detailed_output=true use_raw_path=true single=true overwrite=true
-c_00_0005/ab de/f 366 1
+c_00_0005/ab de/f 374 1
<<<<
2 changes: 1 addition & 1 deletion tests/suites/1_stateful/00_stage/00_0012_stage_priv.result
@@ -27,7 +27,7 @@ Error: APIError: QueryFailed: [1063]Permission denied: privilege [Write] is requ
Error: APIError: QueryFailed: [1063]Permission denied: privilege [Read] is required on STAGE presign_stage for user 'u1'@'%' with roles [public]. Note: Please ensure that your current role have the appropriate permissions to create a new Warehouse|Database|Table|UDF|Stage.
000
Error: APIError: QueryFailed: [1063]Permission denied: privilege [Write] is required on STAGE s3 for user 'u1'@'%' with roles [public]. Note: Please ensure that your current role have the appropriate permissions to create a new Warehouse|Database|Table|UDF|Stage.
-1 1 366
+1 1 374
Error: APIError: QueryFailed: [1063]Permission denied: privilege [Read] is required on STAGE s3 for user 'u1'@'%' with roles [public]. Note: Please ensure that your current role have the appropriate permissions to create a new Warehouse|Database|Table|UDF|Stage.
Error: APIError: QueryFailed: [1063]Permission denied: privilege READ is required on stage s3 for user 'u1'@'%'
Error: APIError: QueryFailed: [1063]Permission denied: privilege READ is required on stage s3 for user 'u1'@'%'
@@ -8,7 +8,7 @@
>>>> create stage my_stage url= 's3://testbucket/admin/tempdata/' connection = (connection_name='my_conn');
>>>> remove @my_stage;
>>>> copy into @my_stage/a.csv from my_table
-3 13 393
+3 13 401
>>>> select * from @my_stage order by a;
1
2