This repository has been archived by the owner on Aug 4, 2023. It is now read-only.

Terminate PG query when task is killed via Airflow #717

Merged: 49 commits from issue/690-kill-pg-query-from-airflow into main on Feb 28, 2023

Conversation

@rwidom (Collaborator) commented on Sep 6, 2022

Fixes

Fixes WordPress/openverse#1455 by @AetherUnbound

Description

This summarizes a lot of discussion (and help from @stacimc and @AetherUnbound!):

  • Big picture goal: make sure that Airflow task execution_timeouts are enforced in the Postgres database.
  • Big picture design: create a new Postgres hook and operator that run SET statement_timeout TO '{execution_timeout}s'; before any other SQL, so that Postgres knows when to kill long-running queries (see the sketch after this list).
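For illustration only, a minimal sketch of that idea; the helper name is made up and the real hook/operator in this PR differ:

from datetime import timedelta

def with_statement_timeout(sql: str, execution_timeout: timedelta) -> list[str]:
    # Prepend a statement_timeout that mirrors the Airflow execution_timeout,
    # so Postgres cancels the query at the same point Airflow would time out.
    seconds = int(execution_timeout.total_seconds())
    return [f"SET statement_timeout TO '{seconds}s';", sql]

# A 10-second task timeout becomes a 10-second Postgres statement timeout.
statements = with_statement_timeout("SELECT pg_sleep(120);", timedelta(seconds=10))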

Testing Instructions

Switch to the branch, run just test, and then run some DAGs. I ran the Cleveland Museum and iNaturalist DAGs locally (with small test files) and they completed without issue.

Checklist

  • My pull request has a descriptive title (not a vague title like Update index.md).
  • My pull request targets the default branch of the repository (main) or a parent feature branch.
  • My commit messages follow best practices.
  • My code follows the established code style of the repository.
  • I added or updated tests for the changes I made (if applicable).
  • I added or updated documentation (if applicable).
  • I tried running the project locally and verified that there are no visible errors.

Developer Certificate of Origin

Version 1.1

Copyright (C) 2004, 2006 The Linux Foundation and its contributors.
1 Letterman Drive
Suite D4700
San Francisco, CA, 94129

Everyone is permitted to copy and distribute verbatim copies of this
license document, but changing it is not allowed.


Developer's Certificate of Origin 1.1

By making a contribution to this project, I certify that:

(a) The contribution was created in whole or in part by me and I
    have the right to submit it under the open source license
    indicated in the file; or

(b) The contribution is based upon previous work that, to the best
    of my knowledge, is covered under an appropriate open source
    license and I have the right under that license to submit that
    work with modifications, whether created in whole or in part
    by me, under the same open source license (unless I am
    permitted to submit under a different license), as indicated
    in the file; or

(c) The contribution was provided directly to me by some other
    person who certified (a), (b) or (c) and I have not modified
    it.

(d) I understand and agree that this project and the contribution
    are public and that a record of the contribution (including all
    personal information I submit with it, including my sign-off) is
    maintained indefinitely and may be redistributed consistent with
    this project or the open source license(s) involved.

@openverse-bot added the labels 💻 aspect: code (Concerns the software code in the repository), 🛠 goal: fix (Bug fix), and 🟧 priority: high (Stalls work on the project or its dependents) on Sep 6, 2022
@stacimc (Contributor) left a comment


This looks really great so far! The approach seems sound to me, although I agree this will be a tricky one to test.

I tried testing this by just adding a SELECT pg_sleep(120) to the create_loading_table task, and adjusting its execution timeout to 10 seconds. When I ran a provider script locally, though, the task hung for the full 120 seconds and only then raised a Timeout error 🤔 Are you seeing this too?

I wouldn't want to kill a long-running db job that just needs to run, but I would want to be able to kill other long-running things that might get in the first one's way.

Sorry, I didn't quite get this -- can you give an example of what you mean here?

@rwidom (Collaborator, Author) commented on Sep 7, 2022

Thanks @stacimc !
I really just meant "Are you sure we should automatically kill all of these queries? Do we need any other filter?" (In the output below, one query is from running the phylopic DAG in Airflow and the other is from running just test.)
It may be that the application_name approach won't work with the way Airflow manages threads, though: the Airflow log says it executed the SET application_name statement, but the database isn't showing it in the query for the PID. Also, the task isn't marked as failed, I guess...
And maybe the answer is just "yeah, don't worry about it, kill them all!"
I'll commit the changes I made to generate these test results. Maybe I misunderstood what you did?
--R
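A rough sketch of how the application_name approach might find and terminate a matching backend (psycopg2 shown purely for illustration; the helper name and connection details are made up, not code in this branch):

import psycopg2

def terminate_by_application_name(dsn: str, application_name: str) -> int:
    # Terminate any non-idle backend whose application_name matches the
    # identifier that was set earlier via SET application_name.
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute(
            """
            SELECT pg_terminate_backend(pid)
            FROM pg_stat_activity
            WHERE application_name = %s AND state != 'idle';
            """,
            (application_name,),
        )
        return cur.rowcount

# e.g. terminate_by_application_name(dsn, "phylopic_workflow__load_image_data.create_loading_table__20110102")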

Airflow log
*** Reading remote log from s3://openverse-airflow-logs/dag_id=phylopic_workflow/run_id=scheduled__2011-01-02T00:00:00+00:00/task_id=load_image_data.create_loading_table/attempt=1.log.
[2022-09-07, 23:21:56 UTC] {taskinstance.py:1179} INFO - Dependencies all met for <TaskInstance: phylopic_workflow.load_image_data.create_loading_table scheduled__2011-01-02T00:00:00+00:00 [queued]>
[2022-09-07, 23:21:56 UTC] {taskinstance.py:1179} INFO - Dependencies all met for <TaskInstance: phylopic_workflow.load_image_data.create_loading_table scheduled__2011-01-02T00:00:00+00:00 [queued]>
[2022-09-07, 23:21:56 UTC] {taskinstance.py:1376} INFO - 
--------------------------------------------------------------------------------
[2022-09-07, 23:21:56 UTC] {taskinstance.py:1377} INFO - Starting attempt 1 of 3
[2022-09-07, 23:21:56 UTC] {taskinstance.py:1378} INFO - 
--------------------------------------------------------------------------------
[2022-09-07, 23:21:56 UTC] {taskinstance.py:1397} INFO - Executing <Task(PythonOperator): load_image_data.create_loading_table> on 2011-01-02 00:00:00+00:00
[2022-09-07, 23:21:56 UTC] {standard_task_runner.py:52} INFO - Started process 2946 to run task
[2022-09-07, 23:21:56 UTC] {standard_task_runner.py:79} INFO - Running: ['***', 'tasks', 'run', 'phylopic_workflow', 'load_image_data.create_loading_table', 'scheduled__2011-01-02T00:00:00+00:00', '--job-id', '5', '--raw', '--subdir', 'DAGS_FOLDER/providers/provider_workflow_dag_factory.py', '--cfg-path', '/tmp/tmpnkh29a9t', '--error-file', '/tmp/tmpfuoqmyw_']
[2022-09-07, 23:21:56 UTC] {standard_task_runner.py:80} INFO - Job 5: Subtask load_image_data.create_loading_table
[2022-09-07, 23:21:56 UTC] {task_command.py:371} INFO - Running <TaskInstance: phylopic_workflow.load_image_data.create_loading_table scheduled__2011-01-02T00:00:00+00:00 [running]> on host 3b0b8f9e8fe3
[2022-09-07, 23:21:56 UTC] {taskinstance.py:1589} INFO - Exporting the following env vars:
[email protected]
AIRFLOW_CTX_DAG_OWNER=data-eng-admin
AIRFLOW_CTX_DAG_ID=phylopic_workflow
AIRFLOW_CTX_TASK_ID=load_image_data.create_loading_table
AIRFLOW_CTX_EXECUTION_DATE=2011-01-02T00:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2011-01-02T00:00:00+00:00
[2022-09-07, 23:21:56 UTC] {crypto.py:82} WARNING - empty cryptography key - values will not be stored encrypted.
[2022-09-07, 23:21:56 UTC] {base.py:68} INFO - Using connection ID 'postgres_openledger_upstream' for task execution.
[2022-09-07, 23:21:56 UTC] {dbapi.py:231} INFO - Running statement: SET application_name = 'phylopic_workflow__load_image_data.create_loading_table__20110102';
, parameters: None
[2022-09-07, 23:21:56 UTC] {base.py:68} INFO - Using connection ID 'postgres_openledger_upstream' for task execution.
[2022-09-07, 23:21:56 UTC] {dbapi.py:231} INFO - Running statement: 
  select pg_sleep(120);
  CREATE TABLE public.provider_data_image_phylopic_20110102T000000(
  foreign_identifier character varying(3000),
foreign_landing_url character varying(1000),
url character varying(3000),
thumbnail character varying(3000),
filetype character varying(5),
filesize integer,
license character varying(50),
license_version character varying(25),
creator character varying(2000),
creator_url character varying(2000),
title character varying(5000),
meta_data jsonb,
tags jsonb,
category character varying(80),
watermarked boolean,
provider character varying(80),
source character varying(80),
ingestion_type character varying(80),
width integer,
height integer);
, parameters: None
[2022-09-07, 23:23:57 UTC] {base.py:68} INFO - Using connection ID 'postgres_openledger_upstream' for task execution.
[2022-09-07, 23:23:57 UTC] {dbapi.py:231} INFO - Running statement: ALTER TABLE public.provider_data_image_phylopic_20110102T000000 OWNER TO ***;, parameters: None
[2022-09-07, 23:23:57 UTC] {base.py:68} INFO - Using connection ID 'postgres_openledger_upstream' for task execution.
[2022-09-07, 23:23:57 UTC] {dbapi.py:231} INFO - Running statement: 
CREATE INDEX IF NOT EXISTS provider_data_image_phylopic_20110102T000000_provider_key
ON public.provider_data_image_phylopic_20110102T000000 USING btree (provider);
, parameters: None
[2022-09-07, 23:23:57 UTC] {base.py:68} INFO - Using connection ID 'postgres_openledger_upstream' for task execution.
[2022-09-07, 23:23:57 UTC] {dbapi.py:231} INFO - Running statement: 
CREATE INDEX IF NOT EXISTS provider_data_image_phylopic_20110102T000000_foreign_identifier_key
ON public.provider_data_image_phylopic_20110102T000000 USING btree (provider, md5((foreign_identifier)::text));
, parameters: None
[2022-09-07, 23:23:57 UTC] {base.py:68} INFO - Using connection ID 'postgres_openledger_upstream' for task execution.
[2022-09-07, 23:23:57 UTC] {dbapi.py:231} INFO - Running statement: 
CREATE INDEX IF NOT EXISTS provider_data_image_phylopic_20110102T000000_url_key
ON public.provider_data_image_phylopic_20110102T000000 USING btree (provider, md5((url)::text));
, parameters: None
[2022-09-07, 23:23:57 UTC] {python.py:173} INFO - Done. Returned value was: None
[2022-09-07, 23:23:57 UTC] {taskinstance.py:1415} INFO - Marking task as SUCCESS. dag_id=phylopic_workflow, task_id=load_image_data.create_loading_table, execution_date=20110102T000000, start_date=20220907T232156, end_date=20220907T232357
[2022-09-07, 23:23:57 UTC] {local_task_job.py:156} INFO - Task exited with return code 0
[2022-09-07, 23:23:57 UTC] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check
DB jobs query
SELECT pid, age(clock_timestamp(), query_start), usename, application_name, query 
 FROM pg_stat_activity 
 WHERE query != '<IDLE>' AND query NOT ILIKE '%pg_stat_activity%' 
 ORDER BY query_start desc;
+------+----------------+---------+------------------+---------------------------------------------------------------------+
| pid  | age            | usename | application_name | query                                                               |
|------+----------------+---------+------------------+---------------------------------------------------------------------|
| 31   | <null>         | <null>  |                  |                                                                     |
| 33   | <null>         | deploy  |                  |                                                                     |
| 29   | <null>         | <null>  |                  |                                                                     |
| 28   | <null>         | <null>  |                  |                                                                     |
| 30   | <null>         | <null>  |                  |                                                                     |
| 43   | 0:00:00.408960 | airflow |                  | COMMIT                                                              |
| 44   | 0:00:14.036838 | airflow |                  | COMMIT                                                              |
| 1363 | 0:00:52.850229 | airflow |                  | COMMIT                                                              |
| 49   | 0:00:52.851721 | airflow |                  | COMMIT                                                              |
| 1364 | 0:00:53.157167 | airflow |                  | COMMIT                                                              |
| 1365 | 0:00:53.159587 | airflow |                  | COMMIT                                                              |
| 1366 | 0:00:53.162553 | airflow |                  | COMMIT                                                              |
| 47   | 0:00:53.368939 | airflow |                  | COMMIT                                                              |
| 1766 | 0:01:01.635456 | deploy  |                  |                                                                     |
|      |                |         |                  |   select pg_sleep(120);                                             |
|      |                |         |                  |   CREATE TABLE public.provider_data_image_phylopic_20110109T000000( |
|      |                |         |                  |   foreign_identifier character varying(3000),                       |
|      |                |         |                  | foreign_landing_url character varying(1000),                        |
|      |                |         |                  | url character varying(3000),                                        |
|      |                |         |                  | thumbnail character varying(3000),                                  |
|      |                |         |                  | filetype character varying(5),                                      |
|      |                |         |                  | filesize integer,                                                   |
|      |                |         |                  | license character varying(50),                                      |
|      |                |         |                  | license_version character varying(25),                              |
|      |                |         |                  | creator character varying(2000),                                    |
|      |                |         |                  | creator_url character varying(2000),                                |
|      |                |         |                  | title character varying(5000),                                      |
|      |                |         |                  | meta_data jsonb,                                                    |
|      |                |         |                  | tags jsonb,                                                         |
|      |                |         |                  | category character varying(80),                                     |
|      |                |         |                  | watermarked boolean,                                                |
|      |                |         |                  | provider character varying(80),                                     |
|      |                |         |                  | source character varying(80),                                       |
|      |                |         |                  | ingestion_type character varying(80),                               |
|      |                |         |                  | width integer,                                                      |
|      |                |         |                  | height integer);                                                    |
| 1574 | 0:01:53.200383 | deploy  |                  |                                                                     |
|      |                |         |                  |   select pg_sleep(120);                                             |
|      |                |         |                  |   CREATE TABLE public.provider_data_image__7550167585681209474(     |
|      |                |         |                  |   foreign_identifier character varying(3000),                       |
|      |                |         |                  | foreign_landing_url character varying(1000),                        |
|      |                |         |                  | url character varying(3000),                                        |
|      |                |         |                  | thumbnail character varying(3000),                                  |
|      |                |         |                  | filetype character varying(5),                                      |
|      |                |         |                  | filesize integer,                                                   |
|      |                |         |                  | license character varying(50),                                      |
|      |                |         |                  | license_version character varying(25),                              |
|      |                |         |                  | creator character varying(2000),                                    |
|      |                |         |                  | creator_url character varying(2000),                                |
|      |                |         |                  | title character varying(5000),                                      |
|      |                |         |                  | meta_data jsonb,                                                    |
|      |                |         |                  | tags jsonb,                                                         |
|      |                |         |                  | category character varying(80),                                     |
|      |                |         |                  | watermarked boolean,                                                |
|      |                |         |                  | provider character varying(80),                                     |
|      |                |         |                  | source character varying(80),                                       |
|      |                |         |                  | ingestion_type character varying(80),                               |
|      |                |         |                  | width integer,                                                      |
|      |                |         |                  | height integer);                                                    |
| 1367 | 0:03:07.974316 | airflow |                  | COMMIT                                                              |
| 50   | 0:03:08.020433 | airflow |                  | COMMIT                                                              |
| 52   | 0:03:15.860842 | airflow |                  | COMMIT                                                              |
| 1359 | 0:03:53.310257 | deploy  |                  | COMMIT                                                              |
| 1236 | 0:05:50.341626 | airflow |                  | COMMIT                                                              |
| 1185 | 0:05:52.497251 | airflow |                  | COMMIT                                                              |
| 42   | 0:16:23.451864 | airflow |                  | COMMIT                                                              |
+------+----------------+---------+------------------+---------------------------------------------------------------------+
SELECT 22
Time: 0.024s

@stacimc (Contributor) commented on Sep 9, 2022

So, I think that we want to kill jobs associated with the failed task, but not other jobs associated with other ongoing tasks. application_name and task_instance_key_str together could help, and that's the design represented here.

Got it, thank you! And I think you're right, we'll want to make sure we're only killing jobs associated with the failed task.

I checked out the temp commit you pushed to test a long-running query on create_loading_table (thank you!). That's exactly how I was initially testing as well! When I run it locally, I see that the entire pg_sleep runs, even though it exceeds the execution_timeout on the task 🤔 Said another way: the task ignores its execution_timeout and just hangs until the long-running query completes, so the new on_failure_callback is never hit.

I did a little digging and found a way to get the timeouts to work in PostgresOperator, by passing in a statement_timeout. Here's an example task I made and added to the provider DAGs for testing:

from datetime import timedelta
from airflow.providers.postgres.operators.postgres import PostgresOperator

sql_task = PostgresOperator(
    task_id="sql_task",
    postgres_conn_id=DB_CONN_ID,
    sql=(
        "select pg_sleep(120)"
    ),
    execution_timeout=timedelta(seconds=5),  # This timeout is totally ignored
    runtime_parameters={'statement_timeout': '10s'},  # This timeout is respected
)

...

# How I added it to the DAG
sql_task >> [create_loading_table, copy_to_s3] >> load_from_s3

When I run that, the execution_timeout is still totally ignored, but the statement_timeout is respected and in fact the query is cancelled for me. In the logs I see:

psycopg2.errors.QueryCanceled: canceling statement due to statement timeout

I don't know how helpful this is, but maybe something to look into?

@rwidom (Collaborator, Author) commented on Sep 19, 2022

Thanks so much @stacimc !

I just committed a temporary test DAG with code that works for both setting the database statement_timeout (so Postgres proactively kills the job) and setting application_name (so Airflow can go back and kill the job after the fact, e.g. if it fails for reasons other than timing out?).

So to the extent that my goal was to learn more about the broad array of possible ways to get this done, I feel pretty accomplished. 🤣 But I still am not 100% sure what would be the best thing to do for openverse. Here are some notes based on the different ways that the catalog is running postgres jobs, and what I'm learning about these different kill-the-job design options.

  • PostgresOperator (inaturalist.py): PostgresOperator won't parse Airflow variables in the SQL statement, parameters, or runtime parameters, maybe as a protection against SQL injection. So they have to be hard-coded with the task and can't be differentiated across runs of the same task created by DAG factories. However, since iNaturalist is only a monthly run, and the PostgresOperator is used for a limited set of pre- and post-ingestion tasks, Airflow templating is not crucial here.
  • PostgresResultOperator (class definition): We could modify this class to set the application_name and statement_timeout by default, but we still wouldn't be able to pass in the regular Airflow Jinja templates.
  • PostgresResultOperator (data refresh DAG factory): This is the only place that currently uses the result operator, and most of the underlying work for the refresh is done using a hook in popularity/sql.py.
  • PostgresHook (common/loader/sql.py): This gets called from a million different provider DAGs, which may have very different data volumes, timeouts, run frequencies, etc. So it seemed important to have a way to pass those details in to the database. It gets called only from common/loader/loader.py though, so those parameters will need to get passed through a bunch of layers.
  • PostgresHook (common/popularity/sql.py): Seems like this is just run as part of the data refresh, but there are multiple calls to the PostgresHook for different steps along the way.
  • PostgresHook (inaturalist.py): Main processing for this provider happens in SQL. Static pre-ingestion steps use PostgresOperator, but the dynamic calls to get batches of data use the hook. We'd probably rather not adjust database settings on each and every call, but could probably set things up to run once at initialization.
  • psycopg2 (merge common crawl tags): Is this something that gets run regularly, or was it a one-time thing as part of restructuring?

So, yeah, I think that my next steps / open questions are:

  • figuring out if/how we can pass the task-specific execution_timeout down to SQL loader, or finding out if there is a number that we feel comfortable using across all provider scripts, or maybe exploring numbers we could add to provider_ingestion_workflows as a parameter? How would that work for the data refresh?
  • figuring out how we feel about the risks of using task_instance_key_str and ts_nodash in database settings, how important killing jobs for non-timeout reasons is, whether pursuing the application_name approach is even worth it, and whether focusing on the statement_timeout for now would be better.
  • figuring out how broadly we want this to apply: all of the examples in the table above? some of them? which? etc.

Thanks again!

@stacimc (Contributor) commented on Sep 20, 2022

This is such an interesting PR, @rwidom! I am learning a lot about how this works 😄 I love that you've added the temporary test DAG, that makes it so easy to play around with. I have a bunch of thoughts, apologies for not organizing them super well. Let me know if this makes sense!

Commoncrawl

For psycopg2, it looks like that's only used in the crawler as you've pointed out. The crawler hasn't been touched in a long time and is not running, and to my knowledge it's not been prioritized in any near term. I think we can avoid spending too much time on this part, but it's awesome that you've gotten it working!

Custom operators

As I understand what you've said: with PostgresOperator, the only problem with the current approach is that we still need to pass in an application_name, and this is difficult because we can't use Jinja templating here.

  • Could we make a new operator, extending PostgresOperator? It could take care of adding the application_name and statement_timeout. I haven't fully fleshed this out, but I do know that from within the execute method, PostgresOperator has access to self.dag_id and self.task_id. There may be a way to access the run_id directly, and worst case you can probably query the dagruns to get it? Worth exploring a little deeper, maybe!
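A very rough sketch of what that could look like (the class name is hypothetical, just to make the idea concrete; run_id is available from the execute context):

from airflow.providers.postgres.operators.postgres import PostgresOperator

class NamedPostgresOperator(PostgresOperator):
    # Hypothetical: tag the session with an application_name built from attributes
    # available on the operator plus the run context, so the query can later be
    # located in pg_stat_activity. Truncated to Postgres's 63-character limit.
    def execute(self, context):
        application_name = f"{self.dag_id}__{self.task_id}__{context['run_id']}"[:63]
        self.runtime_parameters = {
            **(self.runtime_parameters or {}),
            "application_name": application_name,
        }
        return super().execute(context)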

Just for the sake of discussion 😄, have you thought about making a custom operator for PythonOperators using PostgresHook to encapsulate this logic there as well? Curious what you think, but if we go down that road we should definitely separate the work out into multiple PRs/tickets to make it easier to manage!

Approach/scope

if there is a number that we feel comfortable using across all provider scripts, or maybe exploring numbers we could add to provider_ingestion_workflows as a parameter?

I think the way we've been writing these, it's safe to assume the execution_timeout for the task should be the statement_timeout. If down the line we found a case where those numbers should be separate, maybe the workflow configuration classes would be a cool place to store it :)

whether pursuing the application_name approach is even worth it, and whether focusing on the statement_timeout for now would be better

Good question! Your idea that we'll need to be careful to kill only the query associated with the timed out task makes intuitive sense to me, but I'm really not very familiar with the database internals. Pinging @AetherUnbound for your thoughts on this especially 🧠 I do like that this approach lets you query for the specific query to kill using nothing but the task context. If we end up making our own custom operator, maybe there's something we can hook into to do clean up on timeout? I only looked briefly at the source, maybe on_kill?

figuring out how broadly we want this to apply: all of the examples in the table above? some of them? which? etc.

Another good question 😄 I think this would be useful just about everywhere, certainly in the loader steps of the provider DAGs. It's totally reasonable for this to be split across multiple PRs, however! And as I said, the commoncrawl steps can be deprioritized since that's not running and we're not able to test it.

@rwidom (Collaborator, Author) commented on Sep 21, 2022

Ooh! I didn't know about self.dag_id or self.task_id! That makes so much sense. Does that mean that there is also a self.execution_timeout and a self.ts_nodash? I have to look at that, because if so, I do think that a custom hook and custom operator (maybe adding on to our existing custom pg operator?) could be a really nice and general solution (maybe in 2 separate PRs). And maybe I totally misunderstood the reason why the operator wasn't parsing variables the way I expected. Thanks @stacimc ! And thanks in advance for your thoughts @AetherUnbound !

@AetherUnbound AetherUnbound changed the title Issue/690 kill pg query from airflow Terminate PG query when task is killed via Airflow Oct 5, 2022
@AetherUnbound (Contributor) commented on Oct 5, 2022

Sorry it took me so long to get to this! I love the deep dive you've done @rwidom and I've learned a bunch 🚀 The statement_timeout piece is one I feel like I definitely should have been aware of before 😅 But nonetheless I'm so glad you've found it @stacimc, it seems perfect!

One thing I'm getting caught up on as you're describing this:

Postgres Operator won't parse airflow variables in the SQL statement, parameters, or runtime parameters, maybe as a protection against SQL injection.

This doesn't quite match my understanding of what Airflow is capable of, do you mind elaborating on this a bit? At least with the PostgresOperator, the sql parameter is a templated field so we should be able to do any manner of Airflow templating we'd like there:

https://github.com/apache/airflow/blob/84d915c249617aedaf0c451e9f9d19a4e7ff767b/airflow/providers/postgres/operators/postgres.py#L47

If we wanted other attributes (e.g. runtime_parameters) to be templated, we could subclass the PostgresOperator (which is an idea already floating around in this PR!) and add it to the list of template_fields.

Personally, I think that the best way to go would be runtime_parameters exclusively. Don't get me wrong, the kill-based-on-application_name-and-task-context code you've drawn up is SO COOL. I'm way impressed that's possible at all, and the code you've set up to accomplish that is really slick. But I do think that managing timeouts via the runtime_parameters statement_timeout would be the ideal route. It feels like what we'll want down the line is to have that field automatically be populated for PostgresOperator steps based on the execution_timeout.
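For illustration, a hedged sketch of that idea (the class name and details are hypothetical, not the final design): a subclass that also makes runtime_parameters templatable and mirrors the task's execution_timeout into statement_timeout when none is given.

from airflow.providers.postgres.operators.postgres import PostgresOperator

class TimeoutAwarePostgresOperator(PostgresOperator):
    # Hypothetical: template runtime_parameters alongside sql, and derive
    # statement_timeout from execution_timeout unless it was set explicitly.
    template_fields = (*PostgresOperator.template_fields, "runtime_parameters")

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        params = dict(self.runtime_parameters or {})
        if self.execution_timeout and "statement_timeout" not in params:
            params["statement_timeout"] = f"{int(self.execution_timeout.total_seconds())}s"
            self.runtime_parameters = params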

Heck, we may even want a PostgresHook wrapper which brings all the logic in PostgresOperator::execute into PostgresHook::run so we can enforce timeouts there 😮 Even if we're running a PythonOperator that uses a PostgresHook, we'd still get into the scenario where a PG query either hangs or doesn't get terminated on the backend even though the python thread does. Jeez, now as I'm saying that I feel like those cases could make use of the kill-query-on-failure logic 😅 Perhaps we could at least start with utilizing statement_timeout, and see if we continue to encounter issues where we'd want the kill-query-on-failure logic then implement that down the line as needed?

@rwidom (Collaborator, Author) commented on Jan 9, 2023

Hey, @AetherUnbound, I was just thinking about returning to this. Sorry for the delay! Is it still high priority? Does #939 make it irrelevant now? I am more familiar with xcoms and parameters now, so I might be better equipped to finish this up, but just trying to figure out if that makes sense right now.

@AetherUnbound (Contributor)

I don't think this is made irrelevant by #939, but the diff might look a bit different because of the dependency changes there. I think terminating the query is still important and so could still be high priority, but I could also see it being moved down to medium since we haven't encountered this particular issue recently 🤔 What do you think @WordPress/openverse-catalog?

@rwidom rwidom force-pushed the issue/690-kill-pg-query-from-airflow branch from b79b9bc to a77c6b2 on January 29, 2023 at 23:52
@rwidom (Collaborator, Author) commented on Jan 30, 2023

OK, I don't know if it's just that I haven't had enough real focus time on this, or that I'm really missing the boat on some basic airflow concepts or what, but here's where I think things are...

  • I tried (see the dag in test_timeout_pg.py), and I don't think I can recreate the original issue.
  • However, that might be because of failures in using task_instance attributes to identify processes via the Postgres application_name. In an earlier draft, I had planned to use task_instance_key_str (see https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html), but I was already concerned about tasks that run more than once a day, and now it definitely will not allow us to differentiate across mapped processes within a single task instance, which would really be a bummer if we start running mapped tasks in parallel like we've discussed for iNaturalist. And/or, ideally we could pass the task info to set the database statement_timeout to match the airflow_timeout.
  • I'm not 100% clear from the original issue (#690) on whether idle state processes are the main concern or active running processes that have been abandoned. But for idle processes there might be a more general setting we could use, see https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/_modules/airflow/providers/postgres/hooks/postgres.html. And if there are really just some very long-running jobs that inspired this, maybe we should be defining separate connections for them with appropriate settings to automatically kill off idle processes?
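For reference, one hedged sketch of what a connection-level setting like that could look like with plain psycopg2 (the connection details are placeholders, not our real connection):

import psycopg2

conn = psycopg2.connect(
    host="localhost",
    dbname="openledger",
    user="deploy",
    # libpq's "options" parameter can set server settings up front, e.g. killing
    # idle-in-transaction sessions after 60s and capping any statement at 10 minutes.
    options="-c idle_in_transaction_session_timeout=60000 -c statement_timeout=600000",
)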

So, I've been doing a lot of thinking in circles, and not making a ton of real progress, but now you have the latest @AetherUnbound and @stacimc . Help?

@AetherUnbound (Contributor)

@rwidom and I had a quick synchronous discussion where we talked about the various possibilities for this endeavor and what might make the most sense moving forward. She prepared this excellent doc describing what possibilities we have. Here are the outcomes of that discussion:

  • Airflow can't be trusted to manage the timeout of the Postgres steps because it will just hang until the Postgres query is done executing. This makes trying to employ anything via the on_failure_callback useless since the callback won't run until the query is complete anyway. With this in mind we are abandoning the approach of using application_name & other identifiers to find and stop the query from Airflow.
  • We want to use the statement_timeout directive on the Postgres end for setting statement timeouts. We will create a new PostgresHook (which will be a subclass of the airflow.providers.postgres.hooks.postgres.PostgresHook) which will allow both a hook-level default statement timeout AND a statement_timeout parameter to the run function on the hook.
  • We will also add a new Operator (likely to be named PgExecuteQueryOperator) which will automatically hand the execution_timeout of the task down into the hook when it is instantiated. This will ensure that cases where we execute a query using this operator have the task's execution timeout applied as the statement timeout.
  • We'll go in and add any clear/obvious statement timeouts to existing PostgresHook::run calls, but we won't spend too much time on that as part of this PR since we can always add those timeouts down the line.

I also shared this pseudo-code with Rebecca for what the hook might look like based off of her initial code:

from airflow.providers.postgres.hooks.postgres import PostgresHook


class TimedPostgresHook(PostgresHook):
    """
    PostgresHook that sets the database timeout on any query to match the airflow task
    execution timeout.
    """

    def __init__(self, *args, default_statement_timeout: str = None, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.default_statement_timeout = default_statement_timeout

    def get_pg_timeout_sql(self, statement_timeout: str) -> str:
        return f"SET statement_timeout TO '{statement_timeout}s';"

    def run(self, sql, statement_timeout: str = None, autocommit=False, parameters=None, handler=None):
        statement_timeout = statement_timeout or self.default_statement_timeout

        if statement_timeout:
            # Prepend the timeout statement so it applies to the statement(s) that follow.
            timeout_sql = self.get_pg_timeout_sql(statement_timeout)
            sql = [timeout_sql] + (sql if isinstance(sql, list) else [sql])
        return super().run(sql, autocommit=autocommit, parameters=parameters, handler=handler)
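A hedged usage sketch (the names follow the pseudo-code above, not necessarily the final API):

hook = TimedPostgresHook(
    postgres_conn_id="postgres_openledger_upstream",
    default_statement_timeout="600",  # used whenever run() gets no explicit timeout
)
# Per-call override: Postgres cancels this statement after 60 seconds.
hook.run("SELECT count(*) FROM image;", statement_timeout="60")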

I think that covers everything we discussed, but Rebecca please chime in if I missed anything 😄

@rwidom (Collaborator, Author) commented on Feb 3, 2023

Thanks so much @AetherUnbound ! That's perfect!

One more thought on naming: I have these things in a file called sql_helpers, but I think sql_timeout would probably be more clear. Thoughts?

@rwidom (Collaborator, Author) commented on Feb 19, 2023

Hello!

I think I responded above about the timeouts stacking, but yes, it's something to be aware of. It's really per SQL statement, not per task, and a task can have multiple SQL statements. But maybe future PR(s) can add statement_timeouts to the run statement where needed, if the intention is to divide timeouts differently among statements within a task?
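As an illustration of that per-statement point (hypothetical values, using the hook sketched earlier):

# A task with a 60s execution_timeout running three statements: the timeout applies
# to each statement separately, so the whole task could take up to ~180s in Postgres,
# even though no single statement may exceed 60s.
hook.run(
    sql=[
        "CREATE TABLE staging (LIKE image INCLUDING ALL);",
        "INSERT INTO staging SELECT * FROM image;",
        "CREATE INDEX ON staging (provider);",
    ],
    statement_timeout="60",
)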

Also, yes, thanks so much for catching those op_args vs kw_args issues! I think I got them all, but we use the PostgresHook in a lot of places in the catalog. I really appreciate the second and third set of eyes. Maybe future work could implement/standardize some of the related ideas we've talked about here?

  • using the hook default postgres_conn_id, so that it doesn't need to be passed as a parameter so many times
  • using task: BaseOperator = None and implementing type checking to handle times when the task doesn't get passed to the function that instantiates the Hook, so that task can always be the last arg
  • other things?

For now, I did a lot of switching to kwargs in the function calls, mainly because it seemed like the path of least resistance and most clarity, but let me know if I should be doing something else instead.

@openverse-bot (Contributor)

Based on the high urgency of this PR, the following reviewers are being gently reminded to review this PR:

@krysal
This reminder is being automatically generated due to the urgency configuration.

Excluding weekend days [1], this PR was updated 2 day(s) ago. PRs labelled with high urgency are expected to be reviewed within 2 weekday(s) [2].

@rwidom, if this PR is not ready for a review, please draft it to prevent reviewers from getting further unnecessary pings.

Footnotes

  1. Specifically, Saturday and Sunday.

  2. For the purpose of these reminders we treat Monday - Friday as weekdays. Please note that the workflow that generates these reminders runs at midnight UTC on Monday - Friday. This means that depending on your timezone, you may be pinged outside of the expected range.

@AetherUnbound (Contributor)

I think I'd still prefer the task attribute being moved down in the argument order as part of this PR, that way we aren't changing function parameter orders significantly twice for all of these functions. Is that something you're comfortable changing @rwidom?

@rwidom (Collaborator, Author) commented on Feb 23, 2023

I think I'd still prefer the task attribute being moved down in the argument order as part of this PR, that way we aren't changing function parameter orders significantly twice for all of these functions. Is that something you're comfortable changing @rwidom?

My only concern is that I have already done a bunch (if not all, I think all) of those changes in the function calls and tests, so I think I would need to now go back through all of those changes and change them again. Also, I'm not entirely sure how we want the error handling to work if the task is None. Just set the default task timeout and act like everything is ok? Add some info to the log? Raise an error? Like, how important is it that the task timeouts we're setting actually get implemented (setting aside stacking concerns) at the level of detail we're setting?

If this were only Airflow doing the passing, then I would be less concerned, but there are the situations where we have to pass the task more than once (i.e. through multiple function calls for the same step in a dag), so just listing the task in the kwargs won't make it automagically appear everywhere it needs to be.

rwidom and others added 3 commits February 24, 2023 13:26
These were removed in #985 but were actually still being issued, I had just written a very poor filter that apparently captured everything...
@AetherUnbound (Contributor)

Per an offline discussion that we had, I took on the effort of moving the function parameters around so that the task param was at the end. @rwidom please check out that commit and let me know what you think 🙂 I also got a little lost in warning land and found some crucial mistakes I had made in #985 when removing the old warnings.

I'm going to try to run a bunch of DAGs locally just to be sure, but I think this should be good to go now!

@AetherUnbound (Contributor) left a comment


Thanks so much Rebecca for all your effort with this one! Can't wait to have it in 😄

@stacimc (Contributor) left a comment


LGTM! Lowered the ingestion limit and ran just about all of our DAGs including the data refreshes. The changes to the parameters look great @AetherUnbound 💯

Thanks so much again for your incredible work here @rwidom! I'm so excited to see this in 🎉

This just improves testing speeds for most test runs; full suite runs will happen in CI/CD.
@rwidom (Collaborator, Author) commented on Feb 28, 2023

Thank you so much for doing this last piece @AetherUnbound !!!

This is a really small nit, but I think that technically tasks can be either BaseOperator or MappedOperator, and for reasons I don't understand, the latter is not a subclass of the former. Maybe we should change BaseOperator to AbstractOperator, per https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/models/mappedoperator.html? It looks like that's the parent class of BaseOperator as well (https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/models/baseoperator.html). I realized this when I was testing the mapped tasks from iNaturalist, and the timeouts weren't where I expected them to be from other operators.
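For illustration, a hedged sketch of that typing change (the helper is hypothetical, not the exact function in this PR):

from airflow.models.abstractoperator import AbstractOperator

def statement_timeout_for(task: AbstractOperator | None = None) -> str | None:
    # AbstractOperator covers both BaseOperator and MappedOperator, so mapped
    # iNaturalist tasks would type-check too.
    timeout = getattr(task, "execution_timeout", None)
    if timeout is None:
        return None
    return f"{int(timeout.total_seconds())}s"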

@rwidom (Collaborator, Author) commented on Feb 28, 2023

Ugh, I just started to make that change myself, got annoyed with myself about navigating the imports, and wondered if (BaseOperator | MappedOperator) would be better. Maybe, given that these things aren't really enforced, we should just leave it as is and merge?

@AetherUnbound (Contributor) left a comment


I think the AbstractOperator typing looks great! This looks ready to me - merge is "go" on your command 🚀😁

@rwidom rwidom merged commit 3ab5e57 into main Feb 28, 2023
@rwidom rwidom deleted the issue/690-kill-pg-query-from-airflow branch February 28, 2023 15:21
Labels
💻 aspect: code (Concerns the software code in the repository), 🛠 goal: fix (Bug fix), 🟧 priority: high (Stalls work on the project or its dependents)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Ensure PG backend query is terminated on failure
4 participants