fix(airflow): fix AthenaOperator extraction #11857

Merged
merged 2 commits into from
Dec 4, 2024

@steffengr steffengr commented Nov 14, 2024

The GenericSqlExtractor, which the DataHub Airflow plugin currently uses to extract lineage information, does not properly support the AthenaOperator and crashes with "AttributeError: 'AthenaOperator' object has no attribute 'sql'". This patch introduces an AthenaOperatorExtractor, following the BigQueryInsertJobOperatorExtractor example, to fix support for the AthenaOperator.

Fixes #11160
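
For readers unfamiliar with the failure mode, here is a minimal sketch of it. It assumes the generic extractor reads the statement from an `sql` attribute, while the Athena operator keeps it in `query`. The class and both functions below are hypothetical stand-ins for illustration, not the plugin's or Airflow's actual code.

```python
from dataclasses import dataclass


@dataclass
class FakeAthenaOperator:
    """Hypothetical stand-in for AthenaOperator: the SQL lives in `query`, not `sql`."""

    task_id: str
    query: str
    database: str


def generic_extract_sql(operator: object) -> str:
    # Mirrors the assumption that breaks: every SQL operator exposes `.sql`.
    return operator.sql  # type: ignore[attr-defined]


def athena_extract_sql(operator: FakeAthenaOperator) -> str:
    # An operator-specific extractor reads the attribute the operator really has.
    return operator.query


op = FakeAthenaOperator(task_id="example_task", query="SELECT 1", database="example_db")

error = None
try:
    generic_extract_sql(op)
except AttributeError as exc:
    # Same shape of error as the one reported in the linked issue.
    error = str(exc)

extracted = athena_extract_sql(op)
```

The point of a dedicated extractor is only that it knows which attribute holds the query; the lineage parsing that follows can stay the same.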

Checklist

  • The PR conforms to DataHub's Contributing Guideline (particularly Commit Message Format)
  • Links to related issues (if applicable)
  • Tests for the changes have been added/updated (if applicable)
  • Docs related to the changes have been added/updated (if applicable). If a new feature has been added a Usage Guide has been added for the same.
  • For any breaking change/potential downtime/deprecation/big changes an entry has been made in Updating DataHub

@github-actions github-actions bot added ingestion PR or Issue related to the ingestion of metadata community-contribution PR or Issue raised by member(s) of DataHub Community labels Nov 14, 2024
Collaborator

@hsheth2 hsheth2 left a comment

The code looks reasonable - can we add a test for this?

@steffengr
Contributor Author

@hsheth2 Thanks for looking at this! I don't see related tests in this module. Could you point me to an example? I don't have much experience with this codebase.

@hsheth2 hsheth2 added the needs-review Label for PRs that need review from a maintainer. label Nov 20, 2024
@hsheth2 hsheth2 added pending-submitter-response Issue/request has been reviewed but requires a response from the submitter and removed needs-review Label for PRs that need review from a maintainer. labels Nov 20, 2024
@steffengr
Contributor Author

@hsheth2 I added a test. I had to set "AIRFLOW__API__AUTH_BACKENDS": "airflow.providers.fab.auth_manager.api.auth.backend.basic_auth", to get these tests to work. Probably some change in Airflow.
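
The setting named above follows Airflow's `AIRFLOW__{SECTION}__{KEY}` convention for overriding configuration through environment variables. A minimal sketch of how a test harness might build such an environment is shown below; the helper names `airflow_env_var` and `build_test_env` are hypothetical and not taken from the plugin's test code.

```python
import os
from typing import Dict


def airflow_env_var(section: str, key: str) -> str:
    """Build the AIRFLOW__{SECTION}__{KEY} name used for config overrides."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


def build_test_env(base: Dict[str, str]) -> Dict[str, str]:
    """Return a copy of `base` with the basic-auth API backend enabled."""
    env = dict(base)
    env[airflow_env_var("api", "auth_backends")] = (
        "airflow.providers.fab.auth_manager.api.auth.backend.basic_auth"
    )
    return env


# The resulting mapping could be passed as `env=` to a subprocess that starts Airflow.
env = build_test_env(dict(os.environ))
```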

@steffengr
Contributor Author

I'm not sure what causes these test failures. I'm also getting them on master.

@hsheth2
Collaborator

hsheth2 commented Nov 27, 2024

Looks like CI is still failing. I see this in the logs (e.g. here https://github.com/datahub-project/datahub/actions/runs/12052943048/job/33607437487?pr=11857)

Traceback (most recent call last):
  File "/home/runner/work/datahub/datahub/metadata-ingestion-modules/airflow-plugin/venv/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/runner/work/datahub/datahub/metadata-ingestion-modules/airflow-plugin/venv/lib/python3.10/site-packages/airflow/__main__.py", line 57, in main
    args.func(args)
  File "/home/runner/work/datahub/datahub/metadata-ingestion-modules/airflow-plugin/venv/lib/python3.10/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/home/runner/work/datahub/datahub/metadata-ingestion-modules/airflow-plugin/venv/lib/python3.10/site-packages/airflow/utils/cli.py", line 114, in wrapper
    return f(*args, **kwargs)
  File "/home/runner/work/datahub/datahub/metadata-ingestion-modules/airflow-plugin/venv/lib/python3.10/site-packages/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
    return func(*args, **kwargs)
  File "/home/runner/work/datahub/datahub/metadata-ingestion-modules/airflow-plugin/venv/lib/python3.10/site-packages/airflow/cli/commands/dag_command.py", line 160, in dag_trigger
    message = api_client.trigger_dag(
  File "/home/runner/work/datahub/datahub/metadata-ingestion-modules/airflow-plugin/venv/lib/python3.10/site-packages/airflow/api/client/local_client.py", line 34, in trigger_dag
    dag_run = trigger_dag.trigger_dag(
  File "/home/runner/work/datahub/datahub/metadata-ingestion-modules/airflow-plugin/venv/lib/python3.10/site-packages/airflow/api/common/trigger_dag.py", line 119, in trigger_dag
    raise DagNotFound(f"Dag id {dag_id} not found in DagModel")
airflow.exceptions.DagNotFound: Dag id athena_operator not found in DagModel

I also created #11981 to help make this a bit easier to debug, in case that's useful.

@steffengr
Contributor Author

@hsheth2 I'm lost here. I suspected a race condition between triggering the DAG and loading it, since I get this error randomly on any of the test DAGs on my local machine. However, calling the REST API to wait for the DAG to become available doesn't seem to help on some of the Airflow versions. Any idea what could cause the DAG to not load on some of the Airflow versions?
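
A wait of the kind described here can be sketched as a small polling loop. This is an illustration under assumptions, not the code from this PR: `wait_until` and `dag_exists` are hypothetical helpers, the base URL and `Authorization` header value are supplied by the caller, and the check uses the `/api/v1/dags/{dag_id}` endpoint of Airflow 2's stable REST API without any query parameters.

```python
import time
import urllib.error
import urllib.request
from typing import Callable


def wait_until(check: Callable[[], bool], timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Poll `check` until it returns True or `timeout` seconds have elapsed."""
    deadline = time.monotonic() + timeout
    while True:
        if check():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


def dag_exists(base_url: str, dag_id: str, auth_header: str) -> bool:
    """Return True if the webserver answers 200 for the DAG (hypothetical helper)."""
    request = urllib.request.Request(
        f"{base_url}/api/v1/dags/{dag_id}",
        headers={"Authorization": auth_header},
    )
    try:
        with urllib.request.urlopen(request, timeout=5) as response:
            return response.status == 200
    except OSError:
        # Covers URLError/HTTPError (e.g. 404 while the DAG is not yet parsed)
        # as well as connection and timeout errors.
        return False
```

A test could then call something like `wait_until(lambda: dag_exists(url, "athena_operator", auth))` before triggering the DAG, where `url` and `auth` come from the test setup.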

@steffengr
Contributor Author

@hsheth2 Looks like the fix was to change the REST call that checks whether the DAG exists: I removed a query parameter that older versions of Airflow don't support. Now I'm getting another strange error about the golden file not existing, on only one particular version of Airflow (2.4.3). Does this version require anything special?

ignore_paths_v2 = (), ignore_order = True
    def assert_metadata_files_equal(
        output_path: Union[str, os.PathLike],
        golden_path: Union[str, os.PathLike],
        update_golden: bool,
        copy_output: bool,
        ignore_paths: Sequence[str] = (),
        ignore_paths_v2: Sequence[str] = (),
        ignore_order: bool = True,
    ) -> None:
        golden_exists = os.path.isfile(golden_path)
    
        if copy_output:
            shutil.copyfile(str(output_path), str(golden_path) + ".output")
            logger.info(f"Copied output file to {golden_path}.output")
    
        if not update_golden and not golden_exists:
>           raise FileNotFoundError(
                "Golden file does not exist. Please run with the --update-golden-files option to create."
            )
E           FileNotFoundError: Golden file does not exist. Please run with the --update-golden-files option to create.
../../metadata-ingestion/src/datahub/testing/compare_metadata_json.py:56: FileNotFoundError
=========================== short test summary info ============================
FAILED tests/integration/test_plugin.py::test_airflow_plugin[v2_athena_operator_no_dag_listener] - FileNotFoundError: Golden file does not exist. Please run with the --update-golden-files option to create.
======= 1 failed, 19 passed, 3 skipped, 21 warnings in 347.02s (0:05:47) =======

@hsheth2
Collaborator

hsheth2 commented Nov 29, 2024

@steffengr with airflow 2.4, we support a limited form of the plugin, since certain features weren't available until later versions.

We run tests for that as well, which generate the "no_dag_listener" variants of the golden files. That golden file can only be generated on Airflow 2.4.

I believe something like this would generate that missing file.

tox -e py310-airflow24 -- 'tests/integration/test_plugin.py::test_airflow_plugin[v2_athena_operator_no_dag_listener]' --update-golden-files
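
For context on why the flag matters, here is a deliberately simplified sketch of the golden-file pattern. It is not DataHub's `assert_metadata_files_equal`: the function name `check_against_golden` is hypothetical, and the comparison is plain JSON equality with none of the ignore-path or ordering options.

```python
import json
import shutil
import tempfile
from pathlib import Path


def check_against_golden(output_path: Path, golden_path: Path, update_golden: bool) -> None:
    """Compare test output to a stored golden file, or (re)create the golden file."""
    if update_golden:
        # Update mode: the current output becomes the new expected result.
        shutil.copyfile(output_path, golden_path)
        return
    if not golden_path.is_file():
        raise FileNotFoundError("Golden file does not exist; rerun in update mode to create it.")
    output = json.loads(output_path.read_text())
    golden = json.loads(golden_path.read_text())
    if output != golden:
        raise AssertionError(f"{output_path} differs from {golden_path}")


with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp) / "output.json"
    golden = Path(tmp) / "golden.json"
    out.write_text(json.dumps([{"urn": "example"}]))

    # First run without a golden file fails, as in the CI log above.
    try:
        check_against_golden(out, golden, update_golden=False)
        missing_raised = False
    except FileNotFoundError:
        missing_raised = True

    # Update mode creates the file; the normal comparison then passes.
    check_against_golden(out, golden, update_golden=True)
    check_against_golden(out, golden, update_golden=False)
    matches_after_update = True
```

Because the "no_dag_listener" output differs from the regular output, its golden file has to be produced by a run on the Airflow version that yields that variant.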

@steffengr
Contributor Author

@hsheth2 Thanks! That did the trick. Is there any chance to get this into a 0.14 patch release?

@datahub-cyborg datahub-cyborg bot added needs-review Label for PRs that need review from a maintainer. and removed pending-submitter-response Issue/request has been reviewed but requires a response from the submitter labels Dec 2, 2024
@datahub-cyborg datahub-cyborg bot added merge-pending-ci A PR that has passed review and should be merged once CI is green. and removed needs-review Label for PRs that need review from a maintainer. labels Dec 2, 2024
@hsheth2 hsheth2 changed the title fix(ingestion/airflow-plugin): fix AthenaOperator extraction fix(airflow): fix AthenaOperator extraction Dec 4, 2024
@hsheth2 hsheth2 merged commit 49b6284 into datahub-project:master Dec 4, 2024
57 of 85 checks passed
@hsheth2
Collaborator

hsheth2 commented Dec 4, 2024

@steffengr this will likely go into 0.15.0. We will definitely at least cut an rc this week, and might also cut the full release

Labels
community-contribution PR or Issue raised by member(s) of DataHub Community ingestion PR or Issue related to the ingestion of metadata merge-pending-ci A PR that has passed review and should be merged once CI is green.
Development

Successfully merging this pull request may close these issues.

Datahub Airflow plugin throws error: AttributeError: 'AthenaOperator' object has no attribute 'sql'
2 participants