Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YAML Constructor Error when adding import_schema_path parameter to pipeline #575

Closed
AlbertSusanto opened this issue Aug 22, 2023 · 4 comments
Assignees
Labels
bug Something isn't working

Comments

@AlbertSusanto
Copy link

AlbertSusanto commented Aug 22, 2023

Background
Hi there!
Following this documentation about adding schema when creating a new pipeline, I found this error.

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/dlt/pipeline/pipeline.py", line 323, in normalize
    runner.run_pool(normalize.config, normalize)
  File "/usr/local/lib/python3.10/site-packages/dlt/common/runners/pool_runner.py", line 48, in run_pool
    while _run_func():
  File "/usr/local/lib/python3.10/site-packages/dlt/common/runners/pool_runner.py", line 41, in _run_func
    run_metrics = run_f.run(cast(TPool, pool))
  File "/usr/local/lib/python3.10/site-packages/dlt/normalize/normalize.py", line 313, in run
    self.spool_schema_files(load_id, schema_name, schema_files)
  File "/usr/local/lib/python3.10/site-packages/dlt/normalize/normalize.py", line 286, in spool_schema_files
    self.spool_files(schema_name, load_id, map_parallel_f, files)
  File "/usr/local/lib/python3.10/site-packages/dlt/normalize/normalize.py", line 252, in spool_files
    schema = Normalize.load_or_create_schema(self.schema_storage, schema_name)
  File "/usr/local/lib/python3.10/site-packages/dlt/normalize/normalize.py", line 58, in load_or_create_schema
    schema = schema_storage.load_schema(schema_name)
  File "/usr/local/lib/python3.10/site-packages/dlt/common/storages/live_schema_storage.py", line 28, in load_schema
    return super().load_schema(name)
  File "/usr/local/lib/python3.10/site-packages/dlt/common/storages/schema_storage.py", line 40, in load_schema
    return self._maybe_import_schema(name, storage_schema)
  File "/usr/local/lib/python3.10/site-packages/dlt/common/storages/schema_storage.py", line 93, in _maybe_import_schema
    imported_schema = self._load_import_schema(name)
  File "/usr/local/lib/python3.10/site-packages/dlt/common/storages/schema_storage.py", line 127, in _load_import_schema
    return self._parse_schema_str(import_storage.load(schema_file), self.config.external_schema_format)
  File "/usr/local/lib/python3.10/site-packages/dlt/common/storages/schema_storage.py", line 165, in _parse_schema_str
    imported_schema = yaml.safe_load(schema_str)
  File "/usr/local/lib/python3.10/site-packages/yaml/__init__.py", line 125, in safe_load
    return load(stream, SafeLoader)
  File "/usr/local/lib/python3.10/site-packages/yaml/__init__.py", line 81, in load
    return loader.get_single_data()
  File "/usr/local/lib/python3.10/site-packages/yaml/constructor.py", line 51, in get_single_data
    return self.construct_document(node)
  File "/usr/local/lib/python3.10/site-packages/yaml/constructor.py", line 60, in construct_document
    for dummy in generator:
  File "/usr/local/lib/python3.10/site-packages/yaml/constructor.py", line 413, in construct_yaml_map
    value = self.construct_mapping(node)
  File "/usr/local/lib/python3.10/site-packages/yaml/constructor.py", line 218, in construct_mapping
    return super().construct_mapping(node, deep=deep)
  File "/usr/local/lib/python3.10/site-packages/yaml/constructor.py", line 143, in construct_mapping
    value = self.construct_object(value_node, deep=deep)
  File "/usr/local/lib/python3.10/site-packages/yaml/constructor.py", line 100, in construct_object
    data = constructor(self, node)
  File "/usr/local/lib/python3.10/site-packages/yaml/constructor.py", line 427, in construct_undefined
    raise ConstructorError(None, None,
yaml.constructor.ConstructorError: could not determine a constructor for the tag 'tag:yaml.org,2002:python/object/apply:sqlalchemy.sql.elements.quoted_name'
  in "<unicode string>", line 53, column 15:
        resource: !!python/object/apply:sqlalchemy ... 

Problem
Currently i can conclude that this line of code imported_schema = yaml.safe_load(schema_str) in the dlt/common/storages/schema_storage.py is causing the problem when constructing the yaml tag here (!!python/object/apply). This also support the fact that PyYaml cannot construct the tag using SafeLoader as per in this issue. I may be wrong about this but let me know!

Steps to reproduce from my device

  • Create an Airflow PythonOperator DAG
  • Create folder schemas/export and schemas/import in the project
  • Add import_schema_path and export_schema_path parameter to the pipeline function
  • Execute the pipeline

Note

  • Package version: 0.3.8
@AlbertSusanto AlbertSusanto changed the title YAML Constructor Error when adding import/export_schema_path parameter to pipeline YAML Constructor Error when adding import_schema_path parameter to pipeline Aug 22, 2023
@rudolfix rudolfix added the bug Something isn't working label Aug 23, 2023
@rudolfix rudolfix moved this from Todo to Planned in dlt core library Aug 23, 2023
@sh-rp
Copy link
Collaborator

sh-rp commented Aug 23, 2023

Hey @AlbertSusanto, thanks for reporting this! can you provide us with a repo or a code snippet with the most basic setup that produces this error? Then I can have a look and find out what is going on there.

@rudolfix rudolfix moved this from Planned to In Progress in dlt core library Aug 23, 2023
@AlbertSusanto
Copy link
Author

Sorry for the late reply. This is my code to replicate.

pipeline = dlt.pipeline(
        pipeline_name="test-pipeline", 
        destination='bigquery', 
        dataset_name="test", 
        import_schema_path="schema/import", 
        export_schema_path="schema/export"
    )
credentials = ConnectionStringCredentials(
        "postgresql://postgres:postgres@localhost:5432/test"
    )
source_3: List[DltResource] = []
test_table = sql_table(credentials=credentials ,table="test", columns=['column1','column2','column3'])
source_3.append(test_table)
info = pipeline.run(source_3, write_disposition="append", table_name="test")
print(info)

I modified the sql_table function to become like this:

def sql_table(
    credentials: Union[ConnectionStringCredentials, Engine, str] = dlt.secrets.value,
    table: str = dlt.config.value,
    schema: Optional[str] = dlt.config.value,
    metadata: Optional[MetaData] = None,
    columns: Optional[list] = None,
    incremental: Optional[dlt.sources.incremental[Any]] = None,
) -> DltResource:
    """
    A dlt resource which loads data from an SQL database table using SQLAlchemy.

    Args:
        credentials (Union[ConnectionStringCredentials, Engine, str]): Database credentials or an `Engine` instance representing the database connection.
        table (str): Name of the table to load.
        schema (Optional[str]): Optional name of the schema the table belongs to.
        metadata (Optional[MetaData]): Optional `sqlalchemy.MetaData` instance. If provided, the `schema` argument is ignored.
        incremental (Optional[dlt.sources.incremental[Any]]): Option to enable incremental loading for the table.
            E.g., `incremental=dlt.sources.incremental('updated_at', pendulum.parse('2022-01-01T00:00:00Z'))`
        write_disposition (str): Write disposition of the resource.

    Returns:
        DltResource: The dlt resource for loading data from the SQL database table.
    """

    engine = engine_from_credentials(credentials)
    engine.execution_options(stream_results=True)
    metadata = metadata or MetaData(schema=schema)
    print(credentials)
    print(engine)
    print(schema)

    table_obj = Table(table, metadata, autoload_with=engine, include_columns=columns)

    return dlt.resource(
        table_rows, name=table_obj.name, primary_key=get_primary_key(table_obj)
    )(engine, table_obj, incremental=incremental)

Hope this helps, and let me know if the bug does exist or its just my local machine. Thanks!

@sh-rp
Copy link
Collaborator

sh-rp commented Sep 5, 2023

Hey @AlbertSusanto, no worries. Does this error also happen if you run the pipeline outside of airflow from your terminal/console? I tried to reproduce this, but the error does not happen for me. For reference, this is my code:

@dlt.common.configuration.with_config(
    sections=("sources", "sql_database"), spec=SqlTableResourceConfiguration
)
def sql_table(
    credentials: Union[ConnectionStringCredentials, Engine, str] = dlt.secrets.value,
    table: str = dlt.config.value,
    schema: Optional[str] = dlt.config.value,
    metadata: Optional[MetaData] = None,
    columns: Optional[list] = None,
    incremental: Optional[dlt.sources.incremental[Any]] = None,
) -> DltResource:
    """
    A dlt resource which loads data from an SQL database table using SQLAlchemy.

    Args:
        credentials (Union[ConnectionStringCredentials, Engine, str]): Database credentials or an `Engine` instance representing the database connection.
        table (str): Name of the table to load.
        schema (Optional[str]): Optional name of the schema the table belongs to.
        metadata (Optional[MetaData]): Optional `sqlalchemy.MetaData` instance. If provided, the `schema` argument is ignored.
        incremental (Optional[dlt.sources.incremental[Any]]): Option to enable incremental loading for the table.
            E.g., `incremental=dlt.sources.incremental('updated_at', pendulum.parse('2022-01-01T00:00:00Z'))`
        write_disposition (str): Write disposition of the resource.

    Returns:
        DltResource: The dlt resource for loading data from the SQL database table.
    """

    engine = engine_from_credentials(credentials)
    engine.execution_options(stream_results=True)
    metadata = metadata or MetaData(schema=schema)
    print(credentials)
    print(engine)
    print(schema)

    table_obj = Table(table, metadata, autoload_with=engine, include_columns=columns)

    return dlt.resource(
        table_rows, name=table_obj.name, primary_key=get_primary_key(table_obj)
    )(engine, table_obj, incremental=incremental)

def load_select_tables_from_database() -> None:
    """Use the sql_database source to reflect an entire database schema and load select tables from it.

    This example sources data from the public Rfam MySQL database.
    """
    # Create a pipeline
    pipeline = dlt.pipeline(
        import_schema_path="schemas/import",
        export_schema_path="schemas/export",
        pipeline_name="rfam",
        destination='bigquery', dataset_name="rfam_data"
    )

    # Credentials for the sample database.
    # Note: It is recommended to configure credentials in `.dlt/secrets.toml` under `sources.sql_database.credentials`
    credentials = ConnectionStringCredentials(
        "mysql+pymysql://[email protected]:4497/Rfam"
    )

    source_3 = []
    test_table = sql_table(credentials=credentials, table="clan", columns=["description", "clan_acc"])
    source_3.append(test_table)
    info = pipeline.run(source_3, write_disposition="append", table_name="test")
    print(info)

    # Run the pipeline. The merge write disposition merges existing rows in the destination by primary key
    info = pipeline.run(source_3, write_disposition="merge")
    print(info)


if __name__ == "__main__":
    load_select_tables_from_database()



@rudolfix
Copy link
Collaborator

rudolfix commented Nov 8, 2023

we were not able to reproduce the issue

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
Status: Done
Development

No branches or pull requests

3 participants