-
Notifications
You must be signed in to change notification settings - Fork 137
/
Copy pathhello_armada.py
86 lines (80 loc) · 2.73 KB
/
hello_armada.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from armada.model import GrpcChannelArgs
from armada.operators.armada import ArmadaOperator
from armada_client.armada import submit_pb2
from armada_client.k8s.io.api.core.v1 import generated_pb2 as core_v1
from armada_client.k8s.io.apimachinery.pkg.api.resource import \
generated_pb2 as api_resource
def submit_sleep_job():
    """
    Build the Armada job request for a pod that runs ``sleep``.

    The pod holds a single busybox container named "sleep" that is started
    with the arguments ``sleep 10s``, runs as user 1000, and requests exactly
    the CPU and memory it is limited to.

    Returns a one-element list of JobSubmitRequestItems that can be
    submitted to Armada.
    """

    def _resource_amounts():
        # Build fresh Quantity messages on every call so that requests and
        # limits are populated from separate objects.
        return {
            "cpu": api_resource.Quantity(string="120m"),
            "memory": api_resource.Quantity(string="510Mi"),
        }

    sleep_resources = core_v1.ResourceRequirements(
        requests=_resource_amounts(),
        limits=_resource_amounts(),
    )
    sleep_container = core_v1.Container(
        name="sleep",
        image="busybox",
        args=["sleep", "10s"],
        securityContext=core_v1.SecurityContext(runAsUser=1000),
        resources=sleep_resources,
    )
    sleep_pod = core_v1.PodSpec(containers=[sleep_container])

    request_item = submit_pb2.JobSubmitRequestItem(
        priority=1,
        pod_spec=sleep_pod,
        namespace="personal-anonymous",
        annotations={"armadaproject.io/hello": "world"},
    )
    return [request_item]
# Example Airflow DAG that runs a BashOperator followed by an ArmadaOperator.
#
# Real `#` comments are used throughout this block: a bare triple-quoted
# string that is not the first statement of a module, class, or function is
# not a docstring, just an expression statement with no effect.
with DAG(
    dag_id="hello_armada",
    start_date=pendulum.datetime(2016, 1, 1, tz="UTC"),
    # NOTE(review): Airflow 2.4+ deprecates `schedule_interval` in favour of
    # `schedule`. Kept as-is for compatibility with older Airflow releases;
    # confirm the minimum supported Airflow version before changing it.
    schedule_interval="@daily",
    catchup=False,
    default_args={"retries": 2},
) as dag:
    # The ArmadaOperator requires gRPC channel arguments for Armada; here
    # the channel targets 127.0.0.1:50051.
    armada_channel_args = GrpcChannelArgs(target="127.0.0.1:50051")

    # Airflow task with the task_id "dummy" that echoes "Hello World!".
    op = BashOperator(task_id="dummy", bash_command="echo Hello World!")

    # Armada task with both its task_id and its name set to "armada".
    # The operator needs the Armada queue to use ("test" here) and the gRPC
    # channel arguments; reuse the same values for all of your tasks.
    # The job request is the single item returned by submit_sleep_job(),
    # i.e. the sleep PodSpec defined above.
    # NOTE(review): the "<job_id>" placeholder in lookout_url_template is
    # presumably substituted by the operator to build a link to the job in
    # the Lookout UI - confirm against the operator documentation.
    armada = ArmadaOperator(
        task_id="armada",
        name="armada",
        armada_queue="test",
        channel_args=armada_channel_args,
        job_request=submit_sleep_job()[0],
        lookout_url_template="http://127.0.0.1:8089/jobs?job_id=<job_id>",
    )

    # Airflow dependency syntax: run `op` first, then `armada`.
    op >> armada