Skip to content

Commit

Permalink
Merge branch 'datahub-project:master' into BigQuery-Owner-Label-to-Da…
Browse files Browse the repository at this point in the history
…tahub-Ownership
  • Loading branch information
shubhamjagtap639 authored Apr 3, 2024
2 parents d0cec72 + 786c776 commit 16ecd49
Show file tree
Hide file tree
Showing 128 changed files with 2,539 additions and 887 deletions.
3 changes: 2 additions & 1 deletion .github/actions/ci-optimization/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ runs:
- "metadata-ingestion/**"
- "metadata-models/**"
- "smoke-test/**"
- "docker/datahub-ingestion-**"
- "docker/datahub-ingestion**"
docker:
- "docker/**"
backend:
- ".github/**"
- "metadata-models/**"
- "datahub-upgrade/**"
- "entity-registry/**"
Expand Down
2 changes: 1 addition & 1 deletion .github/actions/docker-custom-build-and-push/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ inputs:
required: false

images:
# e.g. linkedin/datahub-gms
# e.g. acryldata/datahub-gms
description: "List of Docker images to use as base name for tags"
required: true
build-args:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/airflow-plugin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ jobs:
- name: Install dependencies
run: ./metadata-ingestion/scripts/install_deps.sh
- name: Install airflow package and test (extras ${{ matrix.extra_pip_requirements }})
run: ./gradlew -Pextra_pip_requirements='${{ matrix.extra_pip_requirements }}' -Pextra_pip_extras='${{ matrix.extra_pip_extras }}' :metadata-ingestion-modules:airflow-plugin:lint :metadata-ingestion-modules:airflow-plugin:testQuick
run: ./gradlew -Pextra_pip_requirements='${{ matrix.extra_pip_requirements }}' -Pextra_pip_extras='${{ matrix.extra_pip_extras }}' :metadata-ingestion-modules:airflow-plugin:build
- name: pip freeze show list installed
if: always()
run: source metadata-ingestion-modules/airflow-plugin/venv/bin/activate && pip freeze
Expand Down
38 changes: 19 additions & 19 deletions .github/workflows/docker-unified.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ concurrency:
cancel-in-progress: true

env:
DATAHUB_GMS_IMAGE: "linkedin/datahub-gms"
DATAHUB_FRONTEND_IMAGE: "linkedin/datahub-frontend-react"
DATAHUB_MAE_CONSUMER_IMAGE: "linkedin/datahub-mae-consumer"
DATAHUB_MCE_CONSUMER_IMAGE: "linkedin/datahub-mce-consumer"
DATAHUB_KAFKA_SETUP_IMAGE: "linkedin/datahub-kafka-setup"
DATAHUB_ELASTIC_SETUP_IMAGE: "linkedin/datahub-elasticsearch-setup"
DATAHUB_GMS_IMAGE: "acryldata/datahub-gms"
DATAHUB_FRONTEND_IMAGE: "acryldata/datahub-frontend-react"
DATAHUB_MAE_CONSUMER_IMAGE: "acryldata/datahub-mae-consumer"
DATAHUB_MCE_CONSUMER_IMAGE: "acryldata/datahub-mce-consumer"
DATAHUB_KAFKA_SETUP_IMAGE: "acryldata/datahub-kafka-setup"
DATAHUB_ELASTIC_SETUP_IMAGE: "acryldata/datahub-elasticsearch-setup"
DATAHUB_MYSQL_SETUP_IMAGE: "acryldata/datahub-mysql-setup"
DATAHUB_UPGRADE_IMAGE: "acryldata/datahub-upgrade"
DATAHUB_INGESTION_BASE_IMAGE: "acryldata/datahub-ingestion-base"
Expand Down Expand Up @@ -73,7 +73,7 @@ jobs:
- name: Check whether publishing enabled
id: publish
env:
ENABLE_PUBLISH: ${{ secrets.DOCKER_PASSWORD != '' && secrets.ACRYL_DOCKER_PASSWORD != '' }}
ENABLE_PUBLISH: ${{ secrets.ACRYL_DOCKER_PASSWORD != '' }}
run: |
echo "Enable publish: ${{ env.ENABLE_PUBLISH }}"
echo "publish=${{ env.ENABLE_PUBLISH }}" >> $GITHUB_OUTPUT
Expand Down Expand Up @@ -127,8 +127,8 @@ jobs:
images: |
${{ env.DATAHUB_GMS_IMAGE }}
tags: ${{ needs.setup.outputs.tag }}
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
username: ${{ secrets.ACRYL_DOCKER_USERNAME }}
password: ${{ secrets.ACRYL_DOCKER_PASSWORD }}
publish: ${{ needs.setup.outputs.publish }}
context: .
file: ./docker/datahub-gms/Dockerfile
Expand Down Expand Up @@ -191,8 +191,8 @@ jobs:
images: |
${{ env.DATAHUB_MAE_CONSUMER_IMAGE }}
tags: ${{ needs.setup.outputs.tag }}
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
username: ${{ secrets.ACRYL_DOCKER_USERNAME }}
password: ${{ secrets.ACRYL_DOCKER_PASSWORD }}
publish: ${{ needs.setup.outputs.publish }}
context: .
file: ./docker/datahub-mae-consumer/Dockerfile
Expand Down Expand Up @@ -255,8 +255,8 @@ jobs:
images: |
${{ env.DATAHUB_MCE_CONSUMER_IMAGE }}
tags: ${{ needs.setup.outputs.tag }}
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
username: ${{ secrets.ACRYL_DOCKER_USERNAME }}
password: ${{ secrets.ACRYL_DOCKER_PASSWORD }}
publish: ${{ needs.setup.outputs.publish }}
context: .
file: ./docker/datahub-mce-consumer/Dockerfile
Expand Down Expand Up @@ -385,8 +385,8 @@ jobs:
images: |
${{ env.DATAHUB_FRONTEND_IMAGE }}
tags: ${{ needs.setup.outputs.tag }}
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
username: ${{ secrets.ACRYL_DOCKER_USERNAME }}
password: ${{ secrets.ACRYL_DOCKER_PASSWORD }}
publish: ${{ needs.setup.outputs.publish }}
context: .
file: ./docker/datahub-frontend/Dockerfile
Expand Down Expand Up @@ -439,8 +439,8 @@ jobs:
images: |
${{ env.DATAHUB_KAFKA_SETUP_IMAGE }}
tags: ${{ needs.setup.outputs.tag }}
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
username: ${{ secrets.ACRYL_DOCKER_USERNAME }}
password: ${{ secrets.ACRYL_DOCKER_PASSWORD }}
publish: ${{ needs.setup.outputs.publish }}
context: .
file: ./docker/kafka-setup/Dockerfile
Expand Down Expand Up @@ -481,8 +481,8 @@ jobs:
images: |
${{ env.DATAHUB_ELASTIC_SETUP_IMAGE }}
tags: ${{ needs.setup.outputs.tag }}
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
username: ${{ secrets.ACRYL_DOCKER_USERNAME }}
password: ${{ secrets.ACRYL_DOCKER_PASSWORD }}
publish: ${{ needs.setup.outputs.publish }}
context: .
file: ./docker/elasticsearch-setup/Dockerfile
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ HOSTED_DOCS_ONLY-->
[![Version](https://img.shields.io/github/v/release/datahub-project/datahub?include_prereleases)](https://github.com/datahub-project/datahub/releases/latest)
[![PyPI version](https://badge.fury.io/py/acryl-datahub.svg)](https://badge.fury.io/py/acryl-datahub)
[![build & test](https://github.com/datahub-project/datahub/workflows/build%20&%20test/badge.svg?branch=master&event=push)](https://github.com/datahub-project/datahub/actions?query=workflow%3A%22build+%26+test%22+branch%3Amaster+event%3Apush)
[![Docker Pulls](https://img.shields.io/docker/pulls/linkedin/datahub-gms.svg)](https://hub.docker.com/r/linkedin/datahub-gms)
[![Docker Pulls](https://img.shields.io/docker/pulls/acryldata/datahub-gms.svg)](https://hub.docker.com/r/acryldata/datahub-gms)
[![Slack](https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&style=social)](https://slack.datahubproject.io)
[![PRs Welcome](https://img.shields.io/badge/PRs-welcome-brightgreen.svg)](https://github.com/datahub-project/datahub/blob/master/docs/CONTRIBUTING.md)
[![GitHub commit activity](https://img.shields.io/github/commit-activity/m/datahub-project/datahub)](https://github.com/datahub-project/datahub/pulls?q=is%3Apr)
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ buildscript {
ext.openLineageVersion = '1.5.0'
ext.logbackClassicJava8 = '1.2.12'

ext.docker_registry = 'linkedin'
ext.docker_registry = 'acryldata'

apply from: './repositories.gradle'
buildscript.repositories.addAll(project.repositories)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.linkedin.datahub.graphql.resolvers.load;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;
import static com.linkedin.datahub.graphql.types.mappers.MapperUtils.*;

import com.datahub.authorization.AuthorizationConfiguration;
import com.linkedin.common.UrnArrayArray;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.SetMode;
Expand Down Expand Up @@ -156,6 +158,11 @@ private LineageRelationship mapEntityRelationship(
result.setUpdatedActor(UrnToEntityMapper.map(context, updatedActor));
}
result.setIsManual(lineageRelationship.hasIsManual() && lineageRelationship.isIsManual());
if (lineageRelationship.getPaths() != null) {
UrnArrayArray paths = lineageRelationship.getPaths();
result.setPaths(
paths.stream().map(path -> mapPath(context, path)).collect(Collectors.toList()));
}

return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import static com.linkedin.datahub.graphql.util.SearchInsightsUtil.*;
import static com.linkedin.metadata.utils.SearchUtil.*;

import com.linkedin.common.UrnArray;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.AggregationMetadata;
import com.linkedin.datahub.graphql.generated.EntityPath;
import com.linkedin.datahub.graphql.generated.FacetMetadata;
import com.linkedin.datahub.graphql.generated.MatchedField;
import com.linkedin.datahub.graphql.generated.SearchResult;
Expand Down Expand Up @@ -104,4 +106,11 @@ public static SearchSuggestion mapSearchSuggestion(
return new SearchSuggestion(
suggestion.getText(), suggestion.getScore(), Math.toIntExact(suggestion.getFrequency()));
}

public static EntityPath mapPath(@Nullable final QueryContext context, UrnArray path) {
EntityPath entityPath = new EntityPath();
entityPath.setPath(
path.stream().map(p -> UrnToEntityMapper.map(context, p)).collect(Collectors.toList()));
return entityPath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
import static com.linkedin.datahub.graphql.types.mappers.MapperUtils.*;
import static com.linkedin.datahub.graphql.util.SearchInsightsUtil.*;

import com.linkedin.common.UrnArray;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.datahub.graphql.generated.EntityPath;
import com.linkedin.datahub.graphql.generated.FreshnessStats;
import com.linkedin.datahub.graphql.generated.SearchAcrossLineageResult;
import com.linkedin.datahub.graphql.generated.SearchAcrossLineageResults;
Expand All @@ -16,6 +14,7 @@
import com.linkedin.metadata.search.LineageSearchEntity;
import com.linkedin.metadata.search.LineageSearchResult;
import com.linkedin.metadata.search.SearchResultMetadata;
import java.util.ArrayList;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -69,14 +68,9 @@ private SearchAcrossLineageResult mapResult(
.map(p -> mapPath(context, p))
.collect(Collectors.toList()))
.setDegree(searchEntity.getDegree())
.setDegrees(searchEntity.getDegrees().stream().collect(Collectors.toList()))
.setDegrees(new ArrayList<>(searchEntity.getDegrees()))
.setExplored(Boolean.TRUE.equals(searchEntity.isExplored()))
.setIgnoredAsHop(Boolean.TRUE.equals(searchEntity.isIgnoredAsHop()))
.build();
}

private EntityPath mapPath(@Nullable final QueryContext context, UrnArray path) {
EntityPath entityPath = new EntityPath();
entityPath.setPath(
path.stream().map(p -> UrnToEntityMapper.map(context, p)).collect(Collectors.toList()));
return entityPath;
}
}
4 changes: 4 additions & 0 deletions datahub-graphql-core/src/main/resources/entity.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -1331,6 +1331,10 @@ type LineageRelationship {
"""
isManual: Boolean

"""
The paths traversed for this relationship
"""
paths: [EntityPath]
}

"""
Expand Down
14 changes: 12 additions & 2 deletions datahub-graphql-core/src/main/resources/search.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ type ScrollResults {
}

"""
Results returned by issueing a search across relationships query
Results returned by issuing a search across relationships query
"""
type SearchAcrossLineageResults {
"""
Expand Down Expand Up @@ -679,7 +679,7 @@ type SearchAcrossLineageResults {
}

"""
Results returned by issueing a search across relationships query using scroll API
Results returned by issuing a search across relationships query using scroll API
"""
type ScrollAcrossLineageResults {
"""
Expand Down Expand Up @@ -742,6 +742,16 @@ type SearchAcrossLineageResult {
"""
degrees: [Int!]

"""
Marks whether or not this entity was explored further for lineage
"""
explored: Boolean!

"""
Whether this relationship was ignored as a hop
"""
ignoredAsHop: Boolean!

}

"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public KafkaJob(UpgradeContext context, RestoreIndicesArgs args) {

@Override
public RestoreIndicesResult call() {
return _entityService.restoreIndices(args, context.report()::addLine);
return _entityService.streamRestoreIndices(args, context.report()::addLine).findFirst().get();
}
}

Expand Down Expand Up @@ -85,7 +85,10 @@ private List<RestoreIndicesResult> iterateFutures(List<Future<RestoreIndicesResu
private RestoreIndicesArgs getArgs(UpgradeContext context) {
RestoreIndicesArgs result = new RestoreIndicesArgs();
result.batchSize = getBatchSize(context.parsedArgs());
// this class assumes batch size == limit
result.limit = getBatchSize(context.parsedArgs());
context.report().addLine(String.format("batchSize is %d", result.batchSize));
context.report().addLine(String.format("limit is %d", result.limit));
result.numThreads = getThreadCount(context.parsedArgs());
context.report().addLine(String.format("numThreads is %d", result.numThreads));
result.batchDelayMs = getBatchDelayMs(context.parsedArgs());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesResult;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ReindexDataJobViaNodesCLLStep implements UpgradeStep {

private static final String UPGRADE_ID = "via-node-cll-reindex-datajob";
public static final String UPGRADE_ID = "via-node-cll-reindex-datajob-v2";
private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID);

private final EntityService<?> entityService;
Expand All @@ -33,13 +32,17 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {
RestoreIndicesArgs args =
new RestoreIndicesArgs()
.setAspectName(DATA_JOB_INPUT_OUTPUT_ASPECT_NAME)
.setUrnLike("urn:li:" + DATA_JOB_ENTITY_NAME + ":%")
.setBatchSize(batchSize);
RestoreIndicesResult result =
entityService.restoreIndices(args, x -> context.report().addLine((String) x));
context.report().addLine("Rows migrated: " + result.rowsMigrated);
context.report().addLine("Rows ignored: " + result.ignored);
.aspectName(DATA_JOB_INPUT_OUTPUT_ASPECT_NAME)
.urnLike("urn:li:" + DATA_JOB_ENTITY_NAME + ":%")
.batchSize(batchSize);

entityService
.streamRestoreIndices(args, x -> context.report().addLine((String) x))
.forEach(
result -> {
context.report().addLine("Rows migrated: " + result.rowsMigrated);
context.report().addLine("Rows ignored: " + result.ignored);
});

BootstrapStep.setUpgradeResult(UPGRADE_ID_URN, entityService);
context.report().addLine("State updated: " + UPGRADE_ID_URN);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.linkedin.datahub.upgrade;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.AssertJUnit.assertNotNull;

import com.linkedin.datahub.upgrade.impl.DefaultUpgradeManager;
import com.linkedin.datahub.upgrade.system.SystemUpdateNonBlocking;
import com.linkedin.datahub.upgrade.system.vianodes.ReindexDataJobViaNodesCLL;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import java.util.List;
import javax.inject.Named;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
import org.testng.annotations.Test;

@ActiveProfiles("test")
@SpringBootTest(
classes = {UpgradeCliApplication.class, UpgradeCliApplicationTestConfiguration.class},
properties = {
"BOOTSTRAP_SYSTEM_UPDATE_DATA_JOB_NODE_CLL_ENABLED=true",
"kafka.schemaRegistry.type=INTERNAL",
"DATAHUB_UPGRADE_HISTORY_TOPIC_NAME=test_due_topic",
"METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME=test_mcl_versioned_topic"
},
args = {"-u", "SystemUpdateNonBlocking"})
public class DatahubUpgradeNonBlockingTest extends AbstractTestNGSpringContextTests {

@Autowired(required = false)
@Named("systemUpdateNonBlocking")
private SystemUpdateNonBlocking systemUpdateNonBlocking;

@Autowired
@Test
public void testSystemUpdateNonBlockingInit() {
assertNotNull(systemUpdateNonBlocking);
}

@Test
public void testReindexDataJobViaNodesCLLPaging() {
EntityService<?> mockService = mock(EntityService.class);
ReindexDataJobViaNodesCLL cllUpgrade = new ReindexDataJobViaNodesCLL(mockService, true, 10);
SystemUpdateNonBlocking upgrade =
new SystemUpdateNonBlocking(List.of(), List.of(cllUpgrade), null);
DefaultUpgradeManager manager = new DefaultUpgradeManager();
manager.register(upgrade);
manager.execute("SystemUpdateNonBlocking", List.of());
verify(mockService, times(1))
.streamRestoreIndices(
eq(
new RestoreIndicesArgs()
.batchSize(10)
.limit(0)
.aspectName("dataJobInputOutput")
.urnLike("urn:li:dataJob:%")),
any());
}
}
Loading

0 comments on commit 16ecd49

Please sign in to comment.