Derivers are used in Envelope pipelines to derive new data. Derivers transform data that has already been brought into the pipeline in previous steps, whether straight from inputs or from the results of previous derivers. Derivers can operate over one or multiple steps at a time, and they operate identically in batch and streaming modes. In Spark terms, a deriver creates a new DataFrame from existing DataFrames.
Envelope provides a number of derivers out of the box and also allows custom derivers to be specified.
The `sql` deriver is used to run SQL to derive new data in the pipeline. Queries submitted to the deriver are executed by Spark SQL. The result of the SQL query then becomes the data of the step that contains the deriver.
The query provided to the deriver can select from two types of sources:
- From previous steps, by referencing the previous step name as the table name
- From the Hive metastore, by referencing the Hive table name, optionally with a Hive metastore database name prefix (i.e. `databasename.tablename`)
A query can be provided to a SQL deriver instance in one of two ways:
- The `query.literal` configuration is used to provide the SQL query directly in the pipeline configuration file. This is good for very short queries or for rapid development.
- The `query.file` configuration is used to provide the path to an HDFS file that contains the SQL query to run. This is good for long queries, or for when the query is developed separately from the pipeline.
Note that the SQL deriver should only be used to run SELECT queries that derive new data. To write data outside of the pipeline, the step should additionally specify a planner and an output.
For example, where `traffic` was a previous step in the pipeline:
deriver {
  type = sql
  query.literal = """
    SELECT
        UNIX_TIMESTAMP() * 1000 as_of_time
      , ROUND(AVG(number_of_vehicles), 2) avg_num_veh
      , MIN(number_of_vehicles) min_num_veh
      , MAX(number_of_vehicles) max_num_veh
      , MIN(measurement_time) first_meas_time
      , MAX(measurement_time) last_meas_time
    FROM traffic"""
}
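The same derivation could instead load its query from a file with `query.file` (a sketch; the HDFS path is illustrative):

deriver {
  type = sql
  query.file = "hdfs:///queries/traffic_summary.sql"
}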
The `morphline` deriver is used to run Morphline transformations over the records of a single dependency of the step, defined by the `step.name` parameter.
The Morphline transformation is provided to the Envelope pipeline as a file local to the Spark executors. The local file is retrieved from the location in the `morphline.file` configuration. The local file can be provided to the Spark executors from `spark2-submit` using the `--files` option.
The ID of the specific transformation within the Morphline file is specified with the `morphline.id` configuration.
The deriver requires the output schema of the Morphline transformation to be provided using the `field.names` and `field.types` configurations.
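Putting these together, a minimal sketch of a morphline deriver might look like the following, assuming a previous step named `traffic` and a Morphline file shipped with `--files` (the file name, transformation ID, and field names/types are illustrative; see the configuration guide for the accepted type names):

deriver {
  type = morphline
  step.name = traffic
  morphline.file = "traffic_morphline.conf"
  morphline.id = parse_traffic
  field.names = [measurement_time, number_of_vehicles]
  field.types = [long, int]
}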
The `nest` deriver is used to nest the data of one step within another by a common join key. This is useful for denormalizing a one-to-many relationship without repeating values on the one-cardinality side. This type of data modeling is known as a supernova schema.
To configure the deriver to nest a one-to-many relationship, specify:
- The one-cardinality step name in `nest.into`
- The many-cardinality step name in `nest.from`
- The join key field names in `key.field.names`
- The name of the nested field on the derivation in `nested.field.name`
Consider the following simple example. We have a customers table and an orders table (a one-to-many relationship, because a customer can have many orders but an order can only belong to one customer), and we want to nest the orders for a customer onto the customer's record so that we can query across the two data sets without the cost of joining them at runtime.
`customers`:

| customer_id | name |
|---|---|
| 10000 | Jane |
| 10001 | Joe |
`orders`:

| order_id | product_name | customer_id |
|---|---|---|
| 1000 | Envelopes | 10000 |
| 1001 | Stamps | 10000 |
| 1002 | Pens | 10000 |
| 1003 | Paper | 10001 |
To nest the `orders` step into the `customers` step we could run a subsequent step with:
...
steps {
  customers {
    ...
  }
  orders {
    ...
  }
  customers_nested {
    dependencies = [customers, orders]
    deriver {
      type = nest
      nest.from = orders
      nest.into = customers
      key.field.names = [customer_id]
      nested.field.name = customer_orders
    }
    ...
  }
  ...
}
...
Which would produce the derived result:
`customers_nested`:

| customer_id | name | customer_orders |
|---|---|---|
| 10000 | Jane | [{order_id: 1000, product_name: Envelopes, ...}, {order_id: 1001, product_name: Stamps, ...}, {order_id: 1002, product_name: Pens, ...}] |
| 10001 | Joe | [{order_id: 1003, product_name: Paper, ...}] |
If this was then written to a Parquet table, the data could be queried in Impala with syntax like:
SELECT c.name, COUNT(o.order_id) FROM customers_nested c, c.customer_orders o GROUP BY c.name;
For more information on querying nested tables using Impala, see the complex types documentation.
The `passthrough` deriver simply unions all of its dependencies together. All of the dependencies must have the same schema.
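A minimal sketch, assuming two previous steps with identical schemas (the step names are illustrative):

traffic_all {
  dependencies = [traffic_north, traffic_south]
  deriver {
    type = passthrough
  }
  ...
}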
The `pivot` deriver is used to pivot (otherwise known as transpose) key-value-pair data so that the derivation has a column per unique key. This can be useful when the source data model of the pipeline defines attributes for an entity via key-value pairs but the transformed data model of the pipeline should have a distinct column per attribute, for simpler and more efficient analytics querying.
To configure the deriver to pivot a previous step that contains attributes for an entity defined as key-value pairs, specify:
- The name of the previous step to pivot with `step.name`
- The fields that define the entity key with `entity.key.field.names`
- The field that defines the key of the key-value pairs with `pivot.key.field.name`
- The field that defines the value of the key-value pairs with `pivot.value.field.name`
- The method to use for retrieving the entity attributes with `pivot.keys.source`. For the deriver to dynamically find the distinct keys of the key-value pairs use `dynamic`. To provide a static list of keys use `static`.
- The static list of keys with `pivot.keys.list`, when using the `static` method for retrieving pivot keys (a static variant is sketched after the example below).
Consider the following simple example where we have a key-value-pairs step that captures the attributes of each customer with one record per attribute per customer, and we want to derive a pivoted (transposed) step that captures the same attributes of each customer but with one record per customer.
`customers_kvp`:

| customer_id | key | value |
|---|---|---|
| 10000 | name | Jane |
| 10000 | state | NY |
| 10000 | balance | 50000.0 |
| 10001 | name | Joe |
| 10001 | state | CA |
| 10001 | balance | 30000.0 |
To pivot the `customers_kvp` step we could run a subsequent step with:
...
steps {
  customers_kvp {
    ...
  }
  customers_pivoted {
    dependencies = [customers_kvp]
    deriver {
      type = pivot
      step.name = customers_kvp
      entity.key.field.names = [customer_id]
      pivot.key.field.name = key
      pivot.value.field.name = value
      pivot.keys.source = dynamic
    }
    ...
  }
  ...
}
...
Which would produce the derived result:
`customers_pivoted`:

| customer_id | name | state | balance |
|---|---|---|---|
| 10000 | Jane | NY | 50000.0 |
| 10001 | Joe | CA | 30000.0 |
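With a static list of pivot keys instead, the deriver configuration of the same step might read:

deriver {
  type = pivot
  step.name = customers_kvp
  entity.key.field.names = [customer_id]
  pivot.key.field.name = key
  pivot.value.field.name = value
  pivot.keys.source = static
  pivot.keys.list = [name, state, balance]
}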
The `exclude` deriver executes a `LEFT ANTI JOIN` on two designated dependencies over a set of common fields between the two. Commonly, this deriver is used for easy de-duplication within a pipeline.
The equivalent SQL statement would read:
SELECT Left.* FROM Left LEFT ANTI JOIN Right USING (field1, field2)
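The same result could also be expressed with the `sql` deriver described earlier, assuming `Left` and `Right` are previous steps in the pipeline:

deriver {
  type = sql
  query.literal = "SELECT Left.* FROM Left LEFT ANTI JOIN Right USING (field1, field2)"
}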
The `dq` deriver can be used to perform data quality checks on a dataset using a set of user-defined rules. Rules can be applied at two scopes: dataset level or row level. For dataset scope, the rules are evaluated against the dataset as a whole and the derived result is a dataset containing one row per rule, indicating a pass or fail. The schema of that dataset is `name: String, result: Boolean`. For example, the result might be:
| name | result |
|---|---|
| namecheck | true |
| agerange | false |
Row-level scope takes the list of rules and applies them to every row of the defined input dependency. The results of the checks are appended to each row as a field of type `map<string, boolean>`, called `results` by default. The results would look something like:
| name | age | results |
|---|---|---|
| Ian | null | {"namenotnull":true,"agerange":false} |
| Webster | 21 | {"namenotnull":true,"agerange":true} |
Envelope has a number of built-in rules (see below) but allows for custom user-defined rules via fully-qualified class name. See the config guide for specific configuration parameters.
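For instance, a custom rule could be referenced in the rules block by its fully-qualified class name, as in this sketch (the class name and its parameters are illustrative):

rules {
  mycustomrule {
    type = com.yourcompany.envelope.dq.MyCustomRule
    // any configuration parameters required by the custom rule
    ...
  }
}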
The following row-level rules are provided:
- `checknulls` - check for null values in one or more fields in a row
- `enum` - check one or more fields against a list of allowed values (non-floating-point numerics and strings)
- `range` - check that one or more numeric fields are between lower and upper bounds (inclusive)
- `regex` - check one or more string fields against an allowed pattern
The following rules are defined at the dataset scope:
- `count` - ensure the dataset has an expected count. The count may either be statically defined or loaded as a dependency from another step. If the latter, the dataset must contain a single row with a single field of type long.
- `checkschema` - ensure the dataset matches the specified schema. Currently only supports primitive types.
In addition, any defined row-level rule can be applied at the dataset scope. In this case, the deriver simply logically ANDs the individual results from each row check into a single boolean result for the rule.
If specifying multiple dependencies, the user must specify to which dependency the dataset-level rules should be applied using the `dataset` configuration parameter.
If using multiple dataset-level checks on the same dataset, it is recommended to employ the cache hint on the dependency containing the data to be checked.
An example configuration containing both dataset and row-level DQ derivers is as follows:
...
steps {
dqparams {
input {
type = filesystem
format = json
path = "hdfs:///tmp/dqparams"
}
}
mydata {
input {
type = filesystem
format = json
path = "hdfs:///tmp/data"
}
}
checkmydata {
dependencies = [mydata,dqparams]
deriver {
type = dq
scope = dataset
dataset = mydata
rules {
r1 {
type = count
expected.dependency = dqparams
}
r2 {
type = checkschema
fields = [
{ name = "name", type = "string" },
{ name = "address", type = "string },
{ name = "age", type = "age" }
]
}
r3 {
// row-level rule being run in dataset scope
type = regex
fields = ["name"]
regex = "[a-zA-Z' ]{1,}"
}
r4 {
// row-level rule being run in dataset scope
type = enum
fields = ["name"]
values = ["Ian","Jeremy","Webster"]
fieldtype = string
case-sensitive = false
}
}
}
}
checkrows {
dependencies = [mydata]
deriver {
type = dq
scope = row
rules {
r1 {
type = checknulls
fields = [ "name", "address", "age" ]
}
r2 {
type = regex
fields = ["name"]
regex = "[a-zA-Z' ]{1,}"
}
r3 {
type = range
fields = ["age"]
fieldtype = "int"
range = [0,150]
ignore-nulls = true
}
}
}
}
}
...
Users wishing to specify custom rules can implement either the `RowRule` or `DatasetRule` interface. Row-level rules should implement a `check(Row row)` method returning a boolean result. Dataset-scope rules should implement a `check(Dataset<Row> dataset, Map<String, Dataset<Row>> stepDependencies)` method which returns a Dataset with one row per rule, with the schema `name: String, result: Boolean`.
Row-level rules are automatically wrapped in `DatasetRowRuleWrapper` when used in a dataset scope.
The custom rules may also implement the `ProvidesAlias` interface, which allows an alias to be used instead of the fully-qualified class name in Envelope config files. The implementation's fully-qualified class name must be placed in a `META-INF/services/com.cloudera.labs.envelope.derive.dq.DatasetRule` or `META-INF/services/com.cloudera.labs.envelope.derive.dq.RowRule` file on the classpath.
In cases where Envelope does not provide a deriver that meets the requirements for a particular derivation, a custom deriver can be developed and provided instead.
Envelope is pluggable so that Envelope itself does not need to be modified. Instead, a separate jar that contains only the deriver(s) for the pipeline can be created.
To create a new deriver, first start a new Java or Scala project that has a dependency on the Envelope version you are using. You do not need to include Envelope in the packaged jar.
For example, if you are using Maven:
<dependency>
  <groupId>com.cloudera.labs</groupId>
  <artifactId>envelope-core</artifactId>
  <version>0.5.0</version>
  <scope>provided</scope>
</dependency>
With the configured project you can develop the deriver by adding a class that implements the Deriver interface.
The two methods in the interface are:
- `configure` to receive the configurations of the `deriver` section of the step. This can be used to retrieve any custom configurations required by the deriver.
- `derive` to run a derivation. The `dependencies` argument provides the name and Spark DataFrame for each of the dependencies of the step that contains the deriver. The return value is the DataFrame that represents the derivation. Access to the SparkSession object is available from the static method `Contexts#getSparkSession`. A skeleton implementation is sketched below.
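A minimal sketch of such a class, assuming the interface methods described above (the package, class name, configuration property, and field name are illustrative, and the import path of the Deriver interface should be checked against your Envelope version):

package com.yourcompany.envelope.deriver;

import java.util.Map;

import com.typesafe.config.Config;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Deriver interface from the provided-scope envelope-core dependency
import com.cloudera.labs.envelope.derive.Deriver;

public class CustomDeriver implements Deriver {

  private String fieldName;

  @Override
  public void configure(Config config) {
    // Retrieve any custom configurations from the deriver section of the step
    this.fieldName = config.getString("customproperty1");
  }

  @Override
  public Dataset<Row> derive(Map<String, Dataset<Row>> dependencies) {
    // Each dependency of the step is available by name as a DataFrame
    Dataset<Row> previous = dependencies.get("some_previous_step");

    // Return the DataFrame that represents the derivation
    return previous.filter(previous.col(fieldName).isNotNull());
  }
}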
To reference the deriver in your pipeline simply use the deriver's fully qualified class name (or alias, see below) as the deriver type. For example:
...
deriver {
  type = com.yourcompany.envelope.deriver.CustomDeriver
  customproperty1 = ...
  ...
}
...
To use an alias in configuration files, Envelope needs to be able to find your class. First, your class will need to implement the `ProvidesAlias` interface. Next, place the implementation's fully qualified class name in a `META-INF/services/com.cloudera.labs.envelope.deriver.Deriver` file on the classpath - the usual method is to package the file with your JAR.
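As a sketch, the services file contains a single line with the implementation's class name, and the alias returned by the implementation (hypothetically `customderiver` here) can then be used as the deriver type:

# contents of META-INF/services/com.cloudera.labs.envelope.deriver.Deriver
com.yourcompany.envelope.deriver.CustomDeriver

deriver {
  type = customderiver
  ...
}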
With the project compiled into a jar file the deriver can be submitted as part of the Envelope pipeline similarly to:
spark2-submit --jars customderiver.jar envelope-*.jar pipeline.conf
The jar file can contain multiple derivers, and other pluggable classes such as custom inputs, outputs, etc.
When developing a custom deriver keep in mind:
- Derivers are only for deriving new data, and should not lead to side effects outside of the deriver, such as writing to an output or changing external metadata.
- Derivers are often highly reusable, so avoid hard-coding values or field names into the deriver and instead have them provided at runtime through configuration.
- Derivers are usually most efficient when they operate only on the Dataset/DataFrame API. If possible, avoid converting to the RDD API and then back again.
- You can look at the code of the provided derivers for hints as to how to structure your own deriver.
- There are utility classes in the .utils package that may already provide some of the functionality you need to put together your derivation logic.