
airflow db cleanup - psycopg2.errors.InFailedSqlTransaction celery_taskmeta #24339

Closed
2 tasks done
bhavaniravi opened this issue Jun 9, 2022 · 3 comments
Labels
area:core kind:bug This is a clearly a bug


@bhavaniravi
Contributor

Apache Airflow version

2.3.0

What happened

The following exception is raised when running airflow db clean. Part of the fix was released in #23698, but it doesn't roll back the transaction, so the queries for the next tables fail.

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 716, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.InFailedSqlTransaction: current transaction is aborted, commands ignored until end of transaction block


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.9/site-packages/airflow/__main__.py", line 38, in main
    args.func(args)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 51, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/cli.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/db_command.py", line 195, in cleanup_tables
    run_cleanup(
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 71, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/db_cleanup.py", line 311, in run_cleanup
    _cleanup_table(
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/db_cleanup.py", line 228, in _cleanup_table
    _print_entities(query=query, print_rows=False)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/db_cleanup.py", line 137, in _print_entities
    num_entities = query.count()
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 3062, in count
    return self._from_self(col).enable_eagerloads(False).scalar()
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 2803, in scalar
    ret = self.one()
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 2780, in one
    return self._iter().one()
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 2818, in _iter
    result = self.session.execute(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1670, in execute
    result = conn._execute_20(statement, params or {}, execution_options)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1520, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 313, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1389, in _execute_clauseelement
    ret = self._execute_context(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1748, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1929, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 716, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.InternalError: (psycopg2.errors.InFailedSqlTransaction) current transaction is aborted, commands ignored until end of transaction block

What you think should happen instead

The failed transaction for the current table should be rolled back, and the cleanup should proceed to the remaining tables.
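To show why the rollback matters, here is a minimal pure-Python sketch. FakeSession, MissingTable and AbortedTransaction are hypothetical stand-ins, not SQLAlchemy or Airflow classes. They mimic PostgreSQL's behaviour: once a statement fails, every later statement in the same transaction fails until rollback() is called. The per-table commit in run_cleanup is also an assumption of the sketch.

```python
class MissingTable(Exception):
    """Stand-in for the ProgrammingError PostgreSQL raises for an undefined table."""


class AbortedTransaction(Exception):
    """Stand-in for psycopg2.errors.InFailedSqlTransaction."""


class FakeSession:
    """Hypothetical session mimicking PostgreSQL: after a failed statement,
    every later statement fails until rollback() is called."""

    def __init__(self, existing_tables):
        self.existing_tables = set(existing_tables)
        self.aborted = False
        self.pending = []    # tables deleted in the open transaction
        self.committed = []  # tables whose deletes were committed

    def delete_from(self, table):
        if self.aborted:
            raise AbortedTransaction(
                "current transaction is aborted, "
                "commands ignored until end of transaction block"
            )
        if table not in self.existing_tables:
            self.aborted = True  # the error poisons the whole transaction
            raise MissingTable(f'relation "{table}" does not exist')
        self.pending.append(table)

    def commit(self):
        self.committed.extend(self.pending)
        self.pending.clear()

    def rollback(self):
        self.pending.clear()
        self.aborted = False


def run_cleanup(session, tables, rollback_on_error=True):
    """Clean each table in turn, skipping missing ones; return the skipped names."""
    skipped = []
    for table in tables:
        try:
            session.delete_from(table)
            session.commit()  # one commit per table (an assumption of this sketch)
        except MissingTable:
            skipped.append(table)
            if rollback_on_error:
                session.rollback()  # without this, the next table hits AbortedTransaction
    return skipped


session = FakeSession({"dag_run", "task_instance"})
skipped = run_cleanup(session, ["celery_taskmeta", "dag_run", "task_instance"])
print(skipped, session.committed)  # ['celery_taskmeta'] ['dag_run', 'task_instance']
```

Calling run_cleanup with rollback_on_error=False reproduces the failure mode in the traceback above. The missing table poisons the transaction, and the very next table raises AbortedTransaction.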

How to reproduce

airflow db clean -v --clean-before-timestamp 2022-06-06 -y

Operating System

Debian GNU/Linux 10 (buster)

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else

As a fix, I'm thinking of adding a session parameter to _warn_if_missing and rolling back when the error is caught. How does that look?

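import logging
from contextlib import AbstractContextManager

from sqlalchemy.exc import OperationalError, ProgrammingError

logger = logging.getLogger(__name__)

class _warn_if_missing(AbstractContextManager):
    def __init__(self, table, suppress, session=None):
        ...
        self.session = session  # session to roll back if the table query fails

    def __exit__(self, exctype, excinst, exctb):
        caught_error = exctype is not None and issubclass(exctype, (OperationalError, ProgrammingError))
        if caught_error:
            logger.warning("Table %r not found.  Skipping.", self.table)
            # PostgreSQL aborts the transaction after the error; roll back so the next table's queries can run
            if self.session is not None:
                self.session.rollback()
        return caught_error  # returning True suppresses the exception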

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@bhavaniravi bhavaniravi added area:core kind:bug This is a clearly a bug labels Jun 9, 2022
@uranusjr
Member

uranusjr commented Jun 9, 2022

I think we need to check session.is_active, but otherwise the fix makes sense.

Actually, the context manager can be simplified to

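import contextlib
import logging

from sqlalchemy.exc import OperationalError, ProgrammingError

logger = logging.getLogger(__name__)

@contextlib.contextmanager
def _warn_if_missing(table, session):
    try:
        yield
    except (OperationalError, ProgrammingError):
        logger.warning("Table %r not found.  Skipping.", table)
        if session.is_active:
            session.rollback()
        raise  # re-raises the error after rolling back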

(Also we’re not using the suppress argument now. Is that intended @dstandish?)

@dstandish
Contributor

This is improved in #23574.

In that PR I check whether each table exists and warn if it is missing. Then I suppress errors, but log the traceback at DEBUG level if there's an actual error.

It's more precise this way: previously, some unrelated errors could be mistaken for missing-table errors.
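The approach described here has two parts. First, check that the table exists. Then suppress any real error while keeping its traceback at DEBUG. It can be sketched with the standard-library sqlite3 module. This is not the code from #23574. The SQLite catalog query, the ts column, the table names and the rows are all assumptions for illustration, and a PostgreSQL implementation would look tables up differently.

```python
import logging
import sqlite3

logger = logging.getLogger("db_cleanup_sketch")


def table_exists(conn: sqlite3.Connection, table: str) -> bool:
    # SQLite-specific catalog lookup (an assumption of this sketch).
    row = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?", (table,)
    ).fetchone()
    return row is not None


def cleanup_table(conn: sqlite3.Connection, table: str, cutoff: str) -> bool:
    """Delete rows older than cutoff; return True if the table was cleaned."""
    if not table_exists(conn, table):
        logger.warning("Table %r not found. Skipping.", table)
        return False
    try:
        # Table names come from a fixed internal list, so they are interpolated directly.
        with conn:  # commits on success, rolls back and re-raises on error
            conn.execute(f'DELETE FROM "{table}" WHERE ts < ?', (cutoff,))
    except sqlite3.Error:
        # A real error: keep going with other tables, but keep the traceback at DEBUG.
        logger.warning("Could not clean table %r; see DEBUG log for details.", table)
        logger.debug("Cleanup of %r failed", table, exc_info=True)
        return False
    return True


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (id INTEGER, ts TEXT)")
conn.execute("CREATE TABLE broken_table (id INTEGER)")  # exists, but has no ts column
with conn:
    conn.executemany(
        "INSERT INTO task_instance VALUES (?, ?)",
        [(1, "2022-06-01"), (2, "2022-06-07")],
    )

results = {
    table: cleanup_table(conn, table, "2022-06-06")
    for table in ["celery_taskmeta", "broken_table", "task_instance"]
}
remaining = [row[0] for row in conn.execute("SELECT id FROM task_instance")]
```

The missing table is reported as missing. The table that fails for a different reason is logged as a genuine error rather than being mislabelled "not found", and the remaining table is still cleaned.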

@eladkal
Copy link
Contributor

eladkal commented Jun 22, 2022

According to #24340 (comment), PR #23574 solved the issue.

@eladkal eladkal closed this as completed Jun 22, 2022