-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathetl_helper.py
31 lines (23 loc) · 1.01 KB
/
etl_helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import luigi
from transform_and_load import TransformAndLoad
class StartETL(luigi.Task):
def requires(self):
return TransformAndLoad()
def run(self):
# Setting task status to RUNNING
self.set_status_message('ETL process started...')
pass
def output(self):
return luigi.LocalTarget('./data/transform.csv')
# These methods run every time any task is completed without any error
@luigi.Task.event_handler(luigi.Event.SUCCESS)
def task_successful_callback(self):
# You can check call back against the task using its namespace
# if ExtractData.task_namespace == self.task_namespace: (Like this you can check)
print("Task Successful!")
print("Task name:{}".format(self.task_namespace))
# This method runs every time any task is failed
@luigi.Task.event_handler(luigi.Event.FAILURE)
def task_failed_callback(self, exception):
print('Task failed!', exception)
print("Task name:{}".format(self.task_namespace))