Skip to content

Commit

Permalink
feat: create dataset schema versions when writing dataset
Browse files Browse the repository at this point in the history
Signed-off-by: David Goss <[email protected]>
  • Loading branch information
davidjgoss committed Mar 9, 2024
1 parent 78a191b commit 2d26f95
Show file tree
Hide file tree
Showing 26 changed files with 300 additions and 23 deletions.
33 changes: 33 additions & 0 deletions api/src/main/java/marquez/common/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -59,6 +61,7 @@
import marquez.service.models.LineageEvent;
import marquez.service.models.LineageEvent.ParentRunFacet;
import marquez.service.models.StreamMeta;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;

public final class Utils {
Expand Down Expand Up @@ -351,6 +354,36 @@ private static Version newDatasetVersionFor(DatasetVersionData data) {
return Version.of(UUID.nameUUIDFromBytes(bytes));
}

/**
* Returns a new {@link Version} object based on the dataset id and collection of fields
* information. A {@link Version} is generated by concatenating the provided metadata together
* (delimited by a colon). For fields, only the name and type contribute. The resulting string is
* then converted to a {@code byte} array and passed to {@link UUID#nameUUIDFromBytes(byte[])}.
*
* @param namespaceName The namespace of the dataset.
* @param datasetName The dataset name.
* @param fields The fields of the dataset.
* @return A {@link Version} object based on the specified dataset metadata.
*/
public static Version newDatasetSchemaVersionFor(
String namespaceName, String datasetName, Collection<Pair<String, String>> fields) {
final byte[] bytes =
VERSION_JOINER
.join(
namespaceName,
datasetName,
Optional.ofNullable(fields).orElse(Collections.emptyList()).stream()
.sorted()
.map(Utils::joinField)
.collect(joining(VERSION_DELIM)))
.getBytes(UTF_8);
return Version.of(UUID.nameUUIDFromBytes(bytes));
}

private static String joinField(Pair<String, String> field) {
return VERSION_JOINER.join(field.getLeft(), field.getRight());
}

private static String joinField(Triple<String, String, String> field) {
return VERSION_JOINER.join(field.getLeft(), field.getMiddle(), field.getRight());
}
Expand Down
3 changes: 3 additions & 0 deletions api/src/main/java/marquez/db/BaseDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ public interface BaseDao extends SqlObject {
@CreateSqlObject
DatasetVersionDao createDatasetVersionDao();

@CreateSqlObject
DatasetSchemaVersionDao createDatasetSchemaVersionDao();

@CreateSqlObject
JobDao createJobDao();

Expand Down
1 change: 1 addition & 0 deletions api/src/main/java/marquez/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ private Columns() {}
public static final String NAMESPACE_UUID = "namespace_uuid";
public static final String DATASET_UUID = "dataset_uuid";
public static final String DATASET_VERSION_UUID = "dataset_version_uuid";
public static final String DATASET_SCHEMA_VERSION_UUID = "dataset_schema_version_uuid";
public static final String JOB_VERSION_UUID = "job_version_uuid";
public static final String CURRENT_VERSION_UUID = "current_version_uuid";
public static final String CHECKSUM = "checksum";
Expand Down
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/db/DatasetDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ default void setFields(Dataset ds) {
ds.getCurrentVersion()
.ifPresent(
dsv -> {
ds.setFields(datasetFieldDao.find(dsv));
ds.setFields(datasetFieldDao.findByDatasetVersion(dsv));
});
}

Expand Down
13 changes: 12 additions & 1 deletion api/src/main/java/marquez/db/DatasetFieldDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,18 @@ WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symli
+ "FROM dataset_fields f "
+ "INNER JOIN dataset_versions_field_mapping fm on fm.dataset_field_uuid = f.uuid "
+ "WHERE fm.dataset_version_uuid = :datasetVersionUuid")
List<Field> find(UUID datasetVersionUuid);
List<Field> findByDatasetVersion(UUID datasetVersionUuid);

@SqlQuery(
"SELECT f.*, "
+ "ARRAY(SELECT t.name "
+ " FROM dataset_fields_tag_mapping m "
+ " INNER JOIN tags t on t.uuid = m.tag_uuid "
+ " WHERE m.dataset_field_uuid = f.uuid) AS tags "
+ "FROM dataset_fields f "
+ "INNER JOIN dataset_schema_versions_field_mapping fm on fm.dataset_field_uuid = f.uuid "
+ "WHERE fm.dataset_schema_version_uuid = :datasetSchemaVersionUuid")
List<Field> findByDatasetSchemaVersion(UUID datasetSchemaVersionUuid);

@SqlQuery(
"""
Expand Down
55 changes: 55 additions & 0 deletions api/src/main/java/marquez/db/DatasetSchemaVersionDao.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package marquez.db;

import java.time.Instant;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import marquez.common.Utils;
import marquez.common.models.Version;
import marquez.db.mappers.DatasetSchemaVersionRowMapper;
import marquez.db.models.DatasetFieldRow;
import marquez.db.models.DatasetRow;
import marquez.db.models.DatasetSchemaVersionRow;
import org.apache.commons.lang3.tuple.Pair;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.statement.SqlBatch;
import org.jdbi.v3.sqlobject.statement.SqlQuery;

@RegisterRowMapper(DatasetSchemaVersionRowMapper.class)
public interface DatasetSchemaVersionDao extends BaseDao {
default Version upsertSchemaVersion(
DatasetRow datasetRow, List<DatasetFieldRow> datasetFields, Instant now) {
final Version computedVersion =
Utils.newDatasetSchemaVersionFor(
datasetRow.getNamespaceName(),
datasetRow.getName(),
datasetFields.stream()
.map(field -> Pair.of(field.getName(), field.getType()))
.collect(Collectors.toSet()));
final DatasetSchemaVersionRow newRow =
upsertSchemaVersion(computedVersion.getValue(), datasetRow.getUuid(), now);
if (newRow != null) {
// if not null it means a new insert, so we have to do the fields as well
upsertFieldMappings(
newRow.getUuid(),
datasetFields.stream().map(DatasetFieldRow::getUuid).collect(Collectors.toList()));
}
// if null then it means the version already exists, and so the fields must already exist
return computedVersion;
}

@SqlQuery(
"INSERT INTO dataset_schema_versions "
+ "(uuid, dataset_uuid, created_at) "
+ "VALUES (:uuid, :datasetUuid, :now) "
+ "ON CONFLICT DO NOTHING "
+ "RETURNING *")
DatasetSchemaVersionRow upsertSchemaVersion(UUID uuid, UUID datasetUuid, Instant now);

@SqlBatch(
"INSERT INTO dataset_schema_versions_field_mapping "
+ "(dataset_schema_version_uuid, dataset_field_uuid) "
+ "VALUES (:schemaVersionUuid, :fieldUuid) "
+ "ON CONFLICT DO NOTHING")
void upsertFieldMappings(UUID schemaVersionUuid, Iterable<UUID> fieldUuid);
}
6 changes: 4 additions & 2 deletions api/src/main/java/marquez/db/DatasetVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ default DatasetVersionRow upsertDatasetVersion(
now,
datasetUuid,
version.getValue(),
null, // TODO add schema version
datasetMeta.getRunId().map(RunId::getValue).orElse(null),
toPgObjectFields(datasetMeta.getFields()),
namespaceName,
Expand Down Expand Up @@ -302,9 +303,9 @@ default List<DatasetVersion> findAllWithRun(

@SqlQuery(
"INSERT INTO dataset_versions "
+ "(uuid, created_at, dataset_uuid, version, run_uuid, fields, namespace_name, dataset_name, lifecycle_state) "
+ "(uuid, created_at, dataset_uuid, version, dataset_schema_version_uuid, run_uuid, fields, namespace_name, dataset_name, lifecycle_state) "
+ "VALUES "
+ "(:uuid, :now, :datasetUuid, :version, :runUuid, :fields, :namespaceName, :datasetName, :lifecycleState) "
+ "(:uuid, :now, :datasetUuid, :version, :schemaVersionUuid, :runUuid, :fields, :namespaceName, :datasetName, :lifecycleState) "
+ "ON CONFLICT(version) "
+ "DO UPDATE SET "
+ "run_uuid = EXCLUDED.run_uuid "
Expand All @@ -314,6 +315,7 @@ DatasetVersionRow upsert(
Instant now,
UUID datasetUuid,
UUID version,
UUID schemaVersionUuid,
UUID runUuid,
PGobject fields,
String namespaceName,
Expand Down
1 change: 1 addition & 0 deletions api/src/main/java/marquez/db/JobVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ private static ExtendedDatasetVersionRow toExtendedDatasetVersionRow(DatasetReco
d.getDatasetRow().getCreatedAt(),
d.getDatasetVersionRow().getDatasetUuid(),
d.getDatasetVersionRow().getVersion(),
d.getDatasetVersionRow().getSchemaVersionUuid().orElse(null),
null,
null,
d.getDatasetRow().getNamespaceName(),
Expand Down
41 changes: 24 additions & 17 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,20 @@ default DatasetRecord upsertLineageDataset(
Optional.ofNullable(ds.getFacets())
.map(DatasetFacets::getSchema)
.map(SchemaDatasetFacet::getFields)
.orElse(null);
.orElse(Collections.emptyList());
List<DatasetFieldRow> datasetFields = new ArrayList<>();
for (SchemaField field : fields) {
DatasetFieldRow datasetFieldRow =
daos.getDatasetFieldDao()
.upsert(
UUID.randomUUID(),
now,
field.getName(),
field.getType(),
field.getDescription(),
datasetRow.getUuid());
datasetFields.add(datasetFieldRow);
}

final DatasetRow dsRow = datasetRow;
DatasetVersionRow datasetVersionRow =
Expand All @@ -931,38 +944,32 @@ default DatasetRecord upsertLineageDataset(
fields,
runUuid)
.getValue();
UUID datasetSchemaVersionUuid =
daos.getDatasetSchemaVersionDao()
.upsertSchemaVersion(dsRow, datasetFields, now)
.getValue();
DatasetVersionRow row =
daos.getDatasetVersionDao()
.upsert(
UUID.randomUUID(),
now,
dsRow.getUuid(),
versionUuid,
datasetSchemaVersionUuid,
isInput ? null : runUuid,
daos.getDatasetVersionDao().toPgObjectSchemaFields(fields),
dsNamespace.getName(),
ds.getName(),
dslifecycleState);
return row;
});

List<DatasetFieldMapping> datasetFieldMappings = new ArrayList<>();
List<DatasetFieldRow> datasetFields = new ArrayList<>();
if (fields != null) {
for (SchemaField field : fields) {
DatasetFieldRow datasetFieldRow =
daos.getDatasetFieldDao()
.upsert(
UUID.randomUUID(),
now,
field.getName(),
field.getType(),
field.getDescription(),
datasetRow.getUuid());
datasetFields.add(datasetFieldRow);
datasetFieldMappings.add(
new DatasetFieldMapping(datasetVersionRow.getUuid(), datasetFieldRow.getUuid()));
}
for (DatasetFieldRow datasetFieldRow : datasetFields) {
datasetFieldMappings.add(
new DatasetFieldMapping(datasetVersionRow.getUuid(), datasetFieldRow.getUuid()));
}

daos.getDatasetFieldDao().updateFieldMapping(datasetFieldMappings);

if (isInput && runUuid != null) {
Expand Down
1 change: 1 addition & 0 deletions api/src/main/java/marquez/db/RunDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ default void upsertOutputDatasetsFor(UUID runUuid, ImmutableSet<DatasetId> runOu
Instant.now(),
dsRow.get().getUuid(),
version,
null, // TODO add schema version
runUuid,
datasetVersionDao.toPgObjectFields(d.getFields()),
d.getNamespace().getValue(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package marquez.db.mappers;

import static marquez.db.Columns.timestampOrThrow;
import static marquez.db.Columns.uuidOrThrow;

import java.sql.ResultSet;
import java.sql.SQLException;
import lombok.NonNull;
import marquez.db.Columns;
import marquez.db.models.DatasetSchemaVersionRow;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;

public final class DatasetSchemaVersionRowMapper implements RowMapper<DatasetSchemaVersionRow> {
@Override
public DatasetSchemaVersionRow map(@NonNull ResultSet results, @NonNull StatementContext context)
throws SQLException {
return new DatasetSchemaVersionRow(
uuidOrThrow(results, Columns.ROW_UUID),
uuidOrThrow(results, Columns.DATASET_UUID),
timestampOrThrow(results, Columns.CREATED_AT));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public DatasetVersionRow map(@NonNull ResultSet results, @NonNull StatementConte
timestampOrThrow(results, Columns.CREATED_AT),
uuidOrThrow(results, Columns.DATASET_UUID),
uuidOrThrow(results, Columns.VERSION),
uuidOrNull(results, Columns.DATASET_SCHEMA_VERSION_UUID),
stringOrNull(results, Columns.LIFECYCLE_STATE),
uuidOrNull(results, Columns.RUN_UUID),
stringOrNull(results, Columns.NAMESPACE_NAME),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public ExtendedDatasetVersionRow map(
timestampOrThrow(results, Columns.CREATED_AT),
uuidOrThrow(results, Columns.DATASET_UUID),
uuidOrThrow(results, Columns.VERSION),
uuidOrNull(results, Columns.DATASET_SCHEMA_VERSION_UUID),
stringOrNull(results, Columns.LIFECYCLE_STATE),
uuidOrNull(results, Columns.RUN_UUID),
stringOrNull(results, Columns.NAMESPACE_NAME),
Expand Down
19 changes: 19 additions & 0 deletions api/src/main/java/marquez/db/models/DatasetSchemaVersionRow.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package marquez.db.models;

import java.time.Instant;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.ToString;

@AllArgsConstructor
@EqualsAndHashCode
@Getter
@ToString
public class DatasetSchemaVersionRow {
@NonNull private final UUID uuid;
@NonNull private final UUID datasetUuid;
@NonNull private final Instant createdAt;
}
5 changes: 5 additions & 0 deletions api/src/main/java/marquez/db/models/DatasetVersionRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@ public class DatasetVersionRow {
@Getter @NonNull private final Instant createdAt;
@Getter @NonNull private final UUID datasetUuid;
@Getter @NonNull private final UUID version;
@Nullable private final UUID schemaVersionUuid;
@Getter @Nullable private final String lifecycleState;
@Nullable private final UUID runUuid;
@Getter @Nullable private final String namespaceName;
@Getter @Nullable private final String datasetName;

public Optional<UUID> getSchemaVersionUuid() {
return Optional.ofNullable(schemaVersionUuid);
}

public Optional<UUID> getRunUuid() {
return Optional.ofNullable(runUuid);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,21 @@ public ExtendedDatasetVersionRow(
@NonNull Instant createdAt,
@NonNull UUID datasetUuid,
@NonNull UUID version,
UUID schemaVersionUuid,
@Nullable String lifecycleState,
UUID runUuid,
@NonNull final String namespaceName,
@NonNull final String datasetName) {
super(
uuid, createdAt, datasetUuid, version, lifecycleState, runUuid, namespaceName, datasetName);
uuid,
createdAt,
datasetUuid,
version,
schemaVersionUuid,
lifecycleState,
runUuid,
namespaceName,
datasetName);
this.namespaceName = namespaceName;
this.datasetName = datasetName;
}
Expand Down
Loading

0 comments on commit 2d26f95

Please sign in to comment.