Skip to content

Commit

Permalink
Remove AutomationConditionTester from public API, add evaluate_automation_conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Jun 6, 2024
1 parent 2133c50 commit 18549a4
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 51 deletions.
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@
from dagster._core.definitions.declarative_automation import (
AssetCondition as AssetCondition,
AutomationCondition as AutomationCondition,
AutomationConditionTester as AutomationConditionTester,
evaluate_automation_conditions as evaluate_automation_conditions,
)
from dagster._core.definitions.decorators.asset_check_decorator import (
asset_check as asset_check,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from .automation_condition import AutomationCondition as AutomationCondition
from .automation_condition_tester import AutomationConditionTester as AutomationConditionTester
from .automation_condition_tester import (
evaluate_automation_conditions as evaluate_automation_conditions,
)
from .legacy import RuleCondition as RuleCondition
from .legacy.asset_condition import AssetCondition as AssetCondition
from .operands import (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,21 @@
from dagster._core.definitions.declarative_automation.automation_condition_evaluator import (
AutomationConditionEvaluator,
)
from dagster._core.definitions.declarative_automation.serialized_objects import (
AssetConditionEvaluation,
)
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.definitions.events import AssetKeyPartitionKey, AssetMaterialization
from dagster._core.instance import DagsterInstance
from dagster._seven import get_current_datetime_in_utc
from dagster._utils.log import create_console_logger


class AutomationConditionTesterResult:
def __init__(self, requested_asset_partitions: AbstractSet[AssetKeyPartitionKey]):
class EvaluateAutomationConditionsResult:
def __init__(
self,
requested_asset_partitions: AbstractSet[AssetKeyPartitionKey],
cursor: AssetDaemonCursor,
):
self._requested_asset_partitions = requested_asset_partitions
self.cursor = cursor

@cached_property
def _requested_partitions_by_asset_key(self) -> Mapping[AssetKey, AbstractSet[Optional[str]]]:
Expand All @@ -49,18 +52,61 @@ def get_num_requested(self, asset_key: AssetKey) -> int:
return len(self.get_requested_partitions(asset_key))


def evaluate_automation_conditions(
    defs: Definitions,
    instance: DagsterInstance,
    asset_selection: AssetSelection = AssetSelection.all(),
    evaluation_time: Optional[datetime.datetime] = None,
    cursor: Optional[AssetDaemonCursor] = None,
    logger: Optional[logging.Logger] = None,
) -> EvaluateAutomationConditionsResult:
    """Run one evaluation pass over the automation conditions of the selected assets.

    Args:
        defs: Definitions whose asset graph is evaluated.
        instance: Instance whose event log storage supplies the maximum record id
            used as the last event id of the evaluation.
        asset_selection: Assets to evaluate, resolved against the asset graph of
            ``defs``. Defaults to ``AssetSelection.all()``.
        evaluation_time: Effective time of the evaluation. When not provided, the
            current UTC time is used.
        cursor: Cursor handed to the evaluator. When not provided, an empty
            cursor is used.
        logger: Logger handed to the evaluator. When not provided, the
            ``dagster.automation_condition_tester`` logger is used.

    Returns:
        The requested asset partitions, together with a new cursor assembled from
        the per-result cursors produced by this evaluation.
    """
    graph_view = AssetGraphView.for_test(
        defs=defs,
        instance=instance,
        effective_dt=evaluation_time or get_current_datetime_in_utc(),
        last_event_id=instance.event_log_storage.get_maximum_record_id(),
    )
    graph = defs.get_asset_graph()
    resolver = CachingDataTimeResolver(graph_view.get_inner_queryer_for_back_compat())

    # Resolve the optional inputs up front so the evaluator call stays compact.
    selected_keys = asset_selection.resolve(graph)
    effective_logger = logger or logging.getLogger("dagster.automation_condition_tester")
    starting_cursor = cursor or AssetDaemonCursor.empty()

    condition_results, requested = AutomationConditionEvaluator(
        asset_graph=graph,
        asset_keys=selected_keys,
        asset_graph_view=graph_view,
        logger=effective_logger,
        cursor=starting_cursor,
        data_time_resolver=resolver,
        respect_materialization_data_versions=False,
        auto_materialize_run_tags={},
    ).evaluate()

    # Only the per-condition cursors are carried forward; every other field of
    # the new cursor is set to a fixed value.
    next_cursor = AssetDaemonCursor(
        evaluation_id=0,
        last_observe_request_timestamp_by_asset_key={},
        previous_evaluation_state=None,
        previous_condition_cursors=[item.get_new_cursor() for item in condition_results],
    )
    return EvaluateAutomationConditionsResult(
        requested_asset_partitions=requested,
        cursor=next_cursor,
    )


class AutomationConditionTester:
def __init__(
self,
defs: Definitions,
asset_selection: AssetSelection = AssetSelection.all(),
current_time: Optional[datetime.datetime] = None,
cursor: Optional[AssetDaemonCursor] = None,
):
self._defs = defs
self._asset_selection = asset_selection
self._current_time = current_time or pendulum.now("UTC")
self._instance = DagsterInstance.ephemeral()
self._cursor = AssetDaemonCursor.empty()
self._cursor = cursor or AssetDaemonCursor.empty()
self._logger = create_console_logger("dagster.automation", logging.DEBUG)

def set_current_time(self, dt: datetime.datetime) -> None:
Expand All @@ -75,49 +121,15 @@ def add_materializations(
AssetMaterialization(asset_key=asset_key, partition=partition)
)

def evaluate(self) -> AutomationConditionTesterResult:
def evaluate(self) -> EvaluateAutomationConditionsResult:
"""Evaluates the AutomationConditions of all provided assets."""
asset_graph_view = AssetGraphView.for_test(
result = evaluate_automation_conditions(
defs=self._defs,
instance=self._instance,
effective_dt=self._current_time,
last_event_id=self._instance.event_log_storage.get_maximum_record_id(),
)
asset_graph = self._defs.get_asset_graph()
data_time_resolver = CachingDataTimeResolver(
asset_graph_view.get_inner_queryer_for_back_compat()
)
evaluator = AutomationConditionEvaluator(
asset_graph=asset_graph,
asset_keys=self._asset_selection.resolve(asset_graph),
asset_graph_view=asset_graph_view,
logger=self._logger,
asset_selection=self._asset_selection,
evaluation_time=self._current_time,
cursor=self._cursor,
data_time_resolver=data_time_resolver,
respect_materialization_data_versions=False,
auto_materialize_run_tags={},
)
results, requested_asset_partitions = evaluator.evaluate()
self._cursor = AssetDaemonCursor(
evaluation_id=0,
last_observe_request_timestamp_by_asset_key={},
previous_evaluation_state=None,
previous_condition_cursors=[result.get_new_cursor() for result in results],
)

for result in results:
self._logger.info(f"Evaluation of {result.asset_key}:")
self._log_evaluation(result.serializable_evaluation)

return AutomationConditionTesterResult(requested_asset_partitions)

def _log_evaluation(self, evaluation: AssetConditionEvaluation, depth: int = 1) -> None:
msg = " " * depth
msg += f"{evaluation.condition_snapshot.description} "
msg += f"({evaluation.true_subset.size} true) "
msg += (
f"({(evaluation.end_timestamp or 0) - (evaluation.start_timestamp or 0):.2f} seconds)"
logger=self._logger,
)
self._logger.info(msg)
for child in evaluation.child_evaluations:
self._log_evaluation(child, depth + 1)
self._cursor = result.cursor
return result
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@
from dagster import (
AssetSelection,
AutomationCondition,
AutomationConditionTester,
Definitions,
HourlyPartitionsDefinition,
StaticPartitionsDefinition,
asset,
evaluate_automation_conditions,
)
from dagster._core.definitions.declarative_automation.automation_condition_tester import (
AutomationConditionTester,
)
from dagster._core.instance import DagsterInstance


@asset(
Expand All @@ -34,7 +38,7 @@ def unpartitioned() -> None: ...
defs = Definitions(assets=[hourly, static, unpartitioned])


def test_current_time_manipulation() -> None:
def test_automation_tester_current_time_manipulation() -> None:
tester = AutomationConditionTester(
defs=defs,
asset_selection=AssetSelection.assets(hourly),
Expand All @@ -54,3 +58,22 @@ def test_current_time_manipulation() -> None:
result = tester.evaluate()
assert result.total_requested == 0
assert result.get_requested_partitions(hourly.key) == set()


def test_basic() -> None:
    """Check that passing the returned cursor back in changes the requested count.

    The first evaluation of the unpartitioned asset requests one partition; a
    second evaluation against the same instance, seeded with the first result's
    cursor, requests none.
    """
    instance = DagsterInstance.ephemeral()
    selection = AssetSelection.assets(unpartitioned)

    # First pass: no prior cursor is supplied.
    first = evaluate_automation_conditions(
        defs=defs,
        asset_selection=selection,
        instance=instance,
    )
    assert first.total_requested == 1

    # Second pass: continue from the cursor returned by the first pass.
    second = evaluate_automation_conditions(
        defs=defs,
        asset_selection=selection,
        instance=instance,
        cursor=first.cursor,
    )
    assert second.total_requested == 0

0 comments on commit 18549a4

Please sign in to comment.