Skip to content

Commit

Permalink
Remove AutomationConditionTester from public API, add evaluate_automa…
Browse files Browse the repository at this point in the history
…tion_conditions
  • Loading branch information
OwenKephart committed Jun 7, 2024
1 parent 2133c50 commit 59f0cae
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 52 deletions.
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@
from dagster._core.definitions.declarative_automation import (
AssetCondition as AssetCondition,
AutomationCondition as AutomationCondition,
AutomationConditionTester as AutomationConditionTester,
evaluate_automation_conditions as evaluate_automation_conditions,
)
from dagster._core.definitions.decorators.asset_check_decorator import (
asset_check as asset_check,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from .automation_condition import AutomationCondition as AutomationCondition
from .automation_condition_tester import AutomationConditionTester as AutomationConditionTester
from .automation_condition_tester import (
evaluate_automation_conditions as evaluate_automation_conditions,
)
from .legacy import RuleCondition as RuleCondition
from .legacy.asset_condition import AssetCondition as AssetCondition
from .operands import (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
from collections import defaultdict
from functools import cached_property
from typing import AbstractSet, Mapping, Optional, Sequence
from typing import AbstractSet, Mapping, Optional, Sequence, Union

import mock
import pendulum
Expand All @@ -11,22 +11,26 @@
from dagster._core.definitions.asset_daemon_cursor import AssetDaemonCursor
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_selection import AssetSelection
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.data_time import CachingDataTimeResolver
from dagster._core.definitions.declarative_automation.automation_condition_evaluator import (
AutomationConditionEvaluator,
)
from dagster._core.definitions.declarative_automation.serialized_objects import (
AssetConditionEvaluation,
)
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.definitions.events import AssetKeyPartitionKey, AssetMaterialization
from dagster._core.instance import DagsterInstance
from dagster._seven import get_current_datetime_in_utc
from dagster._utils.log import create_console_logger


class AutomationConditionTesterResult:
def __init__(self, requested_asset_partitions: AbstractSet[AssetKeyPartitionKey]):
class EvaluateAutomationConditionsResult:
def __init__(
self,
requested_asset_partitions: AbstractSet[AssetKeyPartitionKey],
cursor: AssetDaemonCursor,
):
self._requested_asset_partitions = requested_asset_partitions
self.cursor = cursor

@cached_property
def _requested_partitions_by_asset_key(self) -> Mapping[AssetKey, AbstractSet[Optional[str]]]:
Expand All @@ -49,18 +53,110 @@ def get_num_requested(self, asset_key: AssetKey) -> int:
return len(self.get_requested_partitions(asset_key))


def evaluate_automation_conditions(
defs: Union[Definitions, Sequence[AssetsDefinition]],
instance: DagsterInstance,
asset_selection: AssetSelection = AssetSelection.all(),
evaluation_time: Optional[datetime.datetime] = None,
cursor: Optional[AssetDaemonCursor] = None,
logger: Optional[logging.Logger] = None,
) -> EvaluateAutomationConditionsResult:
"""Evaluates the AutomationConditions of the provided assets, returning the results. Intended
for use in unit tests.
Params:
defs (Union[Definitions, Sequence[AssetsDefinitions]]):
The definitions to evaluate the conditions of.
instance (DagsterInstance):
The instance to evaluate against.
asset_selection (AssetSelection):
The selection of assets within defs to evaluate against. Defaults to AssetSelection.all()
evaluation_time (Optional[datetime.datetime]):
The time to use for the evaluation. Defaults to the true current time.
cursor (Optional[AssetDaemonCursor]):
The cursor for the computation. If you are evaluating multiple ticks within a test, this
value should be supplied from the `cursor` property of the returned `result` object.
logger (Optional[logging.Logger]):
A logger to emit diagnostic information during the evaluation.
Examples:
.. code-block:: python
from dagster import DagsterInstance, evaluate_automation_conditions
from my_proj import defs
def test_my_automation_conditions() -> None:
instance = DagsterInstance.ephemeral()
# asset starts off as missing, expect it to be requested
result = evaluate_automation_conditions(defs, instance)
assert result.total_requested == 1
# don't re-request the same asset
result = evaluate_automation_conditions(defs, instance, cursor=cursor)
assert result.total_requested == 0
from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets
@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()
"""
if not isinstance(defs, Definitions):
defs = Definitions(assets=defs)

asset_graph_view = AssetGraphView.for_test(
defs=defs,
instance=instance,
effective_dt=evaluation_time or get_current_datetime_in_utc(),
last_event_id=instance.event_log_storage.get_maximum_record_id(),
)
asset_graph = defs.get_asset_graph()
data_time_resolver = CachingDataTimeResolver(
asset_graph_view.get_inner_queryer_for_back_compat()
)
evaluator = AutomationConditionEvaluator(
asset_graph=asset_graph,
asset_keys=asset_selection.resolve(asset_graph),
asset_graph_view=asset_graph_view,
logger=logger or logging.getLogger("dagster.automation_condition_tester"),
cursor=cursor or AssetDaemonCursor.empty(),
data_time_resolver=data_time_resolver,
respect_materialization_data_versions=False,
auto_materialize_run_tags={},
)
results, requested_asset_partitions = evaluator.evaluate()
cursor = AssetDaemonCursor(
evaluation_id=0,
last_observe_request_timestamp_by_asset_key={},
previous_evaluation_state=None,
previous_condition_cursors=[result.get_new_cursor() for result in results],
)

return EvaluateAutomationConditionsResult(
cursor=cursor,
requested_asset_partitions=requested_asset_partitions,
)


class AutomationConditionTester:
def __init__(
self,
defs: Definitions,
asset_selection: AssetSelection = AssetSelection.all(),
current_time: Optional[datetime.datetime] = None,
cursor: Optional[AssetDaemonCursor] = None,
):
self._defs = defs
self._asset_selection = asset_selection
self._current_time = current_time or pendulum.now("UTC")
self._instance = DagsterInstance.ephemeral()
self._cursor = AssetDaemonCursor.empty()
self._cursor = cursor or AssetDaemonCursor.empty()
self._logger = create_console_logger("dagster.automation", logging.DEBUG)

def set_current_time(self, dt: datetime.datetime) -> None:
Expand All @@ -75,49 +171,15 @@ def add_materializations(
AssetMaterialization(asset_key=asset_key, partition=partition)
)

def evaluate(self) -> AutomationConditionTesterResult:
def evaluate(self) -> EvaluateAutomationConditionsResult:
"""Evaluates the AutomationConditions of all provided assets."""
asset_graph_view = AssetGraphView.for_test(
result = evaluate_automation_conditions(
defs=self._defs,
instance=self._instance,
effective_dt=self._current_time,
last_event_id=self._instance.event_log_storage.get_maximum_record_id(),
)
asset_graph = self._defs.get_asset_graph()
data_time_resolver = CachingDataTimeResolver(
asset_graph_view.get_inner_queryer_for_back_compat()
)
evaluator = AutomationConditionEvaluator(
asset_graph=asset_graph,
asset_keys=self._asset_selection.resolve(asset_graph),
asset_graph_view=asset_graph_view,
logger=self._logger,
asset_selection=self._asset_selection,
evaluation_time=self._current_time,
cursor=self._cursor,
data_time_resolver=data_time_resolver,
respect_materialization_data_versions=False,
auto_materialize_run_tags={},
)
results, requested_asset_partitions = evaluator.evaluate()
self._cursor = AssetDaemonCursor(
evaluation_id=0,
last_observe_request_timestamp_by_asset_key={},
previous_evaluation_state=None,
previous_condition_cursors=[result.get_new_cursor() for result in results],
)

for result in results:
self._logger.info(f"Evaluation of {result.asset_key}:")
self._log_evaluation(result.serializable_evaluation)

return AutomationConditionTesterResult(requested_asset_partitions)

def _log_evaluation(self, evaluation: AssetConditionEvaluation, depth: int = 1) -> None:
msg = " " * depth
msg += f"{evaluation.condition_snapshot.description} "
msg += f"({evaluation.true_subset.size} true) "
msg += (
f"({(evaluation.end_timestamp or 0) - (evaluation.start_timestamp or 0):.2f} seconds)"
logger=self._logger,
)
self._logger.info(msg)
for child in evaluation.child_evaluations:
self._log_evaluation(child, depth + 1)
self._cursor = result.cursor
return result
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@
from dagster import (
AssetSelection,
AutomationCondition,
AutomationConditionTester,
Definitions,
HourlyPartitionsDefinition,
StaticPartitionsDefinition,
asset,
evaluate_automation_conditions,
)
from dagster._core.definitions.declarative_automation.automation_condition_tester import (
AutomationConditionTester,
)
from dagster._core.instance import DagsterInstance


@asset(
Expand All @@ -34,7 +38,7 @@ def unpartitioned() -> None: ...
defs = Definitions(assets=[hourly, static, unpartitioned])


def test_current_time_manipulation() -> None:
def test_automation_tester_current_time_manipulation() -> None:
tester = AutomationConditionTester(
defs=defs,
asset_selection=AssetSelection.assets(hourly),
Expand All @@ -54,3 +58,34 @@ def test_current_time_manipulation() -> None:
result = tester.evaluate()
assert result.total_requested == 0
assert result.get_requested_partitions(hourly.key) == set()


def test_basic_regular_defs() -> None:
instance = DagsterInstance.ephemeral()

result = evaluate_automation_conditions(
defs=defs,
asset_selection=AssetSelection.assets(unpartitioned),
instance=instance,
)
assert result.total_requested == 1

result = evaluate_automation_conditions(
defs=defs,
asset_selection=AssetSelection.assets(unpartitioned),
instance=instance,
cursor=result.cursor,
)
assert result.total_requested == 0


def test_basic_assets_defs() -> None:
instance = DagsterInstance.ephemeral()

result = evaluate_automation_conditions(defs=[unpartitioned], instance=instance)
assert result.total_requested == 1

result = evaluate_automation_conditions(
defs=[unpartitioned], instance=instance, cursor=result.cursor
)
assert result.total_requested == 0

0 comments on commit 59f0cae

Please sign in to comment.