
Bigquery IO Manager altering table name when dealing with AssetOut from multi_asset #25990

Open
sarora-roivant opened this issue Nov 18, 2024 · 0 comments
Labels
area: io-manager Related to I/O Managers type: bug Something isn't working

Comments

sarora-roivant commented Nov 18, 2024

What's the issue?

The BigQuery IO manager is somehow altering the table name away from the asset key when materializing one of the outputs of a multi_asset: the expected destination is a table named after the asset key (relevant_mentions), but the failing load targets relevant_mentions_4a20d45f_1fda_48ca_b43c_4b1ca6be3acf_source. The following is the error:

dagster._core.errors.DagsterExecutionHandleOutputError: Error occurred while handling output "relevant_mentions" of step "materialized_patient_chunks_bq":

  File "/app/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_plan.py", line 245, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/app/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 506, in core_dagster_event_sequence_for_step
    for evt in _type_check_and_store_output(step_context, user_event):
  File "/app/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 553, in _type_check_and_store_output
    for evt in _store_output(step_context, step_output_handle, output):
  File "/app/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 758, in _store_output
    for elt in iterate_with_context(
  File "/app/.venv/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 484, in iterate_with_context
    return
  File "/root/.local/share/uv/python/cpython-3.9.20-linux-aarch64-gnu/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/app/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 84, in op_execution_error_boundary
    raise error_cls(

The above exception was caused by the following exception:
google.api_core.exceptions.BadRequest: 400 Table relevant_mentions_4a20d45f_1fda_48ca_b43c_4b1ca6be3acf_source does not have a schema.; reason: invalid, message: Table relevant_mentions_4a20d45f_1fda_48ca_b43c_4b1ca6be3acf_source does not have a schema.

  File "/app/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/app/.venv/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 482, in iterate_with_context
    next_output = next(iterator)
  File "/app/.venv/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 748, in _gen_fn
    gen_output = output_manager.handle_output(output_context, output.value)
  File "/app/.venv/lib/python3.9/site-packages/dagster/_core/storage/db_io_manager.py", line 159, in handle_output
    handler_metadata = self._handlers_by_type[obj_type].handle_output(
  File "/app/.venv/lib/python3.9/site-packages/dagster_gcp_pandas/bigquery/bigquery_pandas_type_handler.py", line 57, in handle_output
    job.result()
  File "/app/.venv/lib/python3.9/site-packages/google/cloud/bigquery/job/base.py", line 969, in result
    return super(_AsyncJob, self).result(timeout=timeout, **kwargs)
  File "/app/.venv/lib/python3.9/site-packages/google/api_core/future/polling.py", line 261, in result
    raise self._exception
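
For reference, the failing call in dagster_gcp_pandas boils down to a BigQuery load job against the destination table, and the "_source"-suffixed name in the error (the asset key plus what looks like a UUID and "_source") suggests a temporary staging table. Below is a minimal sketch of that write path using the public google-cloud-bigquery API; the project, dataset, and DataFrame contents are placeholders, not the exact handler code:

import pandas as pd
from google.cloud import bigquery

client = bigquery.Client(project="my-project")  # placeholder project id

df = pd.DataFrame({"id": ["a"], "text": ["example"]})  # placeholder output

# The type handler loads the output DataFrame into the destination table and
# blocks on job.result(), which is where the 400 BadRequest surfaces.
job = client.load_table_from_dataframe(
    df,
    "my-project.my_dataset.relevant_mentions",  # expected destination table
    job_config=bigquery.LoadJobConfig(write_disposition="WRITE_APPEND"),
)
job.result()  # raises google.api_core.exceptions.BadRequest on failure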

The following is my multi_asset definition:

from datetime import datetime
from typing import List

import pandas as pd
from dagster import AssetIn, AssetOut, multi_asset

# ProcessedChunk, generate_uuid, and multi_partitions_def are defined
# elsewhere in the project.


@multi_asset(
    partitions_def=multi_partitions_def,
    ins={"processed_patient_chunks": AssetIn()},
    outs={
        "processed_chunks": AssetOut(
            key="processed_chunks",
            io_manager_key="bq_io_manager",
            metadata={
                "partition_expr": {
                    "dataownerid": "CAST(dataownerid AS STRING)",
                    "delivery_week": "CAST(delivery_week AS STRING)"
                },
                "write_disposition": "WRITE_APPEND"
            }
        ),
        "condition_evidences": AssetOut(
            key="condition_evidences",
            io_manager_key="bq_io_manager",
            metadata={
                "partition_expr": {
                    "dataownerid": "CAST(dataownerid AS STRING)",
                    "delivery_week": "CAST(delivery_week AS STRING)"
                },
                "write_disposition": "WRITE_APPEND"
            }
        ),
        "relevant_mentions": AssetOut(
            key="relevant_mentions",
            io_manager_key="bq_io_manager",
            metadata={
                "table": "relevant_mentions",
                "partition_expr": {
                    "dataownerid": "CAST(dataownerid AS STRING)",
                    "delivery_week": "CAST(delivery_week AS STRING)"
                },
                "write_disposition": "WRITE_APPEND"
            }
        )
    }
)
def materialized_patient_chunks_bq(context, processed_patient_chunks: List[ProcessedChunk]):
    # Initialize empty lists to store records
    chunks_records = []
    evidences_records = []
    mentions_records = []
    
    # Process each chunk and create linked records
    for chunk in processed_patient_chunks:
        chunk_id = generate_uuid()
        
        # Create processed chunk record
        chunk_record = {
            'id': chunk_id,
            'chunk_id': chunk.chunk_id,
            'patient_id': chunk.patient_id,
            'note_id': chunk.note_id,
            'chunk_number': chunk.chunk_number,
            'text': chunk.text,
            'assessment_type': None,  # Optional field
            'run_id': context.run_id,  # Run id of the current Dagster run
            'created_at': datetime.now().astimezone().strftime('%Y-%m-%d %H:%M:%S'),
            'updated_at': datetime.now().astimezone().strftime('%Y-%m-%d %H:%M:%S'),
            'deleted': False, 
            'dataownerid': str(context.partition_key.keys_by_dimension["dataownerid"]),
            'delivery_week': str(context.partition_key.keys_by_dimension["delivery_week"])
        }
        chunks_records.append(chunk_record)
        
        # Process evidences for each chunk
        for evidence in chunk.evidences:
            evidence_id = generate_uuid()
            
            # Create evidence record
            evidence_record = {
                'id': evidence_id,
                'processed_chunk_id': chunk_id,  # Link to parent chunk
                'run_id': context.run_id,  # Run id of the current Dagster run
                'condition': evidence.condition,
                'assessment': evidence.assessment,
                'created_at': datetime.now().astimezone().strftime('%Y-%m-%d %H:%M:%S'),
                'updated_at': datetime.now().astimezone().strftime('%Y-%m-%d %H:%M:%S'),
                'deleted': False, 
                'dataownerid': str(context.partition_key.keys_by_dimension["dataownerid"]),
                'delivery_week': str(context.partition_key.keys_by_dimension["delivery_week"])
            }
            evidences_records.append(evidence_record)
            
            # Process mentions for each evidence
            for mention in evidence.relevant_mentions:
                mention_record = {
                    'id': generate_uuid(),
                    'condition_evidence_id': evidence_id,  # Link to parent evidence
                    'patient_assessment_id': None,  # Will be populated by downstream process
                    'run_id': context.run_id,  
                    'text': mention.text,
                    'condition': mention.condition,
                    'reasoning': mention.reasoning,
                    'relevance_score': mention.relevance_score.model_dump_json(),
                    'start': mention.start,
                    'end': mention.end,
                    'created_at': datetime.now().astimezone().strftime('%Y-%m-%d %H:%M:%S'),
                    'updated_at': datetime.now().astimezone().strftime('%Y-%m-%d %H:%M:%S'),
                    'deleted': False, 
                    'dataownerid': str(context.partition_key.keys_by_dimension["dataownerid"]),
                    'delivery_week': str(context.partition_key.keys_by_dimension["delivery_week"])
                }
                mentions_records.append(mention_record)
    
    # Create DataFrames
    processed_chunks = pd.DataFrame(chunks_records)
    condition_evidences = pd.DataFrame(evidences_records)
    relevant_mentions = pd.DataFrame(mentions_records)
    
    return processed_chunks, condition_evidences, relevant_mentions

The other two assets materialize fine; only this one fails.
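
For context, the bq_io_manager resource is wired along these lines (a sketch assuming dagster-gcp-pandas's BigQueryPandasIOManager; the project and dataset values are placeholders, the real ones come from deployment config):

from dagster import Definitions
from dagster_gcp_pandas import BigQueryPandasIOManager

defs = Definitions(
    assets=[materialized_patient_chunks_bq],
    resources={
        # Placeholder GCP project and dataset names.
        "bq_io_manager": BigQueryPandasIOManager(
            project="my-gcp-project",
            dataset="my_dataset",
        ),
    },
)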

What did you expect to happen?

No response

How to reproduce?

No response

Dagster version

1.6.6

Deployment type

Dagster Helm chart

Deployment details

Running the Dagster Helm chart on Docker Desktop with the k8s executor.

Additional information

There is also an asset sensor defined that listens for materializations of the "relevant_mentions" asset.
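
That sensor is roughly the following (a sketch using Dagster's asset_sensor decorator; downstream_job is a hypothetical stand-in for the real target job):

from dagster import AssetKey, RunRequest, asset_sensor, define_asset_job

# Hypothetical downstream job; the real target is not shown in this issue.
downstream_job = define_asset_job("downstream_job", selection="*")

@asset_sensor(asset_key=AssetKey("relevant_mentions"), job=downstream_job)
def relevant_mentions_sensor(context, asset_event):
    # Fires once per new materialization event for relevant_mentions.
    yield RunRequest(run_key=context.cursor)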

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.
By submitting this issue, you agree to follow Dagster's Code of Conduct.

@sarora-roivant sarora-roivant added the type: bug Something isn't working label Nov 18, 2024
@garethbrickman garethbrickman added the area: io-manager Related to I/O Managers label Nov 18, 2024