Add @FlywayTarget annotation to migration tests to control flyway upg… #2035

Merged · 2 commits · Jul 18, 2022
@@ -64,7 +64,7 @@ LEFT JOIN LATERAL (
FROM lineage_events le
WHERE le.run_uuid=r.uuid
AND event->'run'->'facets'->'parent'->'run'->>'runId' IS NOT NULL
AND event->'run'->'facets'->'airflow_version' IS NOT NULL
AND event->'run'->'facets'->>'airflow_version' IS NOT NULL
) e ON e.run_uuid=r.uuid
WHERE e.parent_run_id IS NOT NULL
""";
@@ -76,8 +76,8 @@ INSERT INTO jobs (uuid, type, created_at, updated_at, namespace_uuid, name, desc
current_location
FROM jobs
WHERE namespace_name=:namespace AND name=:jobName
ON CONFLICT(name, namespace_uuid) WHERE parent_job_uuid IS NULL
DO UPDATE SET updated_at=now()
ON CONFLICT (name, namespace_uuid) WHERE parent_job_uuid IS NULL
DO UPDATE SET updated_at=EXCLUDED.updated_at
RETURNING uuid
""";
public static final String INSERT_PARENT_RUN_QUERY =
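
Two PostgreSQL details behind the query changes above: the -> operator returns a json value, so a facet explicitly set to JSON null still passes an IS NOT NULL check, whereas ->> returns text and yields SQL NULL for both a missing key and a JSON null; and in an ON CONFLICT ... DO UPDATE clause, EXCLUDED refers to the row proposed for insertion, so EXCLUDED.updated_at reuses the incoming timestamp instead of calling now(). A minimal sketch of the corrected predicate, not part of the PR (the class and method names are illustrative; it assumes a Jdbi instance bound to a Postgres database with the lineage_events table used above):

import org.jdbi.v3.core.Jdbi;

public class AirflowFacetFilterSketch {
  // Counts events that carry a non-null airflow_version facet, mirroring the predicate above.
  public static long countAirflowEvents(Jdbi jdbi) {
    return jdbi.withHandle(
        h ->
            h.createQuery(
                    """
                    SELECT count(*) FROM lineage_events
                    WHERE event->'run'->'facets'->>'airflow_version' IS NOT NULL
                    """)
                .mapTo(Long.class)
                .first());
  }
}
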
api/src/test/java/marquez/db/BackfillTestUtils.java (80 changes: 54 additions & 26 deletions)
@@ -14,11 +14,10 @@
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import marquez.common.Utils;
import marquez.common.models.JobType;
import marquez.db.models.ExtendedRunRow;
import marquez.db.models.JobRow;
import marquez.db.models.NamespaceRow;
import marquez.db.models.RunArgsRow;
import marquez.service.models.LineageEvent;
@@ -36,7 +35,7 @@
public class BackfillTestUtils {
public static final String COMPLETE = "COMPLETE";

public static void writeNewEvent(
public static ExtendedRunRow writeNewEvent(
Jdbi jdbi,
String jobName,
Instant now,
@@ -45,25 +44,9 @@ public static void writeNewEvent(
String parentJobName)
throws SQLException, JsonProcessingException {
OpenLineageDao openLineageDao = jdbi.onDemand(OpenLineageDao.class);
JobDao jobDao = jdbi.onDemand(JobDao.class);
RunArgsDao runArgsDao = jdbi.onDemand(RunArgsDao.class);
RunDao runDao = jdbi.onDemand(RunDao.class);
PGobject pgInputs = new PGobject();
pgInputs.setType("json");
pgInputs.setValue("[]");
JobRow jobRow =
jobDao.upsertJob(
UUID.randomUUID(),
JobType.BATCH,
now,
namespace.getUuid(),
NAMESPACE,
jobName,
"description",
null,
null,
null,
pgInputs);
UUID jobUuid = writeJob(jdbi, jobName, now, namespace);

RunArgsRow runArgsRow =
runArgsDao.upsertRunArgs(
@@ -75,7 +58,7 @@
null,
runUuid.toString(),
now,
jobRow.getUuid(),
jobUuid,
null,
runArgsRow.getUuid(),
now,
@@ -91,6 +74,15 @@
Instant.now().atZone(LOCAL_ZONE).truncatedTo(ChronoUnit.HOURS));
nominalTimeRunFacet.setNominalEndTime(
nominalTimeRunFacet.getNominalStartTime().plus(1, ChronoUnit.HOURS));
Optional<ParentRunFacet> parentRun =
Optional.ofNullable(parentRunId)
.map(
runId ->
new ParentRunFacet(
PRODUCER_URL,
LineageTestUtils.SCHEMA_URL,
new RunLink(runId),
new JobLink(NAMESPACE, parentJobName)));
LineageEvent event =
new LineageEvent(
COMPLETE,
@@ -99,11 +91,7 @@
runUuid.toString(),
new RunFacet(
nominalTimeRunFacet,
new ParentRunFacet(
LineageTestUtils.PRODUCER_URL,
LineageTestUtils.SCHEMA_URL,
new RunLink(parentRunId),
new JobLink(NAMESPACE, parentJobName)),
parentRun.orElse(null),
ImmutableMap.of("airflow_version", ImmutableMap.of("version", "abc")))),
new LineageEvent.Job(
NAMESPACE, jobName, new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP)),
@@ -121,5 +109,45 @@ NAMESPACE, jobName, new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP)),
namespace.getName(),
eventJson,
PRODUCER_URL.toString());
return runRow;
}

public static UUID writeJob(Jdbi jdbi, String jobName, Instant now, NamespaceRow namespace)
throws SQLException {
PGobject pgInputs = new PGobject();
pgInputs.setType("json");
pgInputs.setValue("[]");
return jdbi.withHandle(
h -> {
UUID jobContextUuid =
h.createQuery(
"""
INSERT INTO job_contexts (uuid, created_at, context, checksum) VALUES (:uuid, :now, :context, :checksum)
ON CONFLICT (checksum) DO UPDATE SET created_at=EXCLUDED.created_at
RETURNING uuid
""")
.bind("uuid", UUID.randomUUID())
.bind("now", now)
.bind("context", "")
.bind("checksum", "")
.mapTo(UUID.class)
.first();
return h.createQuery(
"""
INSERT INTO jobs (uuid, type, created_at, updated_at, namespace_uuid, name, namespace_name, current_job_context_uuid, current_inputs)
VALUES (:uuid, :type, :now, :now, :namespaceUuid, :name, :namespaceName, :currentJobContextUuid, :currentInputs)
RETURNING uuid
""")
.bind("uuid", UUID.randomUUID())
.bind("type", marquez.client.models.JobType.BATCH)
.bind("now", now)
.bind("namespaceUuid", namespace.getUuid())
.bind("name", jobName)
.bind("namespaceName", namespace.getName())
.bind("currentJobContextUuid", jobContextUuid)
.bind("currentInputs", pgInputs)
.mapTo(UUID.class)
.first();
});
}
}
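
The helper now returns the inserted run row and exposes writeJob for seeding a bare job without a lineage event. A short, hypothetical usage sketch against the same external-Postgres Jdbi fixture the migration tests below use (the job name is made up; passing null for the parent run and parent job simply omits the parent facet):

import static marquez.db.LineageTestUtils.NAMESPACE;

import java.time.Instant;
import java.util.UUID;
import marquez.db.BackfillTestUtils;
import marquez.db.NamespaceDao;
import marquez.db.models.ExtendedRunRow;
import marquez.db.models.NamespaceRow;
import org.jdbi.v3.core.Jdbi;

class BackfillTestUtilsUsageSketch {
  static void seedData(Jdbi jdbi) throws Exception {
    Instant now = Instant.now();
    NamespaceRow namespace =
        jdbi.onDemand(NamespaceDao.class)
            .upsertNamespaceRow(UUID.randomUUID(), now, NAMESPACE, "me");

    // Bare job row only, inserted directly into the jobs table.
    UUID jobUuid = BackfillTestUtils.writeJob(jdbi, "someDag.someTask", now, namespace);

    // Full lineage event; the returned run row carries the run UUID used for parent links.
    ExtendedRunRow parentRun =
        BackfillTestUtils.writeNewEvent(jdbi, "someDag.someTask", now, namespace, null, null);
  }
}
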
@@ -6,27 +6,23 @@
package marquez.db.migrations;

import static marquez.db.LineageTestUtils.NAMESPACE;
import static marquez.db.LineageTestUtils.createLineageRow;
import static org.assertj.core.api.Assertions.assertThat;

import com.fasterxml.jackson.core.JsonProcessingException;
import java.sql.Connection;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import marquez.db.BackfillTestUtils;
import marquez.db.JobDao;
import marquez.db.LineageTestUtils;
import marquez.db.NamespaceDao;
import marquez.db.OpenLineageDao;
import marquez.db.RunArgsDao;
import marquez.db.RunDao;
import marquez.db.models.NamespaceRow;
import marquez.jdbi.JdbiExternalPostgresExtension.FlywayTarget;
import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
import marquez.service.models.Job;
import marquez.service.models.LineageEvent.JobFacet;
import org.flywaydb.core.api.configuration.Configuration;
import org.flywaydb.core.api.migration.Context;
import org.jdbi.v3.core.Jdbi;
@@ -35,6 +31,9 @@
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(MarquezJdbiExternalPostgresExtension.class)
// fix the flyway migration up to v44 since we depend on the database structure as it exists at this
// point in time. The migration will only ever be applied on a database at this version.
@FlywayTarget("44")
class V44_2__BackfillAirflowParentRunsTest {

static Jdbi jdbi;
@@ -66,13 +65,7 @@ public void testMigrateAirflowTasks() throws SQLException, JsonProcessingExcepti
BackfillTestUtils.writeNewEvent(
jdbi, "airflowDag.task2", now, namespace, "schedule:00:00:00", task1Name);

createLineageRow(
openLineageDao,
"a_non_airflow_task",
BackfillTestUtils.COMPLETE,
new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP),
Collections.emptyList(),
Collections.emptyList());
BackfillTestUtils.writeNewEvent(jdbi, "a_non_airflow_task", now, namespace, null, null);

jdbi.useHandle(
handle -> {
@@ -94,7 +87,18 @@ public Connection getConnection() {
throw new AssertionError("Unable to execute migration", e);
}
});
Optional<Job> jobByName = jobDao.findJobByName(NAMESPACE, dagName);
assertThat(jobByName).isPresent();
Optional<String> jobNameResult =
jdbi.withHandle(
h ->
h.createQuery(
"""
SELECT name FROM jobs_view
WHERE namespace_name=:namespace AND simple_name=:jobName
""")
.bind("namespace", NAMESPACE)
.bind("jobName", dagName)
.mapTo(String.class)
.findFirst());
assertThat(jobNameResult).isPresent();
}
}
@@ -7,26 +7,20 @@

import static marquez.db.BackfillTestUtils.writeNewEvent;
import static marquez.db.LineageTestUtils.NAMESPACE;
import static marquez.db.LineageTestUtils.createLineageRow;
import static org.assertj.core.api.Assertions.assertThat;

import com.fasterxml.jackson.core.JsonProcessingException;
import java.sql.Connection;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import marquez.common.models.JobName;
import marquez.db.JobDao;
import marquez.db.LineageTestUtils;
import marquez.db.NamespaceDao;
import marquez.db.OpenLineageDao;
import marquez.db.models.ExtendedRunRow;
import marquez.db.models.NamespaceRow;
import marquez.db.models.UpdateLineageRow;
import marquez.jdbi.JdbiExternalPostgresExtension.FlywayTarget;
import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
import marquez.service.models.Job;
import marquez.service.models.LineageEvent.JobFacet;
import org.flywaydb.core.api.configuration.Configuration;
import org.flywaydb.core.api.migration.Context;
import org.jdbi.v3.core.Jdbi;
@@ -35,6 +29,9 @@
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(MarquezJdbiExternalPostgresExtension.class)
// fix the flyway migration up to v44 since we depend on the database structure as it exists at this
// point in time. The migration will only ever be applied on a database at this version.
@FlywayTarget("44")
class V44_3_BackfillJobsWithParentsTest {

static Jdbi jdbi;
Expand All @@ -48,26 +45,16 @@ public static void setUpOnce(Jdbi jdbi) {

@Test
public void testBackfill() throws SQLException, JsonProcessingException {
String parentName = "parentJob";
UpdateLineageRow parentJob =
createLineageRow(
openLineageDao,
parentName,
"COMPLETE",
new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP),
Collections.emptyList(),
Collections.emptyList());

NamespaceDao namespaceDao = jdbi.onDemand(NamespaceDao.class);
Instant now = Instant.now();
NamespaceRow namespace =
namespaceDao.upsertNamespaceRow(UUID.randomUUID(), now, NAMESPACE, "me");
String parentName = "parentJob";
ExtendedRunRow parentRun = writeNewEvent(jdbi, parentName, now, namespace, null, null);

String task1Name = "task1";
writeNewEvent(
jdbi, task1Name, now, namespace, parentJob.getRun().getUuid().toString(), parentName);
writeNewEvent(
jdbi, "task2", now, namespace, parentJob.getRun().getUuid().toString(), parentName);
writeNewEvent(jdbi, task1Name, now, namespace, parentRun.getUuid().toString(), parentName);
writeNewEvent(jdbi, "task2", now, namespace, parentRun.getUuid().toString(), parentName);

jdbi.useHandle(
handle -> {
@@ -92,11 +79,13 @@ public Connection getConnection() {
}
});

JobDao jobDao = jdbi.onDemand(JobDao.class);
Optional<Job> jobByName = jobDao.findJobByName(NAMESPACE, task1Name);
assertThat(jobByName)
.isPresent()
.get()
.hasFieldOrPropertyWithValue("name", new JobName(parentName + "." + task1Name));
Optional<String> jobName =
jdbi.withHandle(
h ->
h.createQuery("SELECT name FROM jobs_view WHERE simple_name=:jobName")
.bind("jobName", task1Name)
.mapTo(String.class)
.findFirst());
assertThat(jobName).isPresent().get().isEqualTo(parentName + "." + task1Name);
}
}
api/src/test/java/marquez/jdbi/JdbiExternalPostgresExtension.java (20 changes: 17 additions & 3 deletions)
@@ -5,11 +5,14 @@

package marquez.jdbi;

import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import javax.sql.DataSource;
import org.flywaydb.core.Flyway;
import org.flywaydb.core.api.configuration.FluentConfiguration;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.core.spi.JdbiPlugin;
@@ -24,6 +27,11 @@
public abstract class JdbiExternalPostgresExtension
implements BeforeAllCallback, AfterAllCallback, ParameterResolver {

@Retention(RetentionPolicy.RUNTIME)
public @interface FlywayTarget {
String value();
}

protected final List<JdbiPlugin> plugins = new ArrayList<>();
private final ReentrantLock lock = new ReentrantLock();
private volatile DataSource dataSource;
@@ -82,12 +90,18 @@ public Handle getHandle() {
@Override
public void beforeAll(ExtensionContext context) throws Exception {
if (migration != null) {
flyway =
FluentConfiguration flywayConfig =
Flyway.configure()
.dataSource(getDataSource())
.locations(migration.paths.toArray(new String[0]))
.schemas(migration.schemas.toArray(new String[0]))
.load();
.schemas(migration.schemas.toArray(new String[0]));

FlywayTarget target = context.getRequiredTestClass().getAnnotation(FlywayTarget.class);
if (target != null) {
flywayConfig.target(target.value());
}

flyway = flywayConfig.load();
flyway.migrate();
}

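
For context on the mechanism: Flyway's target setting stops applying migrations once the given version is reached, so the new @FlywayTarget annotation (retained at runtime and read off the test class in beforeAll) lets a migration test pin the schema at exactly the version its backfill expects. A standalone sketch of the same call chain, with a placeholder JDBC URL, credentials, and migration location:

import org.flywaydb.core.Flyway;

public class FlywayTargetSketch {
  // Migrates the schema only up to the given version, e.g. "44", mirroring @FlywayTarget("44").
  public static void migrateUpTo(String version) {
    Flyway flyway =
        Flyway.configure()
            .dataSource("jdbc:postgresql://localhost:5432/marquez_test", "marquez", "marquez") // placeholder
            .locations("classpath:marquez/db/migration") // placeholder location
            .target(version) // later migrations (V45+ in this case) are never applied
            .load();
    flyway.migrate();
  }
}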