diff --git a/docs/src/main/sphinx/admin.rst b/docs/src/main/sphinx/admin.rst index 500e01bc092c..a69c92fc1e79 100644 --- a/docs/src/main/sphinx/admin.rst +++ b/docs/src/main/sphinx/admin.rst @@ -15,6 +15,7 @@ Administration admin/dist-sort admin/dynamic-filtering admin/graceful-shutdown + admin/fault-tolerant-execution *************** Event listeners diff --git a/docs/src/main/sphinx/admin/fault-tolerant-execution.rst b/docs/src/main/sphinx/admin/fault-tolerant-execution.rst new file mode 100644 index 000000000000..4a51cf432f75 --- /dev/null +++ b/docs/src/main/sphinx/admin/fault-tolerant-execution.rst @@ -0,0 +1,351 @@ +======================== +Fault-tolerant execution +======================== + +By default, if a Trino node lacks the resources to execute a task or +otherwise fails during query execution, the query fails and must be run again +manually. The longer the runtime of a query, the more likely it is to be +susceptible to such failures. + +Fault-tolerant execution is a mechanism in Trino that enables a cluster to +mitigate query failures by retrying queries or their component tasks in +the event of failure. With fault-tolerant execution enabled, intermediate +exchange data is spooled and can be re-used by another worker in the event of a +worker outage or other fault during query execution. + +.. note:: + + Fault tolerance does not apply to broken queries or other user error. For + example, Trino does not spend resources retrying a query that fails because + its SQL cannot be parsed. + +Configuration +------------- + +Fault-tolerant execution is disabled by default. To enable the feature, set the +``retry-policy`` configuration property to either ``QUERY`` or ``TASK`` +depending on the desired :ref:`retry policy `. + +.. code-block:: properties + + retry-policy=QUERY + +The following configuration properties control the behavior of fault-tolerant +execution on a Trino cluster: + +.. list-table:: Fault-tolerant execution configuration properties + :widths: 30, 40, 30 + :header-rows: 1 + + * - Property name + - Description + - Default value + * - ``retry-policy`` + - Configures what is retried in the event of failure, either + ``QUERY`` to retry the whole query, or ``TASK`` to retry tasks + individually if they fail. See :ref:`retry policy ` for + more information. + - ``NONE`` + * - ``exchange.deduplication-buffer-size`` + - Size of the coordinator's in-memory buffer used by fault-tolerant + execution to store output of query :ref:`stages `. + If this buffer is filled during query execution, the query fails unless + an :ref:`exchange manager ` is configured. + - ``32MB`` + +.. _fte-retry-policy: + +Retry policy +------------ + +The ``retry-policy`` configuration property designates whether Trino retries +entire queries or a query's individual tasks in the event of failure. + +QUERY +^^^^^ + +A ``QUERY`` retry policy instructs Trino to automatically retry a query in the +event of an error occuring on a worker node. A ``QUERY`` retry policy is +recommended when the majority of the Trino cluster's workload consists of many +small queries, or if an :ref:`exchange manager ` is not +configured. + +By default Trino does not implement fault tolerance for queries whose result set +exceeds 32MB in size, such as :doc:`/sql/select` statements that return a very +large data set to the user. This limit can be increased by modifying the +``exchange.deduplication-buffer-size`` configuration property to be greater than +the default value of ``32MB``, but this results in higher memory usage on the +coordinator. + +To enable fault-tolerant execution on queries with a larger result set, it is +strongly recommended to configure an :ref:`exchange manager +` that utilizes external storage for spooled data and +therefore allows for storage of spilled data beyond the in-memory buffer size. + +TASK +^^^^ + +A ``TASK`` retry policy instructs Trino to retry individual query +:ref:`tasks ` in the event of failure. This policy is +recommended when executing large batch queries, as the cluster can more +efficiently retry smaller tasks within the query rather than retry the whole +query. + +``TASK`` retry policy requires a configured :ref:`exchange manager +` to store spooled exchange data used for each task. It is +also strongly recommended to set the ``query.low-memory-killer.policy`` +configuration property to ``total-reservation-on-blocked-nodes``, or queries may +need to be manually killed if the cluster runs out of memory. + +Advanced configuration +---------------------- + +You can further configure fault-tolerant execution with the following +configuration properties. The default values for these properties should work +for most deployments, but you can change these values for testing or +troubleshooting purposes. + +Retry limits +^^^^^^^^^^^^ + +The following configuration properties control the thresholds at which +queries/tasks are no longer retried in the event of repeated failures: + +.. list-table:: Fault tolerance retry limit configuration properties + :widths: 30, 40, 30, 30 + :header-rows: 1 + + * - Property name + - Description + - Default value + - Retry policy + * - ``query-retry-attempts`` + - Maximum number of times Trino may attempt to retry a query before + declaring the query as failed. + - ``4`` + - Only ``QUERY`` + * - ``task-retry-attempts-overall`` + - Maximum number retries across all tasks within a given query + before declaring the query as failed. + - ``null`` (no limit) + - Only ``TASK`` + * - ``task-retry-attempts-per-task`` + - Maximum number of times Trino may attempt to retry a single task before + declaring the query as failed. + - ``2`` + - Only ``TASK`` + * - ``retry-initial-delay`` + - Minimum time that a failed query must wait before it is retried. May be + overridden with the ``retry_initial_delay`` :ref:`session property + `. + - ``10s`` + - Only ``QUERY`` + * - ``retry-max-delay`` + - Maximum time that a failed query must wait before it is retried. + Wait time is increased on each subsequent query failure. May be + overridden with the ``retry_initial_delay`` :ref:`session property + `. + - ``1m`` + - Only ``QUERY`` + +Task sizing +^^^^^^^^^^^ + +With a ``TASK`` retry policy, it is important to manage the amount of data +processed in each task. If tasks are too small, the management of task +coordination can take more processing time and resources than executing the task +itself. If tasks are too large, then a single task may require more resources +than are available on any one node and therefore prevent the query from +completing. + +Trino supports limited automatic task sizing. If issues are occurring +during fault-tolerant task execution, you can configure the following +configuration properties to manually control task sizing. These configuration +properties only apply to a ``TASK`` retry policy. + +.. list-table:: Task sizing configuration properties + :widths: 30, 40, 30 + :header-rows: 1 + + * - Property name + - Description + - Default value + * - ``fault-tolerant-execution-target-task-input-size`` + - Target size in bytes of all task inputs for a single fault-tolerant task. + Applies to tasks that read input from spooled data written by other + tasks. + + May be overridden for the current session with the + ``fault_tolerant_execution_target_task_input_size`` + :ref:`session property `. + - ``1GB`` + * - ``fault-tolerant-execution-target-task-split-count`` + - Target number of standard :ref:`splits ` processed + by a single task that reads data from source tables. Value is interpreted + with split weight taken into account. If the weight of splits produced by + a catalog denotes that they are lighter or heavier than "standard" split, + then the number of splits processed by single task is adjusted + accordingly. + + May be overridden for the current session with the + ``fault_tolerant_execution_target_task_split_count`` + :ref:`session property `. + - ``16`` + * - ``fault-tolerant-execution-min-task-split-count`` + - Minimum number of :ref:`splits ` processed by + a single task. This value is not split weight-adjusted and serves as + protection against situations where catalogs report an incorrect split + weight. + + May be overridden for the current session with the + ``fault_tolerant_execution_min_task_split_count`` + :ref:`session property `. + - ``16`` + * - ``fault-tolerant-execution-max-task-split-count`` + - Maximum number of :ref:`splits ` processed by a + single task. This value is not split weight-adjusted and serves as + protection against situations where catalogs report an incorrect split + weight. + + May be overridden for the current session with the + ``fault_tolerant_execution_max_task_split_count`` + :ref:`session property `. + - ``256`` + +Node allocation +^^^^^^^^^^^^^^^ + +With a ``TASK`` retry policy, nodes are allocated to tasks based on available +memory and estimated memory usage. If task failure occurs due to exceeding +available memory on a node, the task is restarted with a request to allocate the +full node for its execution. + +The initial task memory-requirements estimation is static and configured with +the ``fault-tolerant-task-memory`` configuration property. This property only +applies to a ``TASK`` retry policy. + +.. list-table:: Node allocation configuration properties + :widths: 30, 40, 30 + :header-rows: 1 + + * - Property name + - Description + - Default value + * - ``fault-tolerant-execution-task-memory`` + - Initial task memory estimation used for bin-packing when allocating nodes + for tasks. May be overridden for the current session with the + ``fault_tolerant_execution_task_memory`` + :ref:`session property `. + - ``4GB`` + +Other tuning +^^^^^^^^^^^^ + +The following additional configuration property can be used to manage +fault-tolerant execution: + +.. list-table:: Other fault-tolerant execution configuration properties + :widths: 30, 40, 30, 30 + :header-rows: 1 + + * - Property name + - Description + - Default value + - Retry policy + * - ``fault-tolerant-execution-task-descriptor-storage-max-memory`` + - Maximum amount of memory to be used to store task descriptors for fault + tolerant queries on coordinator. Extra memory is needed to be able to + reschedule tasks in case of a failure. + - (JVM heap size * 0.15) + - Only ``TASK`` + +.. _fte-exchange-manager: + +Exchange manager +---------------- + +Exchange spooling is responsible for storing and managing spooled data for +fault-tolerant execution. You can configure a filesystem-based exchange manager +that stores spooled data in a specified location, either an S3-compatible +storage system or a local filesystem. + +To configure an exchange manager, create a new +``etc/exchange-manager.properties`` configuration file on the coordinator and +all worker nodes. In this file, set the ``exchange-manager.name`` configuration +propertry to ``filesystem``, and additional configuration properties as needed +for your storage solution. + +.. list-table:: Exchange manager configuration properties + :widths: 30, 40, 30 + :header-rows: 1 + + * - Property name + - Description + - Default value + * - ``exchange.base-directory`` + - The base directory URI location that the exchange manager uses to store + spooling data. Only supports S3 and local filesystems. + - + * - ``exchange.encryption-enabled`` + - Enable encrypting of spooling data. + - ``true`` + * - ``exchange.sink-buffer-pool-min-size`` + - The minimum buffer pool size for an exchange sink. The larger the buffer + pool size, the larger the write parallelism and memory usage. + - ``10`` + * - ``exchange.source-concurrent-reader`` + - The number of concurrent readers to read from spooling storage. The + larger the number of concurrent readers, the larger the read parallelism + and memory usage. + - ``4`` + * - ``exchange.s3.aws-access-key`` + - AWS access key to use. Required for a connection to AWS S3, can be + ignored for other S3 storage systems. + - + * - ``exchange.s3.aws-secret-key`` + - AWS secret key to use. Required for a connection to AWS S3, can be + ignored for other S3 storage systems. + - + * - ``exchange.s3.region`` + - Region of the S3 bucket. + - + * - ``exchange.s3.endpoint`` + - S3 storage endpoint server if using an S3-compatible storage system that + is not AWS. If using AWS S3, can be ignored. + - + * - ``exchange.s3.max-error-retries`` + - Maximum number of times the exchange manager's S3 client should retry + a request. + - ``3`` + * - ``exchange.s3.upload.part-size`` + - Part size for S3 multi-part upload. + - ``5MB`` + +The following example ``exchange-manager.properties`` configuration specifies an +AWS S3 bucket as the spooling storage destination. Note that the destination +does not have to be in AWS, but can be any S3-compatible storage system. + +.. code-block:: properties + + exchange-manager.name=filesystem + exchange.base-directory=s3n://trino-exchange-manager + exchange.encryption-enabled=true + exchange.s3.region=us-west-1 + exchange.s3.aws-access-key=example-access-key + exchange.s3.aws-secret-key=example-secret-key + +The following example ``exchange-manager.properties`` configuration specifies a +local directory, ``/tmp/trino-exchange-manager``, as the spooling storage +destination. + +.. note:: + + It is only recommended to use a local filesystem for exchange in standalone, + non-production clusters. A local directory can only be used for exchange in + a distributed cluster if the exchange directory is shared and accessible + from all worker nodes. + +.. code-block:: properties + + exchange-manager.name=filesystem + exchange.base-directory=/tmp/trino-exchange-manager diff --git a/docs/src/main/sphinx/admin/properties-query-management.rst b/docs/src/main/sphinx/admin/properties-query-management.rst index 194f11239849..62ca74a98899 100644 --- a/docs/src/main/sphinx/admin/properties-query-management.rst +++ b/docs/src/main/sphinx/admin/properties-query-management.rst @@ -32,6 +32,20 @@ stages of a query. You can use the following execution policies: The number of partitions to use for processing distributed operations, such as joins, aggregations, partitioned window functions and others. +``query.low-memory-killer.policy`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** :ref:`prop-type-string` +* **Default value:** ``total-reservation-on-blocked-nodes`` + +Configures the behavior to handle killing running queries in the event of low +memory availability. Supports the following values: + +* ``none`` - Do not kill any queries in the event of low memory. +* ``total-reservation`` - Kill the query currently using the most total memory. +* ``total-reservation-on-blocked-nodes`` - Kill the query currently using the + most memory specifically on nodes that are now out of memory. + ``query.max-execution-time`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -103,3 +117,17 @@ removed based on age. The minimal age of a query in the history before it is expired. An expired query is removed from the query history buffer and no longer available in the :doc:`/admin/web-interface`. + +``retry-policy`` +^^^^^^^^^^^^^^^^ + +* **Type:** :ref:`prop-type-string` +* **Default value:** ``NONE`` + +The :ref:`retry policy ` to use for +:doc:`/admin/fault-tolerant-execution`. Supports the following values: + +* ``NONE`` - Disable fault-tolerant execution. +* ``TASK`` - Retry individual tasks within a query in the event of failure. + Requires configuration of an :ref:`exchange manager `. +* ``QUERY`` - Retry the whole query in the event of failure. \ No newline at end of file diff --git a/docs/src/main/sphinx/admin/properties-resource-management.rst b/docs/src/main/sphinx/admin/properties-resource-management.rst index ccbb9be4bc59..79d0151b593a 100644 --- a/docs/src/main/sphinx/admin/properties-resource-management.rst +++ b/docs/src/main/sphinx/admin/properties-resource-management.rst @@ -56,3 +56,12 @@ must be greater than ``query.max-memory``. This is the amount of memory set aside as headroom/buffer in the JVM heap for allocations that are not tracked by Trino. + +``exchange.deduplication-buffer-size`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** :ref:`prop-type-data-size` +* **Default value:** ``32MB`` + +Size of the buffer used for spooled data during +:doc:`/admin/fault-tolerant-execution`. diff --git a/docs/src/main/sphinx/overview/concepts.rst b/docs/src/main/sphinx/overview/concepts.rst index 57f0b6b8ba04..28f2a3f1532d 100644 --- a/docs/src/main/sphinx/overview/concepts.rst +++ b/docs/src/main/sphinx/overview/concepts.rst @@ -176,6 +176,8 @@ that statement. A query encompasses stages, tasks, splits, connectors, and other components and data sources working in concert to produce a result. +.. _trino-concept-stage: + Stage ^^^^^ @@ -192,6 +194,8 @@ the output from other stages. Stages are what the coordinator uses to model a distributed query plan, but stages themselves don't run on Trino workers. +.. _trino-concept-task: + Task ^^^^ @@ -208,6 +212,8 @@ task has inputs and outputs, and just as a stage can be executed in parallel by a series of tasks, a task is executing in parallel with a series of drivers. +.. _trino-concept-splits: + Split ^^^^^