Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Microbatch parallelism #10958

Draft
wants to merge 17 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions core/dbt/artifacts/schemas/batch_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@
successful=self.successful + other.successful,
failed=self.failed + other.failed,
)

def __len__(self) -> int:
    """Return the combined number of successful and failed entries."""
    return len(self.successful) + len(self.failed)

Check warning on line 24 in core/dbt/artifacts/schemas/batch_results.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/artifacts/schemas/batch_results.py#L24

Added line #L24 was not covered by tests
24 changes: 18 additions & 6 deletions core/dbt/task/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from dbt.graph import Graph, GraphQueue, ResourceTypeSelector
from dbt.node_types import NodeType
from dbt.task.base import BaseRunner, resource_types_from_args
from dbt.task.run import MicrobatchModelRunner
from dbt_common.events.functions import fire_event

from .run import ModelRunner as run_model_runner
Expand Down Expand Up @@ -141,13 +142,13 @@

def handle_model_with_unit_tests_node(self, node, pool, callback):
    """Run a model and its unit tests, inline or on the worker pool.

    When ``single_threaded`` is set on the config args, the runner call is
    made synchronously and its result is handed straight to ``callback``.
    Otherwise the call is submitted to ``pool`` with ``callback`` attached.
    """
    self._raise_set_error()
    # call_model_and_unit_tests_runner takes (node, pool), so the pool is
    # forwarded along with the node in both execution modes.
    args = [node, pool]
    if self.config.args.single_threaded:
        callback(self.call_model_and_unit_tests_runner(*args))
    else:
        pool.apply_async(self.call_model_and_unit_tests_runner, args=args, callback=callback)

def call_model_and_unit_tests_runner(self, node) -> RunResult:
def call_model_and_unit_tests_runner(self, node, pool) -> RunResult:
assert self.manifest
for unit_test_unique_id in self.model_to_unit_test_map[node.unique_id]:
unit_test_node = self.manifest.unit_tests[unit_test_unique_id]
Expand All @@ -166,6 +167,10 @@
if runner.node.unique_id in self._skipped_children:
cause = self._skipped_children.pop(runner.node.unique_id)
runner.do_skip(cause=cause)

if isinstance(runner, MicrobatchModelRunner):
return self.handle_microbatch_model(runner, pool)

Check warning on line 172 in core/dbt/task/build.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/build.py#L172

Added line #L172 was not covered by tests

return self.call_runner(runner)

# handle non-model-plus-unit-tests nodes
Expand All @@ -177,11 +182,12 @@
if runner.node.unique_id in self._skipped_children:
cause = self._skipped_children.pop(runner.node.unique_id)
runner.do_skip(cause=cause)
args = [runner]
if self.config.args.single_threaded:
callback(self.call_runner(*args))

if isinstance(runner, MicrobatchModelRunner):
callback(self.handle_microbatch_model(runner, pool))
else:
pool.apply_async(self.call_runner, args=args, callback=callback)
args = [runner]
self._submit(pool, args, callback)

# Make a map of model unique_ids to selected unit test unique_ids,
# for processing before the model.
Expand Down Expand Up @@ -210,6 +216,12 @@
)

def get_runner_type(self, node) -> Optional[Type[BaseRunner]]:
    """Choose the runner class for ``node``.

    A model node that the parent implementation resolves to
    ``MicrobatchModelRunner`` keeps that runner. Every other node is looked
    up in ``RUNNER_MAP`` by its resource type.
    """
    # Runner classes are compared by identity; the parent lookup is only
    # consulted for model nodes.
    is_microbatch_model = (
        node.resource_type == NodeType.Model
        and super().get_runner_type(node) is MicrobatchModelRunner
    )
    if is_microbatch_model:
        return MicrobatchModelRunner

    return self.RUNNER_MAP.get(node.resource_type)

# Special build compile_manifest method to pass add_test_edges to the compiler
Expand Down
Loading
Loading