Improved table functions #13627

Merged (20 commits) on Jan 17, 2023
75 changes: 41 additions & 34 deletions docs/multi-stage-query/concepts.md
@@ -49,62 +49,69 @@ To support ingestion, additional SQL functionality is available through the MSQ

<a name="extern"></a>

### Read external data with `EXTERN`

Query tasks can access external data through the `EXTERN` function, using any native batch [input
source](../ingestion/native-batch-input-source.md) and [input format](../ingestion/data-formats.md#input-format).

`EXTERN` can read multiple files in parallel across different worker tasks. However, `EXTERN` does not split individual
files across multiple worker tasks. If you have a small number of very large input files, you can increase query
parallelism by splitting up your input files.

For more information about the syntax, see [`EXTERN`](./reference.md#extern-function).

See also the set of SQL-friendly, input-source-specific table functions, which may be more convenient
than `EXTERN`.
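
For example, a query might read a remote JSON file directly. The following is a minimal sketch; the file URI,
input format, and column signature are illustrative placeholders rather than part of this change:

```sql
SELECT *
FROM TABLE(
  EXTERN(
    -- input source (JSON): where to read from
    '{"type": "http", "uris": ["https://example.com/data/wikipedia.json.gz"]}',
    -- input format (JSON): how to parse the files
    '{"type": "json"}',
    -- row signature (JSON): the columns and types exposed to SQL
    '[{"name": "timestamp", "type": "string"}, {"name": "page", "type": "string"}, {"name": "added", "type": "long"}]'
  )
)
LIMIT 10
```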

<a name="insert"></a>

### Load data with `INSERT`

`INSERT` statements can create a new datasource or append to an existing datasource. In Druid SQL, unlike standard SQL,
there is no syntactical difference between creating a table and appending data to a table. Druid does not include a
`CREATE TABLE` statement.

Nearly all `SELECT` capabilities are available for `INSERT ... SELECT` queries. Certain exceptions are listed on the [Known
issues](./known-issues.md#select-statement) page.

`INSERT` statements acquire a shared lock to the target datasource. Multiple `INSERT` statements can run at the same time,
for the same datasource, if your cluster has enough task slots.

Like all other forms of [batch ingestion](../ingestion/index.md#batch), each `INSERT` statement generates new segments and
publishes them at the end of its run. For this reason, it is best suited to loading data in larger batches. Do not use
`INSERT` statements to load data in a sequence of microbatches; for that, use [streaming
ingestion](../ingestion/index.md#streaming) instead.

When deciding whether to use `REPLACE` or `INSERT`, keep in mind that segments generated with `REPLACE` can be pruned
with dimension-based pruning but those generated with `INSERT` cannot. For more information about the requirements
for dimension-based pruning, see [Clustering](#clustering).

For more information about the syntax, see [INSERT](./reference.md#insert).
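
For example, an `INSERT` that reads external data and writes day-granularity segments might look like the
following sketch (the target datasource, URI, and column names are hypothetical):

```sql
INSERT INTO "wikipedia_new"
SELECT
  TIME_PARSE("timestamp") AS __time,  -- parse the input timestamp into the primary __time column
  page,
  added
FROM TABLE(
  EXTERN(
    '{"type": "http", "uris": ["https://example.com/data/wikipedia.json.gz"]}',
    '{"type": "json"}',
    '[{"name": "timestamp", "type": "string"}, {"name": "page", "type": "string"}, {"name": "added", "type": "long"}]'
  )
)
PARTITIONED BY DAY
```

If `wikipedia_new` does not exist, this statement creates it; if it does exist, the same statement appends to it.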

<a name="replace"></a>

### Overwrite data with REPLACE

`REPLACE` statements can create a new datasource or overwrite data in an existing datasource. In Druid SQL, unlike
standard SQL, there is no syntactical difference between creating a table and overwriting data in a table. Druid does
not include a `CREATE TABLE` statement.

`REPLACE` uses an [OVERWRITE clause](reference.md#replace-specific-time-ranges) to determine which data to overwrite. You
can overwrite an entire table, or a specific time range of a table. When you overwrite a specific time range, that time
range must align with the granularity specified in the `PARTITIONED BY` clause.

`REPLACE` statements acquire an exclusive write lock to the target time range of the target datasource. No other ingestion
or compaction operations may proceed for that time range while the task is running. However, ingestion and compaction
operations may proceed for other time ranges.

Nearly all `SELECT` capabilities are available for `REPLACE ... SELECT` queries. Certain exceptions are listed on the [Known
issues](./known-issues.md#select-statement) page.

For more information about the syntax, see [REPLACE](./reference.md#replace).

When deciding whether to use `REPLACE` or `INSERT`, keep in mind that segments generated with `REPLACE` can be pruned
with dimension-based pruning but those generated with `INSERT` cannot. For more information about the requirements
for dimension-based pruning, see [Clustering](#clustering).
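
For example, a `REPLACE` that rewrites a single day of an existing datasource might look like the following
sketch (the datasource, time range, and filter are hypothetical). Note that the `OVERWRITE WHERE` range aligns
with the `PARTITIONED BY DAY` granularity:

```sql
REPLACE INTO "wikipedia_new"
OVERWRITE WHERE __time >= TIMESTAMP '2016-06-27 00:00:00' AND __time < TIMESTAMP '2016-06-28 00:00:00'
SELECT *
FROM "wikipedia_new"
WHERE page <> 'Main Page'  -- keep everything from that day except one page, for example
PARTITIONED BY DAY
```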

### Primary timestamp

@@ -116,7 +123,7 @@ HH:mm:ss') AS __time`.

The `__time` column is used for [partitioning by time](#partitioning-by-time). If you use `PARTITIONED BY ALL` or
`PARTITIONED BY ALL TIME`, partitioning by time is disabled. In these cases, you do not need to include a `__time`
column in your `INSERT` statement. However, Druid still creates a `__time` column in your Druid table and sets all
timestamps to 1970-01-01 00:00:00.

For more information, see [Primary timestamp](../ingestion/data-model.md#primary-timestamp).
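
For instance, a lookup-style table with no natural timestamp could be ingested without selecting a `__time`
column, as in this sketch (the datasource, URI, and columns are hypothetical):

```sql
-- No __time is selected; Druid creates the column anyway and sets every row to 1970-01-01 00:00:00.
INSERT INTO "country_lookup"
SELECT
  country_code,
  country_name
FROM TABLE(
  EXTERN(
    '{"type": "http", "uris": ["https://example.com/data/countries.json"]}',
    '{"type": "json"}',
    '[{"name": "country_code", "type": "string"}, {"name": "country_name", "type": "string"}]'
  )
)
PARTITIONED BY ALL
```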
@@ -125,7 +132,7 @@ For more information, see [Primary timestamp](../ingestion/data-model.md#primary

### Partitioning by time

`INSERT` and `REPLACE` statements require the `PARTITIONED BY` clause, which determines how time-based partitioning is done.
In Druid, data is split into one or more segments per time chunk, defined by the PARTITIONED BY granularity.

Partitioning by time is important for three reasons:
@@ -160,16 +167,16 @@ Clustering is important for two reasons:

To activate dimension-based pruning, these requirements must be met:

- Segments were generated by a `REPLACE` statement, not an `INSERT` statement.
- All `CLUSTERED BY` columns are single-valued string columns.

If these requirements are _not_ met, Druid still clusters data during ingestion but will not be able to perform
dimension-based segment pruning at query time. You can tell if dimension-based segment pruning is possible by using the
`sys.segments` table to inspect the `shard_spec` for the segments generated by an ingestion query. If they are of type
`range` or `single`, then dimension-based segment pruning is possible. Otherwise, it is not. The shard spec type is also
available in the **Segments** view under the **Partitioning** column.

For more information about syntax, see [`CLUSTERED BY`](./reference.md#clustered-by).
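
For example, assuming a datasource named `wikipedia_new` (hypothetical), a query along these lines shows the
shard spec of each segment so you can check whether pruning is possible:

```sql
SELECT
  "segment_id",
  "shard_spec"  -- a shard spec of type 'range' or 'single' allows dimension-based pruning
FROM sys.segments
WHERE "datasource" = 'wikipedia_new'
```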

### Rollup

@@ -179,14 +186,14 @@ This reduces storage footprint and improves performance, often dramatically.

To perform ingestion with rollup:

1. Use `GROUP BY`. The columns in the `GROUP BY` clause become dimensions, and aggregation functions become metrics.
2. Set [`finalizeAggregations: false`](reference.md#context-parameters) in your context. This causes aggregation
functions to write their internal state to the generated segments, instead of the finalized end result, and enables
further aggregation at query time.
3. Wrap all multi-value strings in `MV_TO_ARRAY(...)` and set [`groupByEnableMultiValueUnnesting:
false`](reference.md#context-parameters) in your context. This ensures that multi-value strings are left alone and
remain lists, instead of being [automatically unnested](../querying/sql-data-types.md#multi-value-strings) by the
`GROUP BY` operator (see the sketch after this list).
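
Taken together, these steps might look like the sketch below: an hourly rollup that keeps a multi-value `tags`
column intact and stores partially aggregated metrics. The datasource, URI, and column names are hypothetical,
and the query would be submitted with `finalizeAggregations: false` and `groupByEnableMultiValueUnnesting: false`
in its query context:

```sql
INSERT INTO "wikipedia_rollup"
SELECT
  TIME_FLOOR(TIME_PARSE("timestamp"), 'PT1H') AS __time,  -- truncate timestamps to hourly grain
  channel,
  MV_TO_ARRAY(tags) AS tags,                              -- keep the multi-value string as a list
  COUNT(*) AS cnt,                                        -- query later with SUM(cnt)
  SUM(added) AS added_sum,
  APPROX_COUNT_DISTINCT_DS_HLL("user") AS user_sketch     -- assumes the DataSketches extension is loaded
FROM TABLE(
  EXTERN(
    '{"type": "http", "uris": ["https://example.com/data/wikipedia.json.gz"]}',
    '{"type": "json"}',
    '[{"name": "timestamp", "type": "string"}, {"name": "channel", "type": "string"}, {"name": "tags", "type": "string"}, {"name": "added", "type": "long"}, {"name": "user", "type": "string"}]'
  )
)
GROUP BY 1, 2, 3
PARTITIONED BY DAY
```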

When you do all of these things, Druid understands that you intend to do an ingestion with rollup, and it writes
rollup-related metadata into the generated segments. Other applications can then use [`segmentMetadata`
@@ -196,8 +203,8 @@ If you see the error "Encountered multi-value dimension `x` that cannot be proce
groupByEnableMultiValueUnnesting set to false", then wrap that column in `MV_TO_ARRAY(x) AS x`.

The following [aggregation functions](../querying/sql-aggregations.md) are supported for rollup at ingestion time:
`COUNT` (but switch to `SUM` at query time), `SUM`, `MIN`, `MAX`, `EARLIEST` ([string only](known-issues.md#select-statement)),
`LATEST` ([string only](known-issues.md#select-statement)), `APPROX_COUNT_DISTINCT`, `APPROX_COUNT_DISTINCT_BUILTIN`,
`APPROX_COUNT_DISTINCT_DS_HLL`, `APPROX_COUNT_DISTINCT_DS_THETA`, and `DS_QUANTILES_SKETCH` (but switch to
`APPROX_QUANTILE_DS` at query time). Do not use `AVG`; instead, use `SUM` and `COUNT` at ingest time and compute the
quotient at query time.
@@ -214,19 +221,19 @@ happens:
1. The Broker plans your SQL query into a native query, as usual.

2. The Broker wraps the native query into a task of type `query_controller`
   and submits it to the indexing service.

3. The Broker returns the task ID to you and exits.

4. The controller task launches some number of worker tasks determined by
   the `maxNumTasks` and `taskAssignment` [context parameters](./reference.md#context-parameters). You can set these parameters individually for each query.

5. Worker tasks of type `query_worker` execute the query.

6. If the query is a `SELECT` query, the worker tasks send the results
   back to the controller task, which writes them into its task report.
   If the query is an `INSERT` or `REPLACE` query, the worker tasks generate and
   publish new Druid segments to the provided datasource.

### Parallelism

11 changes: 6 additions & 5 deletions docs/multi-stage-query/index.md
@@ -31,10 +31,10 @@ description: Introduces multi-stage query architecture and its task engine
Apache Druid supports SQL-based ingestion using the bundled [`druid-multi-stage-query` extension](#load-the-extension).
This extension adds a [multi-stage query task engine for SQL](concepts.md#multi-stage-query-task-engine) that allows running SQL
[INSERT](concepts.md#insert) and [REPLACE](concepts.md#replace) statements as batch tasks. As an experimental feature,
the task engine also supports running `SELECT` queries as batch tasks.

Nearly all `SELECT` capabilities are available in the multi-stage query (MSQ) task engine, with certain exceptions listed on the [Known
issues](./known-issues.md#select-statement) page. This allows great flexibility to apply transformations, filters, JOINs,
aggregations, and so on as part of `INSERT ... SELECT` and `REPLACE ... SELECT` statements. This also allows in-database
transformation: creating new tables based on queries of other tables.

@@ -64,8 +64,9 @@ To add the extension to an existing cluster, add `druid-multi-stage-query` to `d

For more information about how to load an extension, see [Loading extensions](../development/extensions.md#loading-extensions).

To use [EXTERN](reference.md#extern-function), you need READ permission on the resource named "EXTERNAL" of the resource type
"EXTERNAL". If you encounter a 403 error when trying to use `EXTERN`, verify that you have the correct permissions.
The same is true of any of the input-source-specific table functions, such as `S3` or `LOCALFILES`.

## Next steps

28 changes: 14 additions & 14 deletions docs/multi-stage-query/known-issues.md
@@ -35,39 +35,39 @@ sidebar_label: Known issues
generate a large amount of output data may exhaust all available disk space. In this case, the query fails with
an [UnknownError](./reference.md#error_UnknownError) with a message including "No space left on device".

## `SELECT` Statement

- `SELECT` from a Druid datasource does not include unpublished real-time data.

- `GROUPING SETS` and `UNION ALL` are not implemented. Queries using these features return a
[QueryNotSupported](reference.md#error_QueryNotSupported) error.

- For some `COUNT DISTINCT` queries, you'll encounter a [QueryNotSupported](reference.md#error_QueryNotSupported) error
that includes `Must not have 'subtotalsSpec'` as one of its causes. This is caused by the planner attempting to use
`GROUPING SETS`, which are not implemented.

- The numeric varieties of the `EARLIEST` and `LATEST` aggregators do not work properly. Attempting to use the numeric
varieties of these aggregators leads to an error like
`java.lang.ClassCastException: class java.lang.Double cannot be cast to class org.apache.druid.collections.SerializablePair`.
The string varieties, however, do work properly.

## `INSERT` and `REPLACE` Statements

- `INSERT` and `REPLACE` statements with column lists, like `INSERT INTO tbl (a, b, c) SELECT ...`, are not implemented.

- `INSERT ... SELECT` and `REPLACE ... SELECT` insert columns from the `SELECT` statement based on column name. This
differs from SQL standard behavior, where columns are inserted based on position.

- `INSERT` and `REPLACE` do not support all options available in [ingestion specs](../ingestion/ingestion-spec.md),
including the `createBitmapIndex` and `multiValueHandling` [dimension](../ingestion/ingestion-spec.md#dimension-objects)
properties, and the `indexSpec` [`tuningConfig`](../ingestion/ingestion-spec.md#tuningconfig) property.

## `EXTERN` Function

- The [schemaless dimensions](../ingestion/ingestion-spec.md#inclusions-and-exclusions)
feature is not available. All columns and their types must be specified explicitly using the `signature` parameter
of the [`EXTERN` function](reference.md#extern-function).

- `EXTERN` with input sources that match large numbers of files may exhaust available memory on the controller task.

- `EXTERN` reads external files only; it does not accept the `druid` input source. To query an existing Druid datasource, reference it directly in the `FROM` clause.