-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DA][testing] RFC: Create AutomationConditionTester class #22292
Merged
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
3 changes: 3 additions & 0 deletions
3
python_modules/dagster/dagster/_core/definitions/declarative_automation/__init__.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
137 changes: 137 additions & 0 deletions
137
...s/dagster/dagster/_core/definitions/declarative_automation/automation_condition_tester.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
import datetime | ||
import logging | ||
from collections import defaultdict | ||
from functools import cached_property | ||
from typing import AbstractSet, Mapping, Optional, Sequence, Union | ||
|
||
from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView | ||
from dagster._core.definitions.asset_daemon_cursor import AssetDaemonCursor | ||
from dagster._core.definitions.asset_key import AssetKey | ||
from dagster._core.definitions.asset_selection import AssetSelection | ||
from dagster._core.definitions.assets import AssetsDefinition | ||
from dagster._core.definitions.data_time import CachingDataTimeResolver | ||
from dagster._core.definitions.declarative_automation.automation_condition_evaluator import ( | ||
AutomationConditionEvaluator, | ||
) | ||
from dagster._core.definitions.definitions_class import Definitions | ||
from dagster._core.definitions.events import AssetKeyPartitionKey | ||
from dagster._core.instance import DagsterInstance | ||
from dagster._seven import get_current_datetime_in_utc | ||
|
||
|
||
class EvaluateAutomationConditionsResult:
    """Outcome of a single call to `evaluate_automation_conditions`.

    Exposes the asset partitions that were requested during the evaluation, plus the
    `cursor` that should be handed to the next evaluation when simulating several ticks.
    """

    def __init__(
        self,
        requested_asset_partitions: AbstractSet[AssetKeyPartitionKey],
        cursor: AssetDaemonCursor,
    ):
        self._requested_asset_partitions = requested_asset_partitions
        # Public on purpose: callers pass this into the next evaluation.
        self.cursor = cursor

    @cached_property
    def _requested_partitions_by_asset_key(self) -> Mapping[AssetKey, AbstractSet[Optional[str]]]:
        """Groups the requested partition keys by asset key (computed once, then cached)."""
        grouped = defaultdict(set)
        for asset_partition in self._requested_asset_partitions:
            grouped[asset_partition.asset_key].add(asset_partition.partition_key)
        # Freeze into a plain dict: a defaultdict would insert an empty entry into the
        # cached mapping every time an unrequested asset key is looked up.
        return dict(grouped)

    @property
    def total_requested(self) -> int:
        """Returns the total number of asset partitions requested during this evaluation."""
        return len(self._requested_asset_partitions)

    def get_requested_partitions(self, asset_key: AssetKey) -> AbstractSet[Optional[str]]:
        """Returns the specific partition keys requested for the given asset during this evaluation.

        An asset for which nothing was requested yields an empty set.
        """
        return self._requested_partitions_by_asset_key.get(asset_key, set())

    def get_num_requested(self, asset_key: AssetKey) -> int:
        """Returns the number of asset partitions requested for the given asset during this evaluation."""
        return len(self.get_requested_partitions(asset_key))
|
||
|
||
def evaluate_automation_conditions(
    defs: Union[Definitions, Sequence[AssetsDefinition]],
    instance: DagsterInstance,
    asset_selection: AssetSelection = AssetSelection.all(),
    evaluation_time: Optional[datetime.datetime] = None,
    cursor: Optional[AssetDaemonCursor] = None,
) -> EvaluateAutomationConditionsResult:
    """Evaluates the AutomationConditions of the provided assets, returning the results. Intended
    for use in unit tests.

    Args:
        defs (Union[Definitions, Sequence[AssetsDefinition]]):
            The definitions to evaluate the conditions of.
        instance (DagsterInstance):
            The instance to evaluate against.
        asset_selection (AssetSelection):
            The selection of assets within defs to evaluate against. Defaults to
            AssetSelection.all()
        evaluation_time (Optional[datetime.datetime]):
            The time to use for the evaluation. Defaults to the true current time.
        cursor (Optional[AssetDaemonCursor]):
            The cursor for the computation. If you are evaluating multiple ticks within a test,
            this value should be supplied from the `cursor` property of the returned `result`
            object.

    Returns:
        EvaluateAutomationConditionsResult: The requested asset partitions, along with the
        cursor to pass into the next evaluation.

    Examples:
        .. code-block:: python

            from dagster import DagsterInstance, evaluate_automation_conditions

            from my_proj import defs

            def test_my_automation_conditions() -> None:
                instance = DagsterInstance.ephemeral()

                # asset starts off as missing, expect it to be requested
                result = evaluate_automation_conditions(defs, instance)
                assert result.total_requested == 1

                # don't re-request the same asset
                result = evaluate_automation_conditions(defs, instance, cursor=result.cursor)
                assert result.total_requested == 0
    """
    # A bare sequence of assets is wrapped so the rest of the function only deals with
    # a Definitions object.
    if not isinstance(defs, Definitions):
        defs = Definitions(assets=defs)

    effective_dt = get_current_datetime_in_utc() if evaluation_time is None else evaluation_time
    asset_graph_view = AssetGraphView.for_test(
        defs=defs,
        instance=instance,
        effective_dt=effective_dt,
        last_event_id=instance.event_log_storage.get_maximum_record_id(),
    )
    asset_graph = defs.get_asset_graph()
    data_time_resolver = CachingDataTimeResolver(
        asset_graph_view.get_inner_queryer_for_back_compat()
    )
    evaluator = AutomationConditionEvaluator(
        asset_graph=asset_graph,
        asset_keys=asset_selection.resolve(asset_graph),
        asset_graph_view=asset_graph_view,
        logger=logging.getLogger("dagster.automation_condition_tester"),
        # With no cursor supplied, start from an empty one.
        cursor=AssetDaemonCursor.empty() if cursor is None else cursor,
        data_time_resolver=data_time_resolver,
        respect_materialization_data_versions=False,
        auto_materialize_run_tags={},
    )
    results, requested_asset_partitions = evaluator.evaluate()

    # The returned cursor carries only the per-condition cursors produced by this
    # evaluation; the evaluation id is fixed at 0 and no observe timestamps are recorded.
    new_cursor = AssetDaemonCursor(
        evaluation_id=0,
        last_observe_request_timestamp_by_asset_key={},
        previous_evaluation_state=None,
        previous_condition_cursors=[result.get_new_cursor() for result in results],
    )

    return EvaluateAutomationConditionsResult(
        cursor=new_cursor,
        requested_asset_partitions=requested_asset_partitions,
    )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
64 changes: 64 additions & 0 deletions
64
...agster_tests/definitions_tests/auto_materialize_tests/test_automation_condition_tester.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
from dagster import ( | ||
AssetSelection, | ||
AutomationCondition, | ||
Definitions, | ||
HourlyPartitionsDefinition, | ||
StaticPartitionsDefinition, | ||
asset, | ||
evaluate_automation_conditions, | ||
) | ||
from dagster._core.instance import DagsterInstance | ||
|
||
|
||
# Hourly-partitioned fixture asset (partitions start at 2020-01-01-00:00) using the
# eager automation condition. The body is intentionally empty.
@asset(
    auto_materialize_policy=AutomationCondition.eager().as_auto_materialize_policy(),
    partitions_def=HourlyPartitionsDefinition("2020-01-01-00:00"),
)
def hourly() -> None:
    pass
|
||
|
||
# Statically-partitioned fixture asset (partitions "a", "b", "c") using the eager
# automation condition. The body is intentionally empty.
@asset(
    auto_materialize_policy=AutomationCondition.eager().as_auto_materialize_policy(),
    partitions_def=StaticPartitionsDefinition(["a", "b", "c"]),
)
def static() -> None:
    pass
|
||
|
||
# Unpartitioned fixture asset using the eager automation condition; this is the asset
# the tests below evaluate. The body is intentionally empty.
@asset(auto_materialize_policy=AutomationCondition.eager().as_auto_materialize_policy())
def unpartitioned() -> None:
    pass
|
||
|
||
# Definitions object bundling all three fixture assets.
defs = Definitions(
    assets=[hourly, static, unpartitioned],
)
|
||
|
||
def test_basic_regular_defs() -> None:
    """Evaluates `unpartitioned` out of a full Definitions object across two ticks.

    The first tick should request the asset; the second, given the first tick's cursor,
    should request nothing.
    """
    ephemeral_instance = DagsterInstance.ephemeral()
    only_unpartitioned = AssetSelection.assets(unpartitioned)

    first_tick = evaluate_automation_conditions(
        defs=defs,
        asset_selection=only_unpartitioned,
        instance=ephemeral_instance,
    )
    assert first_tick.total_requested == 1

    second_tick = evaluate_automation_conditions(
        defs=defs,
        asset_selection=only_unpartitioned,
        instance=ephemeral_instance,
        cursor=first_tick.cursor,
    )
    assert second_tick.total_requested == 0
|
||
|
||
def test_basic_assets_defs() -> None:
    """Same two-tick scenario, but passing a plain list of assets instead of Definitions."""
    ephemeral_instance = DagsterInstance.ephemeral()
    assets = [unpartitioned]

    first_tick = evaluate_automation_conditions(defs=assets, instance=ephemeral_instance)
    assert first_tick.total_requested == 1

    second_tick = evaluate_automation_conditions(
        defs=assets,
        instance=ephemeral_instance,
        cursor=first_tick.cursor,
    )
    assert second_tick.total_requested == 0
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Definitely should be a goal to have this return a range representation soon. cc: @smackesey