Skip to content

Commit

Permalink
Merge branch 'master' into Fivetran-connector-performance-optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
shubhamjagtap639 authored May 21, 2024
2 parents fa07ace + 7f37c6f commit 7e25aed
Show file tree
Hide file tree
Showing 38 changed files with 2,126 additions and 375 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ private Constants() {}
public static final String PROPERTIES_SCHEMA_FILE = "properties.graphql";
public static final String FORMS_SCHEMA_FILE = "forms.graphql";
public static final String INCIDENTS_SCHEMA_FILE = "incident.graphql";
public static final String CONNECTIONS_SCHEMA_FILE = "connection.graphql";
public static final String BROWSE_PATH_DELIMITER = "/";
public static final String BROWSE_PATH_V2_DELIMITER = "␟";
public static final String VERSION_STAMP_FIELD_NAME = "versionStamp";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.linkedin.datahub.graphql.generated.DashboardStatsSummary;
import com.linkedin.datahub.graphql.generated.DashboardUserUsageCounts;
import com.linkedin.datahub.graphql.generated.DataFlow;
import com.linkedin.datahub.graphql.generated.DataHubConnection;
import com.linkedin.datahub.graphql.generated.DataHubView;
import com.linkedin.datahub.graphql.generated.DataJob;
import com.linkedin.datahub.graphql.generated.DataJobInputOutput;
Expand Down Expand Up @@ -129,6 +130,7 @@
import com.linkedin.datahub.graphql.resolvers.chart.BrowseV2Resolver;
import com.linkedin.datahub.graphql.resolvers.chart.ChartStatsSummaryResolver;
import com.linkedin.datahub.graphql.resolvers.config.AppConfigResolver;
import com.linkedin.datahub.graphql.resolvers.connection.UpsertConnectionResolver;
import com.linkedin.datahub.graphql.resolvers.container.ContainerEntitiesResolver;
import com.linkedin.datahub.graphql.resolvers.container.ParentContainersResolver;
import com.linkedin.datahub.graphql.resolvers.dashboard.DashboardStatsSummaryResolver;
Expand Down Expand Up @@ -306,6 +308,7 @@
import com.linkedin.datahub.graphql.types.chart.ChartType;
import com.linkedin.datahub.graphql.types.common.mappers.OperationMapper;
import com.linkedin.datahub.graphql.types.common.mappers.UrnToEntityMapper;
import com.linkedin.datahub.graphql.types.connection.DataHubConnectionType;
import com.linkedin.datahub.graphql.types.container.ContainerType;
import com.linkedin.datahub.graphql.types.corpgroup.CorpGroupType;
import com.linkedin.datahub.graphql.types.corpuser.CorpUserType;
Expand Down Expand Up @@ -355,6 +358,7 @@
import com.linkedin.metadata.config.ViewsConfiguration;
import com.linkedin.metadata.config.VisualConfiguration;
import com.linkedin.metadata.config.telemetry.TelemetryConfiguration;
import com.linkedin.metadata.connection.ConnectionService;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.graph.GraphClient;
import com.linkedin.metadata.graph.SiblingGraphService;
Expand Down Expand Up @@ -439,6 +443,7 @@ public class GmsGraphQLEngine {
private final ERModelRelationshipService erModelRelationshipService;
private final FormService formService;
private final RestrictedService restrictedService;
private ConnectionService connectionService;

private final BusinessAttributeService businessAttributeService;
private final FeatureFlags featureFlags;
Expand Down Expand Up @@ -472,6 +477,7 @@ public class GmsGraphQLEngine {
private final GlossaryTermType glossaryTermType;
private final GlossaryNodeType glossaryNodeType;
private final AspectType aspectType;
private final DataHubConnectionType connectionType;
private final ContainerType containerType;
private final DomainType domainType;
private final NotebookType notebookType;
Expand Down Expand Up @@ -558,6 +564,7 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) {
this.dataProductService = args.dataProductService;
this.formService = args.formService;
this.restrictedService = args.restrictedService;
this.connectionService = args.connectionService;

this.businessAttributeService = args.businessAttributeService;
this.ingestionConfiguration = Objects.requireNonNull(args.ingestionConfiguration);
Expand Down Expand Up @@ -588,6 +595,7 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) {
this.glossaryTermType = new GlossaryTermType(entityClient);
this.glossaryNodeType = new GlossaryNodeType(entityClient);
this.aspectType = new AspectType(entityClient);
this.connectionType = new DataHubConnectionType(entityClient, secretService);
this.containerType = new ContainerType(entityClient);
this.domainType = new DomainType(entityClient);
this.notebookType = new NotebookType(entityClient);
Expand Down Expand Up @@ -636,6 +644,7 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) {
dataJobType,
glossaryTermType,
glossaryNodeType,
connectionType,
containerType,
notebookType,
domainType,
Expand Down Expand Up @@ -753,6 +762,7 @@ public void configureRuntimeWiring(final RuntimeWiring.Builder builder) {
configureRoleResolvers(builder);
configureBusinessAttributeResolver(builder);
configureBusinessAttributeAssociationResolver(builder);
configureConnectionResolvers(builder);
}

private void configureOrganisationRoleResolvers(RuntimeWiring.Builder builder) {
Expand Down Expand Up @@ -803,6 +813,7 @@ public GraphQLEngine.Builder builder() {
.addSchema(fileBasedSchema(LINEAGE_SCHEMA_FILE))
.addSchema(fileBasedSchema(PROPERTIES_SCHEMA_FILE))
.addSchema(fileBasedSchema(FORMS_SCHEMA_FILE))
.addSchema(fileBasedSchema(CONNECTIONS_SCHEMA_FILE))
.addSchema(fileBasedSchema(INCIDENTS_SCHEMA_FILE));

for (GmsGraphQLPlugin plugin : this.graphQLPlugins) {
Expand Down Expand Up @@ -3015,4 +3026,29 @@ private void configureBusinessAttributeAssociationResolver(final RuntimeWiring.B
.getBusinessAttribute()
.getUrn())));
}

private void configureConnectionResolvers(final RuntimeWiring.Builder builder) {
builder.type(
"Mutation",
typeWiring ->
typeWiring.dataFetcher(
"upsertConnection",
new UpsertConnectionResolver(connectionService, secretService)));
builder.type(
"Query",
typeWiring -> typeWiring.dataFetcher("connection", getResolver(this.connectionType)));
builder.type(
"DataHubConnection",
typeWiring ->
typeWiring.dataFetcher(
"platform",
new LoadableTypeResolver<>(
this.dataPlatformType,
(env) -> {
final DataHubConnection connection = env.getSource();
return connection.getPlatform() != null
? connection.getPlatform().getUrn()
: null;
})));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.linkedin.metadata.config.ViewsConfiguration;
import com.linkedin.metadata.config.VisualConfiguration;
import com.linkedin.metadata.config.telemetry.TelemetryConfiguration;
import com.linkedin.metadata.connection.ConnectionService;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.graph.GraphClient;
import com.linkedin.metadata.graph.SiblingGraphService;
Expand Down Expand Up @@ -84,6 +85,7 @@ public class GmsGraphQLEngineArgs {
int graphQLQueryDepthLimit;
boolean graphQLQueryIntrospectionEnabled;
BusinessAttributeService businessAttributeService;
ConnectionService connectionService;

// any fork specific args should go below this line
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package com.linkedin.datahub.graphql.resolvers.connection;

import com.linkedin.common.DataPlatformInstance;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.DataMap;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.DataHubConnection;
import com.linkedin.datahub.graphql.generated.DataHubConnectionDetails;
import com.linkedin.datahub.graphql.generated.DataHubJsonConnection;
import com.linkedin.datahub.graphql.generated.DataPlatform;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.entity.EnvelopedAspectMap;
import com.linkedin.metadata.Constants;
import io.datahubproject.metadata.services.SecretService;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class ConnectionMapper {
/**
* Maps a GMS encrypted connection details object into the decrypted form returned by the GraphQL
* API.
*
* <p>Returns null if the Entity does not have the required aspects: dataHubConnectionDetails or
* dataPlatformInstance.
*/
@Nullable
public static DataHubConnection map(
@Nonnull final QueryContext context,
@Nonnull final EntityResponse entityResponse,
@Nonnull final SecretService secretService) {
// If the connection does not exist, simply return null
if (!hasAspects(entityResponse)) {
return null;
}

final DataHubConnection result = new DataHubConnection();
final Urn entityUrn = entityResponse.getUrn();
final EnvelopedAspectMap aspects = entityResponse.getAspects();

result.setUrn(entityUrn.toString());
result.setType(EntityType.DATAHUB_CONNECTION);

final EnvelopedAspect envelopedAssertionInfo =
aspects.get(Constants.DATAHUB_CONNECTION_DETAILS_ASPECT_NAME);
if (envelopedAssertionInfo != null) {
result.setDetails(
mapConnectionDetails(
context,
new com.linkedin.connection.DataHubConnectionDetails(
envelopedAssertionInfo.getValue().data()),
secretService));
}
final EnvelopedAspect envelopedPlatformInstance =
aspects.get(Constants.DATA_PLATFORM_INSTANCE_ASPECT_NAME);
if (envelopedPlatformInstance != null) {
final DataMap data = envelopedPlatformInstance.getValue().data();
result.setPlatform(mapPlatform(new DataPlatformInstance(data)));
}
return result;
}

private static DataHubConnectionDetails mapConnectionDetails(
@Nonnull final QueryContext context,
@Nonnull final com.linkedin.connection.DataHubConnectionDetails gmsDetails,
@Nonnull final SecretService secretService) {
final DataHubConnectionDetails result = new DataHubConnectionDetails();
result.setType(
com.linkedin.datahub.graphql.generated.DataHubConnectionDetailsType.valueOf(
gmsDetails.getType().toString()));
if (gmsDetails.hasJson() && ConnectionUtils.canManageConnections(context)) {
result.setJson(mapJsonConnectionDetails(gmsDetails.getJson(), secretService));
}
if (gmsDetails.hasName()) {
result.setName(gmsDetails.getName());
}
return result;
}

private static DataHubJsonConnection mapJsonConnectionDetails(
@Nonnull final com.linkedin.connection.DataHubJsonConnection gmsJsonConnection,
@Nonnull final SecretService secretService) {
final DataHubJsonConnection result = new DataHubJsonConnection();
// Decrypt the BLOB!
result.setBlob(secretService.decrypt(gmsJsonConnection.getEncryptedBlob()));
return result;
}

private static DataPlatform mapPlatform(final DataPlatformInstance platformInstance) {
// Set dummy platform to be resolved.
final DataPlatform partialPlatform = new DataPlatform();
partialPlatform.setUrn(platformInstance.getPlatform().toString());
return partialPlatform;
}

private static boolean hasAspects(@Nonnull final EntityResponse response) {
return response.hasAspects()
&& response.getAspects().containsKey(Constants.DATAHUB_CONNECTION_DETAILS_ASPECT_NAME)
&& response.getAspects().containsKey(Constants.DATA_PLATFORM_INSTANCE_ASPECT_NAME);
}

private ConnectionMapper() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.linkedin.datahub.graphql.resolvers.connection;

import com.datahub.authorization.AuthUtil;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.metadata.authorization.PoliciesConfig;
import javax.annotation.Nonnull;

/** Utilities for working with DataHub Connections. */
public class ConnectionUtils {

/**
* Returns true if the user is able to read and or write connection between DataHub and external
* platforms.
*/
public static boolean canManageConnections(@Nonnull QueryContext context) {
return AuthUtil.isAuthorized(
context.getAuthorizer(),
context.getActorUrn(),
PoliciesConfig.MANAGE_CONNECTIONS_PRIVILEGE);
}

private ConnectionUtils() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.linkedin.datahub.graphql.resolvers.connection;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;

import com.datahub.authentication.Authentication;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.connection.DataHubConnectionDetailsType;
import com.linkedin.connection.DataHubJsonConnection;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.generated.DataHubConnection;
import com.linkedin.datahub.graphql.generated.UpsertDataHubConnectionInput;
import com.linkedin.entity.EntityResponse;
import com.linkedin.metadata.connection.ConnectionService;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import io.datahubproject.metadata.services.SecretService;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class UpsertConnectionResolver implements DataFetcher<CompletableFuture<DataHubConnection>> {

private final ConnectionService _connectionService;
private final SecretService _secretService;

public UpsertConnectionResolver(
@Nonnull final ConnectionService connectionService,
@Nonnull final SecretService secretService) {
_connectionService =
Objects.requireNonNull(connectionService, "connectionService cannot be null");
_secretService = Objects.requireNonNull(secretService, "secretService cannot be null");
}

@Override
public CompletableFuture<DataHubConnection> get(final DataFetchingEnvironment environment)
throws Exception {

final QueryContext context = environment.getContext();
final UpsertDataHubConnectionInput input =
bindArgument(environment.getArgument("input"), UpsertDataHubConnectionInput.class);
final Authentication authentication = context.getAuthentication();

return CompletableFuture.supplyAsync(
() -> {
if (!ConnectionUtils.canManageConnections(context)) {
throw new AuthorizationException(
"Unauthorized to upsert Connection. Please contact your DataHub administrator for more information.");
}

try {
final Urn connectionUrn =
_connectionService.upsertConnection(
context.getOperationContext(),
input.getId(),
UrnUtils.getUrn(input.getPlatformUrn()),
DataHubConnectionDetailsType.valueOf(input.getType().toString()),
input.getJson() != null
// Encrypt payload
? new DataHubJsonConnection()
.setEncryptedBlob(_secretService.encrypt(input.getJson().getBlob()))
: null,
input.getName());

final EntityResponse connectionResponse =
_connectionService.getConnectionEntityResponse(
context.getOperationContext(), connectionUrn);
return ConnectionMapper.map(context, connectionResponse, _secretService);
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to upsert a Connection from input %s", input), e);
}
});
}
}
Loading

0 comments on commit 7e25aed

Please sign in to comment.