[Plugin]: Add example of per model dbt execution #3135

Open
Tracked by #5783
brandon-segal opened this issue Dec 6, 2022 · 5 comments
Labels
documentation Improvements or additions to documentation hacktoberfest plugins Plugins related labels (backend or frontend)

Comments

@brandon-segal

Provide a documented example of how to do non-trivial orchestration of a dbt project. I wanted to see whether I could replicate the logic in this proposed Airflow DAG orchestration of a dbt project. That workflow does the following:

  1. Runs dbt compile to create a fresh copy of manifest.json (Where dependencies are stored)
  2. Reads the model selectors defined in the YAML file
  3. Uses the dbt ls command to list all of the models associated with each model selector in the YAML file
  4. Turns the dbt DAG from manifest.json into a Graph object with the networkx library
  5. Uses the methods available on the Graph object to figure out the correct set of dependencies for each group of models defined in the YAML file
  6. Writes the dependencies for each group of models (stored as a list of tuples) to file
  7. Creates an Airflow DAG for each group of models based on the given dependencies
  8. This DAG is then registered with their orchestrator

Is this kind of logic possible using something like @dynamic workflows to generate the tasks dynamically during execution, or through a script that generates them before registration?
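Steps 4 to 6 above can be sketched without any orchestrator. The following is a minimal illustration, not the linked article's implementation: it assumes manifest.json has already been loaded into a dict whose "nodes" entries carry "resource_type" and "depends_on" -> "nodes", and it uses plain dictionaries and sets in place of networkx to stay dependency-free. The function names model_parents and group_edges are hypothetical.

```python
from typing import Dict, Iterable, List, Set, Tuple


def model_parents(manifest: dict) -> Dict[str, Set[str]]:
    """Map each model's unique_id to the model unique_ids it depends on directly."""
    nodes = manifest.get("nodes", {})
    models = {uid for uid, node in nodes.items() if node.get("resource_type") == "model"}
    return {
        uid: {
            parent
            for parent in nodes[uid].get("depends_on", {}).get("nodes", [])
            if parent in models  # ignore sources, seeds, tests, etc.
        }
        for uid in models
    }


def group_edges(parents: Dict[str, Set[str]], group: Iterable[str]) -> List[Tuple[str, str]]:
    """Return (upstream, downstream) edges between the models in `group`.

    Dependencies that pass through unselected models are preserved by walking
    up the graph until a selected ancestor is reached.
    """
    selected = set(group)
    edges = set()
    for child in selected:
        stack = list(parents.get(child, ()))
        seen: Set[str] = set()
        while stack:
            node = stack.pop()
            if node in seen:
                continue
            seen.add(node)
            if node in selected:
                edges.add((node, child))
            else:
                # Unselected model: keep climbing towards selected ancestors.
                stack.extend(parents.get(node, ()))
    return sorted(edges)
```

The resulting list of tuples is the kind of per-group dependency list that step 6 writes to a file; a group would come from the output of dbt ls for one selector.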

@brandon-segal brandon-segal changed the title [Plugin]: [Plugin]: Add example of per model dbt execution Dec 6, 2022
@cosmicBboy cosmicBboy added documentation Improvements or additions to documentation plugins Plugins related labels (backend or frontend) labels Dec 19, 2022
@timle2

timle2 commented Mar 8, 2023

I've written a blog post on this topic; here's the section relevant to this discussion. This is something I've been interested in doing for a long time, and over the summer we set up a demo of it (the subject of the write-up) using ContainerTasks. I think it could easily be converted to DBTRun tasks as well, though.

Perhaps the dbt DAG export could be written explicitly as a dbt plugin task. I've also submitted a feature request here to make 'execution plan exports' a standard feature in dbt. Otherwise, creating the DAG in Flyte can be easily accomplished with ImperativeWorkflows.

Here's a summary of the steps to get the DAG:

  • Generate the manifest and the graph (see dbt.graph; it uses networkx.classes.digraph.DiGraph)
  • Parse the selectors, get the selection spec, apply it to the manifest to get the affected nodes, then apply it to the graph to keep only the selected nodes
  • Create a GraphQueue (dbt.graph.queue) from the graph
  • Iterate across the queue (it's ordered by score, based on a topological sort) to get the run order. For each step, pull its parents, if any exist.
    (dbt 1.5 is doing major rewrites to the interface, so maybe this gets easier. I've also submitted a feature request to make 'execution plan exports' a standard feature in dbt: "[CT-2272] [Feature] Export/list DAG execution plan", dbt-labs/dbt-core#7137.)
    In this way, we get an execution plan that mirrors the run process of dbt with maximum fidelity.
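The last bullet, a run order with the parents of each step, can be illustrated with the standard library alone. This is a hedged sketch that swaps dbt's GraphQueue for Python's graphlib.TopologicalSorter, so it only approximates dbt's scoring; the input is assumed to be a mapping from each node to the set of nodes it depends on, and execution_plan is a hypothetical helper name.

```python
from graphlib import TopologicalSorter
from typing import Dict, Iterable, List, Tuple


def execution_plan(parents: Dict[str, Iterable[str]]) -> List[Tuple[str, List[str]]]:
    """Return (node, parents) pairs in an order where parents always come first."""
    sorter = TopologicalSorter(parents)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    plan = []
    while sorter.is_active():
        # Nodes whose parents have all completed; sorted for a stable order.
        ready = sorted(sorter.get_ready())
        for node in ready:
            plan.append((node, sorted(parents.get(node, ()))))
        sorter.done(*ready)
    return plan
```

Each batch returned by get_ready() contains nodes that could run in parallel, which is roughly what an orchestrator needs in order to schedule one task per model.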

This execution plan can then be built in Flyte with ImperativeWorkflows. In my article I did that with ContainerTasks, but I think it's simple to convert to DBTRun tasks if someone wanted to!
Here's a POC for how that could start to look with DBTRun tasks:

from flytekit.core.workflow import ImperativeWorkflow
from flytekitplugins.dbt.schema import DBTRunInput
from flytekitplugins.dbt.task import DBTRun

# Local paths and profile name from the demo project; adjust for your environment.
DBT_PROJECT_DIR = "/Users/timothyl/git/tim-flyte-test/dbt_demo_project"
DBT_PROFILES_DIR = "/Users/timothyl/git/tim-flyte-test/dbt_demo_project"
DBT_PROFILE = "bq-oauth"

# Shared input for both tasks: run the models selected by the tag.
input_ = DBTRunInput(
    project_dir=DBT_PROJECT_DIR,
    profiles_dir=DBT_PROFILES_DIR,
    profile=DBT_PROFILE,
    select=["tag:something"],
)

task_1 = DBTRun(name="test-task")
task_2 = DBTRun(name="test-task2")

wb3 = ImperativeWorkflow(name="imperative_dbt_demo")

# add_entity returns the workflow node created for each task.
task_1_node = wb3.add_entity(task_1, input=input_)
task_2_node = wb3.add_entity(task_2, input=input_)

# Explicit ordering: task_2 runs before task_1.
task_2_node.runs_before(task_1_node)
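To go from two hand-written tasks to one task per model, the same add_entity and runs_before calls could be driven by a loop. The sketch below is an assumption-laden illustration rather than a tested Flyte integration: it takes a plan as a list of (model, parents) pairs already in topological order, and it receives the orchestrator-specific calls as two callbacks so that the wiring logic can be exercised without flytekit installed. The name wire_plan and both callback parameters are hypothetical.

```python
from typing import Callable, Dict, List, Sequence, Tuple, TypeVar

NodeT = TypeVar("NodeT")


def wire_plan(
    plan: Sequence[Tuple[str, List[str]]],
    add_node: Callable[[str], NodeT],
    link: Callable[[NodeT, NodeT], None],
) -> Dict[str, NodeT]:
    """Create one node per model and link every parent to its child.

    `plan` must list parents before their children, so each parent handle
    already exists when a child is wired.
    """
    handles: Dict[str, NodeT] = {}
    for model, parents in plan:
        handles[model] = add_node(model)
        for parent in parents:
            link(handles[parent], handles[model])
    return handles


# Hypothetical flytekit wiring, reusing the names from the POC above (untested):
#   add_node = lambda m: wb3.add_entity(
#       DBTRun(name=f"dbt-run-{m}"),
#       input=DBTRunInput(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR,
#                         profile=DBT_PROFILE, select=[m]),
#   )
#   link = lambda upstream, downstream: upstream.runs_before(downstream)
```

Keeping the callbacks separate also means the same plan could be wired with ContainerTasks instead of DBTRun tasks by swapping add_node.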


github-actions bot commented Dec 4, 2023

Hello 👋, this issue has been inactive for over 9 months. To help maintain a clean and focused backlog, we'll be marking this issue as stale and will engage on it to decide if it is still applicable.
Thank you for your contribution and understanding! 🙏

@someshfengde

@davidmirror-ops can you assign this issue to me?

@davidmirror-ops
Contributor

@someshfengde yes! Please let us know soon if you have questions.

@someshfengde

yes sure thanks for assigning :)
