Envelope provides the ability to loop over one or more steps of a pipeline so that each instance does not need to be specified. The values to be looped over can be determined dynamically either from parameters given at pipeline submission or from the data of previous steps in the pipeline.
Loops are specified in an Envelope pipeline by adding a loop step. This is in contrast to a data step, where the data itself is manipulated. A loop step defines how the loop will run, such as in parallel or in serial, and over which values. The steps that the loop step loops over are known as the loop graph, which is defined by all of the steps in the pipeline that are directly dependent on the loop step. Steps that are dependent on loop graph steps, but not on the loop step itself, will run after all iterations of the loop.
Loops are executed by unrolling the loop into the steps that would have been created if the iterations of the loop were specified individually. The names of the loop graph steps of each iteration are suffixed with the iteration value to distinguish one instance from another. This allows the steps of all iterations to be accessible within the loop and in steps after the loop completes.
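The naming side of unrolling can be illustrated with a minimal Python sketch. This is not Envelope's implementation; the function name `unroll_step_names` is hypothetical, and the underscore separator is inferred from the step names in the example later in this section.

```python
def unroll_step_names(loop_graph_steps, values):
    """Return one step name per loop graph step per iteration value.

    Illustrative only: the "_" separator is an assumption taken from
    names such as process_date_20170501 in the example in this guide.
    """
    return [
        f"{step}_{value}"
        for value in values            # one iteration per loop value
        for step in loop_graph_steps   # every loop graph step is copied
    ]


names = unroll_step_names(["process_date"], [20170501, 20170502])
print(names)  # ['process_date_20170501', 'process_date_20170502']
```

Because each unrolled step has a distinct name, a later step can refer to the output of one specific iteration.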
Loop steps have three options for specifying the source of values to be looped over:
- range provides an inclusive range of integers
- list provides an ordered list of values
- step provides the values from a previous step
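As a rough illustration of how the three sources differ, the following Python sketch resolves each one to an ordered list of iteration values. The function `loop_values` and its arguments are hypothetical and are not part of Envelope. In particular, how values are extracted from a previous step's data is not described here, so the sketch assumes they have already been collected into a sequence.

```python
def loop_values(source, start=None, end=None, values=None, step_values=None):
    """Return the ordered iteration values for a loop (illustrative only)."""
    if source == "range":
        # The range is inclusive, so the end value is itself an iteration.
        return list(range(start, end + 1))
    if source == "list":
        # Preserve the order in which the values were listed.
        return list(values)
    if source == "step":
        # Assumption: the previous step's values are already extracted.
        return list(step_values)
    raise ValueError(f"unknown loop source: {source}")


print(loop_values("range", start=20170501, end=20170503))
# [20170501, 20170502, 20170503]
print(loop_values("list", values=["customers", "orders"]))
# ['customers', 'orders']
```

Note that a range is over plain integers. Dates written as integers, such as 20170501, only form a valid sequence while the range stays within a single month.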
A loop step can specify a parameter, which is a string that will be used to update the configuration of each loop graph instance with the value of the loop. This could be used to run iterations of the loop over, for example, a range of dates, or a list of table names. For a given parameter parameter_name, all instances of ${parameter_name} in the loop graph step configurations will be replaced with the value of the loop iteration.
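The replacement described above can be sketched in Python as a plain string substitution over a nested configuration. This is only a model of the behaviour, not Envelope's code; the function `substitute` and the dictionary representation of a step configuration are assumptions made for illustration.

```python
def substitute(config, parameter, value):
    """Replace every ${parameter} token in a nested config (illustrative only)."""
    token = "${" + parameter + "}"
    if isinstance(config, str):
        return config.replace(token, str(value))
    if isinstance(config, dict):
        return {key: substitute(item, parameter, value) for key, item in config.items()}
    if isinstance(config, list):
        return [substitute(item, parameter, value) for item in config]
    return config  # numbers, booleans and so on are left untouched


step_config = {
    "deriver": {
        "type": "sql",
        "query.literal": "SELECT * FROM read_month WHERE data_date = ${processing_date}",
    }
}
resolved = substitute(step_config, "processing_date", 20170501)
print(resolved["deriver"]["query.literal"])
# SELECT * FROM read_month WHERE data_date = 20170501
```

The sketch returns a new configuration for each iteration and leaves the original template unchanged, so the same template can be reused for every loop value.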
Consider an example where a monthly job needs to iterate through the dates of the month one at a time and in sequence. Rather than individually listing out steps for each date of the month, we can use a loop step to iterate over the dates.
    application {
      name = Loop step example
    }

    steps {
      read_month {
        input {
          type = hive
          table = staging_table
        }
      }

      loop_dates {
        dependencies = [read_month]
        type = loop
        mode = serial
        source = range
        range.start = ${first_date}
        range.end = ${last_date}
        parameter = processing_date
      }

      process_date {
        dependencies = [loop_dates]
        deriver {
          type = sql
          query.literal = "SELECT * FROM read_month WHERE data_date = ${processing_date}"
        }
        planner {
          type = append
        }
        output {
          type = hive
          table = processed_table
        }
      }
    }
The pipeline could be run with the dates provided like below:
    spark2-submit envelope-*.jar process_dates.conf first_date=20170501,last_date=20170531
When Envelope reaches the loop step (after read_month) it would unroll the loop into:
    application {
      name = Loop step example
    }

    steps {
      read_month {
        input {
          type = hive
          table = staging_table
        }
      }

      loop_dates {
        dependencies = [read_month]
        type = loop
        mode = serial
        source = range
        range.start = 20170501
        range.end = 20170531
        parameter = processing_date
      }

      process_date_20170501 {
        dependencies = [loop_dates]
        deriver {
          type = sql
          query.literal = "SELECT * FROM read_month WHERE data_date = 20170501"
        }
        planner {
          type = append
        }
        output {
          type = hive
          table = processed_table
        }
      }

      process_date_20170502 {
        dependencies = [loop_dates, process_date_20170501]
        deriver {
          type = sql
          query.literal = "SELECT * FROM read_month WHERE data_date = 20170502"
        }
        planner {
          type = append
        }
        output {
          type = hive
          table = processed_table
        }
      }

      ...

      process_date_20170531 {
        dependencies = [loop_dates, process_date_20170530]
        deriver {
          type = sql
          query.literal = "SELECT * FROM read_month WHERE data_date = 20170531"
        }
        planner {
          type = append
        }
        output {
          type = hive
          table = processed_table
        }
      }
    }
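The dependency pattern in the unrolled configuration above, where each iteration depends on the loop step and on the iteration before it, can be modelled with a short Python sketch. The function `unroll_serial` is hypothetical and only mirrors what the serial example shows. It makes no claim about how Envelope builds these dependencies internally, or about how parallel mode wires them.

```python
def unroll_serial(loop_step, graph_step, values):
    """Map each unrolled step name to its dependencies (illustrative only).

    Mirrors the serial example: the first iteration depends only on the
    loop step, and each later iteration also depends on its predecessor.
    """
    unrolled = {}
    previous = None
    for value in values:
        name = f"{graph_step}_{value}"
        dependencies = [loop_step] if previous is None else [loop_step, previous]
        unrolled[name] = dependencies
        previous = name
    return unrolled


steps = unroll_serial("loop_dates", "process_date", [20170501, 20170502, 20170503])
for name, dependencies in steps.items():
    print(name, dependencies)
# process_date_20170501 ['loop_dates']
# process_date_20170502 ['loop_dates', 'process_date_20170501']
# process_date_20170503 ['loop_dates', 'process_date_20170502']
```

Chaining each iteration to the previous one is what forces the dates to be processed in sequence rather than concurrently.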