Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Interval support #1752

Merged
merged 4 commits into from
Sep 10, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/datavalues/src/data_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ impl From<&DataType> for DataValue {
DataType::List(f) => DataValue::List(None, f.data_type().clone()),
DataType::Struct(_) => DataValue::Struct(vec![]),
DataType::String => DataValue::String(None),
DataType::Interval(_) => DataValue::Int64(None),
}
}
}
Expand Down
47 changes: 47 additions & 0 deletions common/datavalues/src/types/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,31 @@ pub enum DataType {
/// Option<String> indicates the timezone, if it's None, it's UTC
DateTime32(Option<String>),

Interval(IntervalUnit),

List(Box<DataField>),
Struct(Vec<DataField>),
String,
}

/// Unit tag carried by `DataType::Interval`.
///
/// An interval value is physically an `Int64`; this tag records how that
/// integer is to be interpreted. The variant names are also what the `Display`
/// impl emits and what the Arrow extension-type metadata is matched against
/// when converting back from `ArrowDataType::Extension("Interval", ..)`.
#[derive(
serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord,
)]
pub enum IntervalUnit {
// Interval expressed as a number of months (the plan parser turns `N year`
// into `N * 12` months).
YearMonth,
// Interval expressed as days plus milliseconds; the plan parser packs the
// day count into the upper 32 bits and the milliseconds into the lower bits.
DayTime,
}

impl fmt::Display for IntervalUnit {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let str = match self {
IntervalUnit::YearMonth => "YearMonth",
IntervalUnit::DayTime => "DayTime",
};
write!(f, "{}", str)
}
}

impl DataType {
pub fn to_physical_type(&self) -> PhysicalDataType {
self.clone().into()
Expand Down Expand Up @@ -80,6 +100,11 @@ impl DataType {
ArrowDataType::Struct(arrows_fields)
}
String => ArrowDataType::LargeBinary,
Interval(unit) => ArrowDataType::Extension(
junli1026 marked this conversation as resolved.
Show resolved Hide resolved
"Interval".to_string(),
Box::new(ArrowDataType::Int64),
Some(unit.to_string()),
),
}
}
}
Expand Down Expand Up @@ -116,6 +141,25 @@ impl From<&ArrowDataType> for DataType {
ArrowDataType::Timestamp(_, tz) => DataType::DateTime32(tz.clone()),
ArrowDataType::Date32 => DataType::Date16,
ArrowDataType::Date64 => DataType::Date32,

ArrowDataType::Extension(name, _arrow_type, extra) => match name.as_str() {
"Date16" => DataType::Date16,
"Date32" => DataType::Date32,
"DateTime32" => DataType::DateTime32(extra.clone()),
"Interval" => {
if let Some(unit) = extra {
match unit.as_str() {
"YearMonth" => DataType::Interval(IntervalUnit::YearMonth),
"DayTime" => DataType::Interval(IntervalUnit::DayTime),
_ => unreachable!(),
}
} else {
unreachable!()
}
}
_ => unimplemented!("data_type: {}", dt),
},

// this is safe, because we define the datatype firstly
_ => {
unimplemented!("data_type: {}", dt)
Expand Down Expand Up @@ -159,6 +203,9 @@ impl fmt::Debug for DataType {
Self::List(arg0) => f.debug_tuple("List").field(arg0).finish(),
Self::Struct(arg0) => f.debug_tuple("Struct").field(arg0).finish(),
Self::String => write!(f, "String"),
Self::Interval(unit) => {
write!(f, "Interval({})", unit.to_string())
}
}
}
}
Expand Down
36 changes: 35 additions & 1 deletion common/datavalues/src/types/data_type_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ pub fn is_numeric(dt: &DataType) -> bool {
)
}

/// Returns `true` when `dt` is a `DataType::Interval`, regardless of its unit.
pub fn is_interval(dt: &DataType) -> bool {
    matches!(*dt, DataType::Interval(..))
}

fn next_size(size: usize) -> usize {
if size < 8_usize {
return size * 2;
Expand Down Expand Up @@ -273,7 +277,7 @@ pub fn datetime_arithmetic_coercion(
DataValueArithmeticOperator::Plus => Ok(a),

DataValueArithmeticOperator::Minus => {
if is_numeric(&b) {
if is_numeric(&b) || is_interval(&b) {
Ok(a)
} else {
// Date minus Date or DateTime minus DateTime
Expand All @@ -284,6 +288,36 @@ pub fn datetime_arithmetic_coercion(
}
}

/// Determines the result type of an arithmetic expression involving an interval.
///
/// Exactly one operand must be a date/datetime and the other an interval; the
/// result has the type of the date/datetime operand. Only `Plus` and `Minus`
/// are accepted.
///
/// # Errors
/// Returns `ErrorCode::BadDataValueType` when the operand types are not a
/// date/datetime paired with an interval, or when `op` is neither `Plus` nor
/// `Minus`.
#[inline]
pub fn interval_arithmetic_coercion(
    op: &DataValueArithmeticOperator,
    lhs_type: &DataType,
    rhs_type: &DataType,
) -> Result<DataType> {
    // Built lazily: formatting the message allocates, and the success path
    // never needs it.
    let unsupported = || {
        ErrorCode::BadDataValueType(format!(
            "DataValue Error: Unsupported date coercion ({:?}) {} ({:?})",
            lhs_type, op, rhs_type
        ))
    };

    // only allow date/datetime [+/-] interval
    let date_op_interval = is_date_or_date_time(lhs_type) && is_interval(rhs_type);
    let interval_op_date = is_interval(lhs_type) && is_date_or_date_time(rhs_type);
    if !(date_op_interval || interval_op_date) {
        return Err(unsupported());
    }

    match op {
        // NOTE(review): `interval - date` is accepted here as well and yields the
        // date type — confirm whether that combination should be rejected.
        DataValueArithmeticOperator::Plus | DataValueArithmeticOperator::Minus => {
            let date_type = if date_op_interval { lhs_type } else { rhs_type };
            Ok(date_type.clone())
        }
        _ => Err(unsupported()),
    }
}

#[inline]
pub fn numerical_signed_coercion(val_type: &DataType) -> Result<DataType> {
// error on any non-numeric type
Expand Down
1 change: 1 addition & 0 deletions common/datavalues/src/types/physical_data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ impl From<DataType> for PhysicalDataType {
DataType::List(x) => List(x),
DataType::Struct(x) => Struct(x),
DataType::String => String,
DataType::Interval(_) => Int64,
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions common/datavalues/src/types/serializations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ impl DataType {
DataType::String => Ok(Box::new(StringSerializer {
builder: StringArrayBuilder::with_capacity(capacity),
})),
DataType::Interval(_) => Ok(Box::new(DateSerializer::<i64> {
builder: PrimitiveArrayBuilder::<i64>::with_capacity(capacity),
})),
other => Err(ErrorCode::BadDataValueType(format!(
"create_serializer does not support type '{:?}'",
other
Expand Down
13 changes: 12 additions & 1 deletion common/functions/src/scalars/arithmetics/arithmetic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ impl Function for ArithmeticFunction {
if args.len() == 1 {
return Ok(args[0].clone());
}

if is_interval(&args[0]) || is_interval(&args[1]) {
return interval_arithmetic_coercion(&self.op, &args[0], &args[1]);
}
if is_date_or_date_time(&args[0]) || is_date_or_date_time(&args[1]) {
return datetime_arithmetic_coercion(&self.op, &args[0], &args[1]);
}
Expand All @@ -74,14 +78,21 @@ impl Function for ArithmeticFunction {
}

fn eval(&self, columns: &DataColumnsWithField, _input_rows: usize) -> Result<DataColumn> {
let has_date_or_date_time = columns.iter().any(|c| is_date_or_date_time(c.data_type()));
let has_interval = columns.iter().any(|c| is_interval(c.data_type()));

if has_date_or_date_time && has_interval {
// TODO(Jun): implement date/datetime +/- interval
unimplemented!()
junli1026 marked this conversation as resolved.
Show resolved Hide resolved
}

let result = match columns.len() {
1 => std::ops::Neg::neg(columns[0].column()),
_ => columns[0]
.column()
.arithmetic(self.op.clone(), columns[1].column()),
}?;

let has_date_or_date_time = columns.iter().any(|c| is_date_or_date_time(c.data_type()));
if has_date_or_date_time {
let args = columns
.iter()
Expand Down
1 change: 1 addition & 0 deletions common/functions/src/scalars/dates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod date;
mod date_function_test;
#[cfg(test)]
mod date_test;

mod now;
mod number_function;
mod round_function;
Expand Down
3 changes: 3 additions & 0 deletions query/src/servers/clickhouse/writers/query_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,9 @@ pub fn to_clickhouse_block(block: DataBlock) -> Result<Block> {
.collect();
result.column(name, vs)
}
DataType::Interval(_) => {
result.column(name, column.i64()?.inner().values().as_slice().to_vec())
}
_ => {
return Err(ErrorCode::BadDataValueType(format!(
"Unsupported column type:{:?}",
Expand Down
1 change: 1 addition & 0 deletions query/src/servers/mysql/writers/query_result_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ impl<'a, W: std::io::Write> DFQueryResultWriter<'a, W> {
DataType::Date16 | DataType::Date32 => Ok(ColumnType::MYSQL_TYPE_DATE),
DataType::DateTime32(_) => Ok(ColumnType::MYSQL_TYPE_DATETIME),
DataType::Null => Ok(ColumnType::MYSQL_TYPE_NULL),
DataType::Interval(_) => Ok(ColumnType::MYSQL_TYPE_LONG),
_ => Err(ErrorCode::UnImplement(format!(
"Unsupported column type:{:?}",
field.data_type()
Expand Down
68 changes: 68 additions & 0 deletions query/src/sql/plan_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,45 @@ impl PlanParser {
}
}

fn interval_to_day_time(days: i32, ms: i32) -> Result<Expression> {
let milliseconds_per_day = 24 * 3600 * 1000;
let wrapped_days = days + ms / milliseconds_per_day;
let wrapped_ms = ms % milliseconds_per_day;
let num = (wrapped_days as i64) << 32 | (wrapped_ms as i64);
let data_type = DataType::Interval(IntervalUnit::DayTime);

Ok(Expression::Literal {
value: DataValue::Int64(Some(num)),
column_name: Some(num.to_string()),
data_type,
})
}

/// Builds a `YearMonth` interval literal holding `months` as an `Int64`.
///
/// The decimal form of `months` doubles as the literal's column name.
fn interval_to_year_month(months: i32) -> Result<Expression> {
    let literal = Expression::Literal {
        value: DataValue::Int64(Some(i64::from(months))),
        column_name: Some(months.to_string()),
        data_type: DataType::Interval(IntervalUnit::YearMonth),
    };

    Ok(literal)
}

fn interval_to_rex(
value: &str,
interval_kind: sqlparser::ast::DateTimeField,
) -> Result<Expression> {
let num = value.parse::<i32>()?; // we only accept i32 for number in "interval [num] [year|month|day|hour|minute|second]"
match interval_kind {
sqlparser::ast::DateTimeField::Year => Self::interval_to_year_month(num * 12),
sqlparser::ast::DateTimeField::Month => Self::interval_to_year_month(num),
sqlparser::ast::DateTimeField::Day => Self::interval_to_day_time(num, 0),
sqlparser::ast::DateTimeField::Hour => Self::interval_to_day_time(0, num * 3600 * 1000),
sqlparser::ast::DateTimeField::Minute => Self::interval_to_day_time(0, num * 60 * 1000),
sqlparser::ast::DateTimeField::Second => Self::interval_to_day_time(0, num * 1000),
}
}

fn value_to_rex(value: &sqlparser::ast::Value) -> Result<Expression> {
match value {
sqlparser::ast::Value::Number(ref n, _) => {
Expand All @@ -900,6 +939,35 @@ impl PlanParser {
sqlparser::ast::Value::Boolean(b) => {
Ok(Expression::create_literal(DataValue::Boolean(Some(*b))))
}
sqlparser::ast::Value::Interval {
value: value_expr,
leading_field,
leading_precision,
last_field,
fractional_seconds_precision,
} => {
// We don't support full interval expression like 'Interval ..To.. '. Currently only partial interval expression like "interval [num] [unit]" is supported.
if leading_precision.is_some()
|| last_field.is_some()
|| fractional_seconds_precision.is_some()
{
return Result::Err(ErrorCode::SyntaxException(format!(
"Unsupported interval expression: {}.",
value
)));
}

// When the input is like "interval '1 hour'", leading_field will be None and value_expr will be '1 hour'.
// We may want to support this pattern in the native parser (sqlparser-rs), so that the parsing result has leading_field as Some(Hour) and value_expr as the number '1'.
if leading_field.is_none() {
//TODO: support parsing literal interval like '1 hour'
return Result::Err(ErrorCode::SyntaxException(format!(
"Unsupported interval expression: {}.",
value
)));
}
Self::interval_to_rex(value_expr, leading_field.clone().unwrap())
}
sqlparser::ast::Value::Null => Ok(Expression::create_literal(DataValue::Null)),
other => Result::Err(ErrorCode::SyntaxException(format!(
"Unsupported value expression: {}, type: {:?}",
Expand Down
13 changes: 6 additions & 7 deletions query/src/sql/plan_parser_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,12 @@ fn test_plan_parser() -> Result<()> {
expect: "",
error: "Code: 8, displayText = Unsupported function: \"unsupported\".",
},
// TODO: support interval type
// Test {
// name: "interval-passed",
// sql: "SELECT INTERVAL '1 year', INTERVAL '1 month', INTERVAL '1 day', INTERVAL '1 hour', INTERVAL '1 minute', INTERVAL '1 second'",
// expect: "Projection: 12:Interval(YearMonth), 1:Interval(YearMonth), 4294967296:Interval(DayTime), 3600000:Interval(DayTime), 60000:Interval(DayTime), 1000:Interval(DayTime)\n Expression: 12:Interval(YearMonth), 1:Interval(YearMonth), 4294967296:Interval(DayTime), 3600000:Interval(DayTime), 60000:Interval(DayTime), 1000:Interval(DayTime) (Before Projection)\n ReadDataSource: scan partitions: [1], scan schema: [dummy:UInt8], statistics: [read_rows: 1, read_bytes: 1]",
// error: "",
// },
Test {
name: "interval-passed",
sql: "SELECT INTERVAL '1' year, INTERVAL '1' month, INTERVAL '1' day, INTERVAL '1' hour, INTERVAL '1' minute, INTERVAL '1' second",
expect: "Projection: 12:Interval(YearMonth), 1:Interval(YearMonth), 4294967296:Interval(DayTime), 3600000:Interval(DayTime), 60000:Interval(DayTime), 1000:Interval(DayTime)\n Expression: 12:Interval(YearMonth), 1:Interval(YearMonth), 4294967296:Interval(DayTime), 3600000:Interval(DayTime), 60000:Interval(DayTime), 1000:Interval(DayTime) (Before Projection)\n ReadDataSource: scan partitions: [1], scan schema: [dummy:UInt8], statistics: [read_rows: 1, read_bytes: 1]",
error: "",
},
// Test {
// name: "interval-unsupported",
// sql: "SELECT INTERVAL '1 year 1 day'",
Expand Down