Skip to content

Commit

Permalink
Merge pull request #1752 from junli1026/jun/dev
Browse files Browse the repository at this point in the history
Add Interval support
  • Loading branch information
databend-bot authored Sep 10, 2021
2 parents a5d9cf9 + 001a7b7 commit f9aab56
Show file tree
Hide file tree
Showing 11 changed files with 164 additions and 9 deletions.
1 change: 1 addition & 0 deletions common/datavalues/src/data_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ impl From<&DataType> for DataValue {
DataType::List(f) => DataValue::List(None, f.data_type().clone()),
DataType::Struct(_) => DataValue::Struct(vec![]),
DataType::String => DataValue::String(None),
DataType::Interval(_) => DataValue::Int64(None),
}
}
}
Expand Down
30 changes: 30 additions & 0 deletions common/datavalues/src/types/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,31 @@ pub enum DataType {
/// Option<String> indicates the timezone, if it's None, it's UTC
DateTime32(Option<String>),

Interval(IntervalUnit),

List(Box<DataField>),
Struct(Vec<DataField>),
String,
}

/// Unit family carried by `DataType::Interval`.
///
/// The variant only tags which kind of interval a value is; the value itself
/// is stored separately.
#[derive(
serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord,
)]
pub enum IntervalUnit {
/// Interval expressed in years and/or months.
/// NOTE(review): the SQL plan parser appears to store this as a total
/// month count in an Int64 — confirm against the producer.
YearMonth,
/// Interval expressed in days and sub-day time.
/// NOTE(review): the SQL plan parser appears to pack the day count into the
/// upper 32 bits and milliseconds into the lower 32 bits of an Int64 — confirm.
DayTime,
}

impl fmt::Display for IntervalUnit {
    /// Writes the bare variant name (`YearMonth` or `DayTime`).
    ///
    /// Width and alignment flags on the formatter are not applied.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(match self {
            IntervalUnit::YearMonth => "YearMonth",
            IntervalUnit::DayTime => "DayTime",
        })
    }
}

impl DataType {
pub fn to_physical_type(&self) -> PhysicalDataType {
self.clone().into()
Expand Down Expand Up @@ -80,6 +100,7 @@ impl DataType {
ArrowDataType::Struct(arrows_fields)
}
String => ArrowDataType::LargeBinary,
Interval(_) => ArrowDataType::Int64,
}
}
}
Expand Down Expand Up @@ -116,6 +137,14 @@ impl From<&ArrowDataType> for DataType {
ArrowDataType::Timestamp(_, tz) => DataType::DateTime32(tz.clone()),
ArrowDataType::Date32 => DataType::Date16,
ArrowDataType::Date64 => DataType::Date32,

ArrowDataType::Extension(name, _arrow_type, extra) => match name.as_str() {
"Date16" => DataType::Date16,
"Date32" => DataType::Date32,
"DateTime32" => DataType::DateTime32(extra.clone()),
_ => unimplemented!("data_type: {}", dt),
},

// this is safe, because we define the datatype firstly
_ => {
unimplemented!("data_type: {}", dt)
Expand Down Expand Up @@ -159,6 +188,7 @@ impl fmt::Debug for DataType {
Self::List(arg0) => f.debug_tuple("List").field(arg0).finish(),
Self::Struct(arg0) => f.debug_tuple("Struct").field(arg0).finish(),
Self::String => write!(f, "String"),
Self::Interval(unit) => write!(f, "Interval({})", unit.to_string()),
}
}
}
Expand Down
36 changes: 35 additions & 1 deletion common/datavalues/src/types/data_type_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ pub fn is_numeric(dt: &DataType) -> bool {
)
}

/// Returns `true` when `dt` is a `DataType::Interval`, regardless of its unit.
pub fn is_interval(dt: &DataType) -> bool {
    match dt {
        DataType::Interval(_) => true,
        _ => false,
    }
}

fn next_size(size: usize) -> usize {
if size < 8_usize {
return size * 2;
Expand Down Expand Up @@ -273,7 +277,7 @@ pub fn datetime_arithmetic_coercion(
DataValueArithmeticOperator::Plus => Ok(a),

DataValueArithmeticOperator::Minus => {
if is_numeric(&b) {
if is_numeric(&b) || is_interval(&b) {
Ok(a)
} else {
// Date minus Date or DateTime minus DateTime
Expand All @@ -284,6 +288,36 @@ pub fn datetime_arithmetic_coercion(
}
}

/// Resolves the result type of an arithmetic operation that involves an interval.
///
/// Supported combinations:
/// * `date/datetime + interval` and `date/datetime - interval` yield the
///   date/datetime type of the left operand;
/// * `interval + date/datetime` yields the date/datetime type of the right
///   operand (addition is commutative).
///
/// Every other combination is rejected, including `interval - date/datetime`,
/// which has no meaningful result, and any operator other than plus/minus.
///
/// # Errors
/// Returns `ErrorCode::BadDataValueType` describing the operand types and the
/// operator when the combination is not supported.
#[inline]
pub fn interval_arithmetic_coercion(
    op: &DataValueArithmeticOperator,
    lhs_type: &DataType,
    rhs_type: &DataType,
) -> Result<DataType> {
    // Build the error lazily so the success path does not pay for formatting.
    let unsupported = || {
        ErrorCode::BadDataValueType(format!(
            "DataValue Error: Unsupported date coercion ({:?}) {} ({:?})",
            lhs_type, op, rhs_type
        ))
    };

    match op {
        // date/datetime [+/-] interval keeps the date/datetime type.
        DataValueArithmeticOperator::Plus | DataValueArithmeticOperator::Minus
            if is_date_or_date_time(lhs_type) && is_interval(rhs_type) =>
        {
            Ok(lhs_type.clone())
        }
        // interval + date/datetime is allowed; interval - date/datetime is not.
        DataValueArithmeticOperator::Plus
            if is_interval(lhs_type) && is_date_or_date_time(rhs_type) =>
        {
            Ok(rhs_type.clone())
        }
        _ => Err(unsupported()),
    }
}

#[inline]
pub fn numerical_signed_coercion(val_type: &DataType) -> Result<DataType> {
// error on any non-numeric type
Expand Down
1 change: 1 addition & 0 deletions common/datavalues/src/types/physical_data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ impl From<DataType> for PhysicalDataType {
DataType::List(x) => List(x),
DataType::Struct(x) => Struct(x),
DataType::String => String,
DataType::Interval(_) => Int64,
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions common/datavalues/src/types/serializations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ impl DataType {
DataType::String => Ok(Box::new(StringSerializer {
builder: StringArrayBuilder::with_capacity(capacity),
})),
DataType::Interval(_) => Ok(Box::new(DateSerializer::<i64> {
builder: PrimitiveArrayBuilder::<i64>::with_capacity(capacity),
})),
other => Err(ErrorCode::BadDataValueType(format!(
"create_serializer does not support type '{:?}'",
other
Expand Down
16 changes: 15 additions & 1 deletion common/functions/src/scalars/arithmetics/arithmetic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use common_datavalues::columns::DataColumn;
use common_datavalues::prelude::*;
use common_datavalues::DataSchema;
use common_datavalues::DataValueArithmeticOperator;
use common_exception::ErrorCode;
use common_exception::Result;

use crate::scalars::ArithmeticDivFunction;
Expand Down Expand Up @@ -63,6 +64,10 @@ impl Function for ArithmeticFunction {
if args.len() == 1 {
return Ok(args[0].clone());
}

if is_interval(&args[0]) || is_interval(&args[1]) {
return interval_arithmetic_coercion(&self.op, &args[0], &args[1]);
}
if is_date_or_date_time(&args[0]) || is_date_or_date_time(&args[1]) {
return datetime_arithmetic_coercion(&self.op, &args[0], &args[1]);
}
Expand All @@ -74,14 +79,23 @@ impl Function for ArithmeticFunction {
}

fn eval(&self, columns: &DataColumnsWithField, _input_rows: usize) -> Result<DataColumn> {
let has_date_or_date_time = columns.iter().any(|c| is_date_or_date_time(c.data_type()));
let has_interval = columns.iter().any(|c| is_interval(c.data_type()));

if has_date_or_date_time && has_interval {
//TODO(Jun): implement date/datetime +/- interval
return Err(ErrorCode::UnImplement(
"Arithmetic operation between date and interval is not implemented yet.".to_owned(),
));
}

let result = match columns.len() {
1 => std::ops::Neg::neg(columns[0].column()),
_ => columns[0]
.column()
.arithmetic(self.op.clone(), columns[1].column()),
}?;

let has_date_or_date_time = columns.iter().any(|c| is_date_or_date_time(c.data_type()));
if has_date_or_date_time {
let args = columns
.iter()
Expand Down
1 change: 1 addition & 0 deletions common/functions/src/scalars/dates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod date;
mod date_function_test;
#[cfg(test)]
mod date_test;

mod now;
mod number_function;
mod round_function;
Expand Down
3 changes: 3 additions & 0 deletions query/src/servers/clickhouse/writers/query_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,9 @@ pub fn to_clickhouse_block(block: DataBlock) -> Result<Block> {
.collect();
result.column(name, vs)
}
DataType::Interval(_) => {
result.column(name, column.i64()?.inner().values().as_slice().to_vec())
}
_ => {
return Err(ErrorCode::BadDataValueType(format!(
"Unsupported column type:{:?}",
Expand Down
1 change: 1 addition & 0 deletions query/src/servers/mysql/writers/query_result_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ impl<'a, W: std::io::Write> DFQueryResultWriter<'a, W> {
DataType::Date16 | DataType::Date32 => Ok(ColumnType::MYSQL_TYPE_DATE),
DataType::DateTime32(_) => Ok(ColumnType::MYSQL_TYPE_DATETIME),
DataType::Null => Ok(ColumnType::MYSQL_TYPE_NULL),
DataType::Interval(_) => Ok(ColumnType::MYSQL_TYPE_LONG),
_ => Err(ErrorCode::UnImplement(format!(
"Unsupported column type:{:?}",
field.data_type()
Expand Down
68 changes: 68 additions & 0 deletions query/src/sql/plan_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,45 @@ impl PlanParser {
}
}

/// Builds a `DayTime` interval literal from a day count and a millisecond count.
///
/// Whole days contained in `ms` are carried into the day component. The two
/// components are then packed into a single `i64`: the day count occupies the
/// upper 32 bits and the remaining milliseconds the lower 32 bits. The
/// literal's column name is the decimal rendering of the packed value.
fn interval_to_day_time(days: i32, ms: i32) -> Result<Expression> {
    let milliseconds_per_day = 24 * 3600 * 1000;
    let wrapped_days = days + ms / milliseconds_per_day;
    let wrapped_ms = ms % milliseconds_per_day;
    // Mask the millisecond half to 32 bits: a negative remainder would
    // otherwise be sign-extended to 64 bits and overwrite the day count
    // stored in the upper half.
    let num = (i64::from(wrapped_days) << 32) | (i64::from(wrapped_ms) & 0xFFFF_FFFF);
    let data_type = DataType::Interval(IntervalUnit::DayTime);

    Ok(Expression::Literal {
        value: DataValue::Int64(Some(num)),
        column_name: Some(num.to_string()),
        data_type,
    })
}

/// Builds a `YearMonth` interval literal whose value is `months` widened to
/// `i64`; the literal's column name is the decimal rendering of `months`.
fn interval_to_year_month(months: i32) -> Result<Expression> {
    let literal = Expression::Literal {
        value: DataValue::Int64(Some(i64::from(months))),
        column_name: Some(months.to_string()),
        data_type: DataType::Interval(IntervalUnit::YearMonth),
    };
    Ok(literal)
}

fn interval_to_rex(
value: &str,
interval_kind: sqlparser::ast::DateTimeField,
) -> Result<Expression> {
let num = value.parse::<i32>()?; // we only accept i32 for number in "interval [num] [year|month|day|hour|minute|second]"
match interval_kind {
sqlparser::ast::DateTimeField::Year => Self::interval_to_year_month(num * 12),
sqlparser::ast::DateTimeField::Month => Self::interval_to_year_month(num),
sqlparser::ast::DateTimeField::Day => Self::interval_to_day_time(num, 0),
sqlparser::ast::DateTimeField::Hour => Self::interval_to_day_time(0, num * 3600 * 1000),
sqlparser::ast::DateTimeField::Minute => Self::interval_to_day_time(0, num * 60 * 1000),
sqlparser::ast::DateTimeField::Second => Self::interval_to_day_time(0, num * 1000),
}
}

fn value_to_rex(value: &sqlparser::ast::Value) -> Result<Expression> {
match value {
sqlparser::ast::Value::Number(ref n, _) => {
Expand All @@ -902,6 +941,35 @@ impl PlanParser {
sqlparser::ast::Value::Boolean(b) => {
Ok(Expression::create_literal(DataValue::Boolean(Some(*b))))
}
sqlparser::ast::Value::Interval {
value: value_expr,
leading_field,
leading_precision,
last_field,
fractional_seconds_precision,
} => {
// We don't support full interval expression like 'Interval ..To.. '. Currently only partial interval expression like "interval [num] [unit]" is supported.
if leading_precision.is_some()
|| last_field.is_some()
|| fractional_seconds_precision.is_some()
{
return Result::Err(ErrorCode::SyntaxException(format!(
"Unsupported interval expression: {}.",
value
)));
}

// When the input is like "interval '1 hour'", leading_field will be None and value_expr will be '1 hour'.
// We may want to support this pattern in the native parser (sqlparser-rs), so that the parse result has leading_field set to Some(Hour) and value_expr set to the number '1'.
if leading_field.is_none() {
//TODO: support parsing literal interval like '1 hour'
return Result::Err(ErrorCode::SyntaxException(format!(
"Unsupported interval expression: {}.",
value
)));
}
Self::interval_to_rex(value_expr, leading_field.clone().unwrap())
}
sqlparser::ast::Value::Null => Ok(Expression::create_literal(DataValue::Null)),
other => Result::Err(ErrorCode::SyntaxException(format!(
"Unsupported value expression: {}, type: {:?}",
Expand Down
13 changes: 6 additions & 7 deletions query/src/sql/plan_parser_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,12 @@ fn test_plan_parser() -> Result<()> {
expect: "",
error: "Code: 8, displayText = Unsupported function: \"unsupported\".",
},
// TODO: support interval type
// Test {
// name: "interval-passed",
// sql: "SELECT INTERVAL '1 year', INTERVAL '1 month', INTERVAL '1 day', INTERVAL '1 hour', INTERVAL '1 minute', INTERVAL '1 second'",
// expect: "Projection: 12:Interval(YearMonth), 1:Interval(YearMonth), 4294967296:Interval(DayTime), 3600000:Interval(DayTime), 60000:Interval(DayTime), 1000:Interval(DayTime)\n Expression: 12:Interval(YearMonth), 1:Interval(YearMonth), 4294967296:Interval(DayTime), 3600000:Interval(DayTime), 60000:Interval(DayTime), 1000:Interval(DayTime) (Before Projection)\n ReadDataSource: scan partitions: [1], scan schema: [dummy:UInt8], statistics: [read_rows: 1, read_bytes: 1]",
// error: "",
// },
Test {
name: "interval-passed",
sql: "SELECT INTERVAL '1' year, INTERVAL '1' month, INTERVAL '1' day, INTERVAL '1' hour, INTERVAL '1' minute, INTERVAL '1' second",
expect: "Projection: 12:Interval(YearMonth), 1:Interval(YearMonth), 4294967296:Interval(DayTime), 3600000:Interval(DayTime), 60000:Interval(DayTime), 1000:Interval(DayTime)\n Expression: 12:Interval(YearMonth), 1:Interval(YearMonth), 4294967296:Interval(DayTime), 3600000:Interval(DayTime), 60000:Interval(DayTime), 1000:Interval(DayTime) (Before Projection)\n ReadDataSource: scan partitions: [1], scan schema: [dummy:UInt8], statistics: [read_rows: 1, read_bytes: 1]",
error: "",
},
// Test {
// name: "interval-unsupported",
// sql: "SELECT INTERVAL '1 year 1 day'",
Expand Down

0 comments on commit f9aab56

Please sign in to comment.