Add OL facet tables #2152
Conversation
This is going to be a huge improvement in terms of performance.
It will also help with developing features like getting raw events by namespace, which @mobuchowski worked on some time ago (#2070).
My concerns are:
- If my understanding is correct, the current behavior leads users to lose dataset history (facets written the old way won't be read the new way). If this is acceptable, we should at least add some doc notes. Otherwise, providing some migration (even an optional one) would be great.
- We're storing the same facets twice now: in lineage_events and in the new facet tables. Is this expected behavior?
- Wouldn't we like to have a test that verifies the introduced tables are filled after posting a lineage event? I couldn't spot anything similar in the PR.
- The Flyway DB migration version file numbering needs to be adjusted (v46 seems to be outdated).
+ " WHERE run_uuid = dv.run_uuid\n" | ||
+ " ) e ON e.run_uuid = dv.run_uuid\n" | ||
+ " ) df ON df.run_uuid = dv.run_uuid\n" |
this seems to be missing dataset_uuid = dv.dataset_uuid
Codecov Report
@@ Coverage Diff @@
## main #2152 +/- ##
============================================
+ Coverage 76.72% 77.14% +0.41%
- Complexity 1177 1237 +60
============================================
Files 222 227 +5
Lines 5354 5727 +373
Branches 429 465 +36
============================================
+ Hits 4108 4418 +310
- Misses 768 801 +33
- Partials 478 508 +30
private static final String GET_CURRENT_LOCK_SQL =
    """
    SELECT * FROM v55_facet_migration_lock ORDER BY created_at ASC, run_uuid ASC LIMIT 1
    """;
I think one-liners don't need to use text block syntax, especially if not well formatted 😉
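For illustration, a minimal sketch of that suggestion, keeping the constant name and query from the diff above:

// Plain string constant instead of a text block for a single-line query.
private static final String GET_CURRENT_LOCK_SQL =
    "SELECT * FROM v55_facet_migration_lock ORDER BY created_at ASC, run_uuid ASC LIMIT 1";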
+ " SELECT le.run_uuid, JSON_AGG(event->'run'->'facets') AS facets\n" | ||
+ " FROM lineage_events le\n" | ||
+ " GROUP BY le.run_uuid\n" | ||
+ " SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets\n" |
It's risky to use this data in the same version that we're migrating in, since we have a manual migration process for some users.
This release should IMO still use the lineage_events table, and, given reasonable time to migrate to the newer version, we should use the new tables to optimize queries.
Unless we have a well-documented way to run the migration without impacting a "running deployment", so that we don't have to roll back to the previous Marquez version if something fails.
Thanks @mobuchowski for pointing this out (which I agree with).
@pawel-big-lebowski I know this PR / optimization has been dragging (mostly my fault, so happy to take the blame), but splitting the changes into separate PRs / releases would reduce any challenges with rollbacks and also give users the option to decide when to apply the migration / upgrade. So, that said, what I propose is:
- Open a PR to just introduce the dataset, job, and run facet tables (a write-only mode), meaning the facet tables won't be referenced in any way in our queries, but will be written to so that features can take advantage of the optimization; lineage events will continue to be written to the lineage_events table as well. This can be released in 0.30.0.
- As a follow-up PR, we can update all references to lineage_events in our queries to use the individual facet tables. Within this PR, the migration script (and documentation) to backfill facets from the lineage_events table will also be introduced (and be required to run for users with larger lineage_events tables). This can be released in 0.31.0 or later.

PR #2152 (current) can be step 2 and marked as blocked, and I can cherry-pick any changes into a separate PR for step 1 (also open to other options). Anyway, thanks for the great work here @pawel-big-lebowski. Thoughts?
It may be difficult to split the change into separate PRs. The initial PR was missing tests. I added them within my commits, and they detected some issues with previous commits, like:
- the runFacetExists method in api/src/main/java/marquez/db/RunFacetsDao.java
- the fromName method in api/src/main/java/marquez/db/DatasetFacetsDao.java
(see my latest commit)
So merging only some of the commits shouldn't be a solution. Additionally, one cannot benefit from the facet tables unless they are filled with data. Otherwise, users will experience incomplete results.
This PR does not clean or remove data from the lineage_events table, so we always have the ability to prepare a fix that restores the previous mechanics if we want to.
@wslulciuc Although splitting this PR into multiple PRs could improve readability, I don't see any advantage of splitting this into separate releases (0.30.0 and 0.31.0).
The combination of a manual migration process and the fact that the code stops reading from the lineage_events table introduces a real problem for users. When a user deploys, the tables will be created and all new records will be written into the new facet tables. However, all old data still lives only in the lineage_events table and won't be accessible until after the user executes the migration command. This means that the service will effectively be broken until users choose to execute this migration.
I think splitting the PR so that writes and reads are committed and can be deployed separately makes sense. This gives users the opportunity to push multiple deployments in a way that guarantees no downtime. It also makes this PR muuuch easier to review - readability is not just good for its own sake; it makes it easier to catch issues before they go to production. The migration script itself warrants its own review, I think.
Database migration in a nutshell:
I updated
Optional.ofNullable(datasetFacets.getDocumentation())
    .ifPresent(
        documentation ->
            insertDatasetFacet(
                UUID.randomUUID(),
                now,
                datasetUuid,
                runUuid,
                lineageEventTime,
                lineageEventType,
                Facet.DOCUMENTATION.getType(),
                Facet.DOCUMENTATION.getName(),
                toPgObject(Facet.DOCUMENTATION.asJson(documentation))));
You can replace the copy/paste of this block with something like
writeDatasetFacet(Facet.DOCUMENTATION, datasetFacets.getDocumentation());
where writeDatasetFacet is defined as
void writeDatasetFacet(Facet facet, Object nullableData) {
Optional.ofNullable(nullableData)
.ifPresent(
data ->
insertDatasetFacet(
UUID.randomUUID(),
now,
datasetUuid,
runUuid,
lineageEventTime,
lineageEventType,
facet.getType(),
facet.getName(),
toPgObject(facet.asJson(data))));
}
Optional.ofNullable(jobFacet.getDocumentation())
    .ifPresent(
        documentation ->
            insertJobFacet(
                UUID.randomUUID(),
                now,
                jobUuid,
                runUuid,
                lineageEventTime,
                lineageEventType,
                DatasetFacetsDao.Facet.DOCUMENTATION.getName(),
                toPgObject(DatasetFacetsDao.Facet.DOCUMENTATION.asJson(documentation))));
An interface for the various facet enums will help you avoid all this duplication
I agree, it's a bit ugly. Do you mean defining an interface for each enum to implement?
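For illustration, a hedged sketch of one shape such a shared interface could take; the name FacetSpec and the helper signature below are hypothetical, not code from this PR:

// Both DatasetFacetsDao.Facet and RunFacetsDao.Facet (and the job facet enum)
// could implement a common contract, so one generic helper handles the
// null-check + insert instead of a near-identical block per facet.
interface FacetSpec {
  String getName();
  String asJson(Object value); // serialized facet payload, ready for toPgObject(...)
}

// The duplicated Optional.ofNullable(...) blocks then collapse into a single
// helper, e.g. void writeFacet(FacetSpec facet, Object nullableValue) { ... }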
+ " SELECT le.run_uuid, JSON_AGG(event->'run'->'facets') AS facets\n" | ||
+ " FROM lineage_events le\n" | ||
+ " GROUP BY le.run_uuid\n" | ||
+ " SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets\n" |
Replying to @collado-mike
I think there may be some kind of misunderstanding here. For users with more than 100K lineage_events:
No, it's not, and that's why
I agree that splitting the PR into multiple PRs makes sense:
Releasing PR I, PR II, and PR III in one go does not imply any downtime.
* Split `lineage_events` table to `dataset_facets`, `run_facets`, and `job_facets` tables. [`2152`](https://github.com/MarquezProject/marquez/pull/2152)
  [@wslulciuc](https://github.com/wslulciuc), [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
  * Performance improvement with migration procedure that requires manual steps if database has more than 100K lineage events.
  * Please read [here](https://github.com/MarquezProject/marquez/blob/main/api/src/main/resources/marquez/db/migration/V55__readme.md) to get more database migration details.
Minor: I would reword as follows:
Note: We highly encourage users to review our migration plan.
@@ -149,6 +150,12 @@ public void registerResources(
    }
  }

  @Override
  protected void addDefaultCommands(Bootstrap<MarquezConfig> bootstrap) {
To avoid overriding addDefaultCommands(), we can just register the command within MarquezApp.initialize():
@Override
public void initialize(@NonNull Bootstrap<MarquezConfig> bootstrap) {
bootstrap.addCommand(new V55MigrationCommand());
// continue initialization ...
}
The existing commands are kind of no-arg commands. The migration command needs to be able to access the database connection. That's why I included it here, so that it can get the Application as a constructor param. Within initialize, commands are added although the application is not created yet.
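For context, a minimal sketch of that wiring, assuming Dropwizard's EnvironmentCommand API; the command description and run body are illustrative, not the PR's code:

import io.dropwizard.Application;
import io.dropwizard.cli.EnvironmentCommand;
import io.dropwizard.setup.Environment;
import net.sourceforge.argparse4j.inf.Namespace;

public class V55MigrationCommand extends EnvironmentCommand<MarquezConfig> {

  // EnvironmentCommand requires the Application in its constructor; it builds
  // an Environment before run(), which is what gives the command access to the
  // configured database.
  public V55MigrationCommand(Application<MarquezConfig> app) {
    super(app, "v55_migrate", "backfill facet tables from lineage_events");
  }

  @Override
  protected void run(Environment environment, Namespace namespace, MarquezConfig config)
      throws Exception {
    // Build a Jdbi/DataSource from the configuration here and run the backfill.
  }
}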
 * <p>Please refer to @link marquez/db/migration/V55__readme.md for more details.
 */
@Slf4j
public class V55MigrationCommand<MarquezConfig> extends EnvironmentCommand<marquez.MarquezConfig> { |
Is V55MigrationCommand too specific (but also vague)? We'll certainly have more migrations in the future, and one-off commands for each will be hard to manage and not the best user experience. I suggest we name the class DbMigrationsCommand and define v55_migrate as a subcommand. This tutorial is a pretty good reference on how to define subcommands.
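As a rough illustration of that direction (names are hypothetical, and this sketch uses an argument with choices rather than the true argparse subparsers the linked tutorial covers):

import io.dropwizard.cli.ConfiguredCommand;
import io.dropwizard.setup.Bootstrap;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;

public class DbMigrationsCommand extends ConfiguredCommand<MarquezConfig> {

  public DbMigrationsCommand() {
    super("db-migrate", "run manual database migrations");
  }

  @Override
  public void configure(Subparser subparser) {
    super.configure(subparser); // keeps the positional config-file argument
    subparser
        .addArgument("--version")
        .choices("v55")
        .setDefault("v55")
        .help("which migration to run");
  }

  @Override
  protected void run(Bootstrap<MarquezConfig> bootstrap, Namespace namespace, MarquezConfig config)
      throws Exception {
    if ("v55".equals(namespace.getString("version"))) {
      // delegate to the v55 facet backfill
    }
  }
}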
@Setter private Integer chunkSize = null;

@Setter private boolean triggeredByCommand = false;
Minor: I'd use the flag name manual.
execute(CREATE_DATASET_FACETS_VIEW);
execute(CREATE_JOB_FACETS_VIEW);

if (!triggeredByCommand && countLineageEvents() >= BASIC_MIGRATION_LIMIT) {
It's great that we inform the user on how to handle very large tables 💯
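For readers following along, the guard in question boils down to something like the following: a paraphrase of the diff above, using the manual flag name suggested earlier, with illustrative log wording:

// Skip the automatic backfill for large installations and point the operator
// at the manual migration command instead.
if (!manual && countLineageEvents() >= BASIC_MIGRATION_LIMIT) {
  log.warn(
      "lineage_events has {} or more rows; skipping automatic facet backfill. "
          + "Please run the manual migration command (see V55__readme.md).",
      BASIC_MIGRATION_LIMIT);
  return;
}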
 * Workaround to register uuid_generate_v4 function to generate uuids. gen_random_uuid() is
 * available since Postgres 13
 */
jdbi.withHandle(h -> h.createCall("CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\"").invoke());
Is it possible that the migration might not be applied by a superuser? Can we generate the UUIDs using java.util.UUID, then inject the values into the inserts?
That's a hard one. We don't want Java to touch the data: it's good for performance when everything is done in SQL and no data transfer between the backend DB and the Java application is required.
Adding this extension is a workaround for PostgreSQL 12, which we still support. If we stopped supporting PostgreSQL 12, we could use the built-in UUID function (https://www.postgresql.org/docs/current/functions-uuid.html) available since version 13.
Otherwise, we can add an extra condition here to include the extension only for PostgreSQL 12. Anyway, I think PostgreSQL 12 users will need a superuser.
Ok, that's reasonable. Mind making a note in the migration doc on the superuser requirement? Also, yeah, we should upgrade our version requirements for PostgreSQL. I wanted to first have schema validation in place in CI. PR #2326 gets us one step closer to upgrading.
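A hedged sketch of the "extra condition only for PostgreSQL 12" idea mentioned above, using Jdbi; the version check is an assumption, not code from this PR:

// gen_random_uuid() is built in from PostgreSQL 13 onwards, so only
// PostgreSQL 12 needs the uuid-ossp extension (creating it may require
// superuser privileges).
String versionNum =
    jdbi.withHandle(h -> h.createQuery("SHOW server_version_num").mapTo(String.class).one());
if (Integer.parseInt(versionNum) < 130000) {
  jdbi.withHandle(h -> h.createCall("CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\"").invoke());
}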
| 10K events | 50K rows | 50K rows | 150K rows | 10sec |
| 100K events | 500K rows | 500K rows | 150K rows | 106sec |
| 500K events | 2.5M rows | 2.5M rows | 7.5M rows | 612sec |
| 1M events | 5M rows | 5M rows | 15M rows | 1473sec |
Thanks for this detailed and well-thought-out migration plan @pawel-big-lebowski 💯
In step 1 that I outlined above, Marquez would continue to write to the lineage_events table. @pawel-big-lebowski has made some great additions and has helped push this PR to its current state (and close to being merged!). Based on your feedback @collado-mike and @pawel-big-lebowski's input, there are enough safeguards to ensure the user has control of how the migration is applied (as outlined in the migration plan).
The content of this PR has been split to:
Review comments provided in this PR have been applied there.
Problem
To access OL facets, we have to run direct queries against the OL events table, which has significant performance concerns.
Closes: #2076
Solution
Add tables for dataset, job, and run facets.
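For readers skimming the PR, a hedged sketch of what one of the new tables might look like, with columns inferred from the insertDatasetFacet(...) calls in the review diffs; exact types, constraints, and indexes are assumptions, not the actual migration script:

// Column list mirrors the arguments passed to insertDatasetFacet(...): uuid,
// created_at, dataset_uuid, run_uuid, lineage_event_time, lineage_event_type,
// type, name, and the facet payload stored as JSONB.
private static final String CREATE_DATASET_FACETS_TABLE_SQL =
    """
    CREATE TABLE dataset_facets (
      uuid               UUID PRIMARY KEY,
      created_at         TIMESTAMPTZ NOT NULL,
      dataset_uuid       UUID NOT NULL,
      run_uuid           UUID,
      lineage_event_time TIMESTAMPTZ,
      lineage_event_type VARCHAR,
      type               VARCHAR,
      name               VARCHAR NOT NULL,
      facet              JSONB
    )
    """;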
Checklist
- You've updated CHANGELOG.md with details about your change under the "Unreleased" section (if relevant; depending on the change, this may not be necessary).
- You've versioned your .sql database schema migration according to Flyway's naming convention (if relevant).
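For reference, Flyway's convention is a version prefix followed by a double underscore and a description, so a migration for this change would be named something like V55__add_facet_tables.sql (illustrative name; this PR's migration notes live in V55__readme.md).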