Skip to content

Commit

Permalink
feat(ingestion) Add new endpoint to test an ingestion connection (dat…
Browse files Browse the repository at this point in the history
  • Loading branch information
chriscollins3456 authored and maggiehays committed Aug 1, 2022
1 parent e8c7f89 commit 1054a4a
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
import com.linkedin.datahub.graphql.resolvers.group.RemoveGroupResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.execution.CancelIngestionExecutionRequestResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.execution.CreateIngestionExecutionRequestResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.execution.CreateTestConnectionRequestResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.execution.GetIngestionExecutionRequestResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.execution.IngestionSourceExecutionRequestsResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.secret.CreateSecretResolver;
Expand Down Expand Up @@ -719,6 +720,7 @@ private void configureMutationResolvers(final RuntimeWiring.Builder builder) {
.dataFetcher("deleteIngestionSource", new DeleteIngestionSourceResolver(this.entityClient))
.dataFetcher("createIngestionExecutionRequest", new CreateIngestionExecutionRequestResolver(this.entityClient, this.ingestionConfiguration))
.dataFetcher("cancelIngestionExecutionRequest", new CancelIngestionExecutionRequestResolver(this.entityClient))
.dataFetcher("createTestConnectionRequest", new CreateTestConnectionRequestResolver(this.entityClient, this.ingestionConfiguration))
.dataFetcher("deleteAssertion", new DeleteAssertionResolver(this.entityClient, this.entityService))
.dataFetcher("createTest", new CreateTestResolver(this.entityClient))
.dataFetcher("updateTest", new UpdateTestResolver(this.entityClient))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.linkedin.datahub.graphql.resolvers.ingest.execution;

import com.linkedin.data.template.StringMap;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.generated.CreateTestConnectionRequestInput;
import com.linkedin.datahub.graphql.resolvers.ingest.IngestionAuthUtils;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.execution.ExecutionRequestInput;
import com.linkedin.execution.ExecutionRequestSource;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.config.IngestionConfiguration;
import com.linkedin.metadata.key.ExecutionRequestKey;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;


/**
* Creates an on-demand ingestion execution request.
*/
public class CreateTestConnectionRequestResolver implements DataFetcher<CompletableFuture<String>> {

private static final String TEST_CONNECTION_TASK_NAME = "TEST_CONNECTION";
private static final String TEST_CONNECTION_SOURCE_NAME = "MANUAL_TEST_CONNECTION";
private static final String RECIPE_ARG_NAME = "recipe";
private static final String VERSION_ARG_NAME = "version";
private static final String DEFAULT_EXECUTOR_ID = "default";

private final EntityClient _entityClient;
private final IngestionConfiguration _ingestionConfiguration;

public CreateTestConnectionRequestResolver(final EntityClient entityClient, final IngestionConfiguration ingestionConfiguration) {
_entityClient = entityClient;
_ingestionConfiguration = ingestionConfiguration;
}

@Override
public CompletableFuture<String> get(final DataFetchingEnvironment environment) throws Exception {
final QueryContext context = environment.getContext();

return CompletableFuture.supplyAsync(() -> {

if (!IngestionAuthUtils.canManageIngestion(context)) {
throw new AuthorizationException("Unauthorized to perform this action. Please contact your DataHub administrator.");
}

final CreateTestConnectionRequestInput input =
bindArgument(environment.getArgument("input"), CreateTestConnectionRequestInput.class);

try {

final MetadataChangeProposal proposal = new MetadataChangeProposal();
final ExecutionRequestKey key = new ExecutionRequestKey();
final UUID uuid = UUID.randomUUID();
final String uuidStr = uuid.toString();
key.setId(uuidStr);
proposal.setEntityKeyAspect(GenericRecordUtils.serializeAspect(key));

final ExecutionRequestInput execInput = new ExecutionRequestInput();
execInput.setTask(TEST_CONNECTION_TASK_NAME);
execInput.setSource(new ExecutionRequestSource().setType(TEST_CONNECTION_SOURCE_NAME));
execInput.setExecutorId(DEFAULT_EXECUTOR_ID);
execInput.setRequestedAt(System.currentTimeMillis());

Map<String, String> arguments = new HashMap<>();
arguments.put(RECIPE_ARG_NAME, input.getRecipe());
arguments.put(VERSION_ARG_NAME, _ingestionConfiguration.getDefaultCliVersion());
execInput.setArgs(new StringMap(arguments));

proposal.setEntityType(Constants.EXECUTION_REQUEST_ENTITY_NAME);
proposal.setAspectName(Constants.EXECUTION_REQUEST_INPUT_ASPECT_NAME);
proposal.setAspect(GenericRecordUtils.serializeAspect(execInput));
proposal.setChangeType(ChangeType.UPSERT);

return _entityClient.ingestProposal(proposal, context.getAuthentication());
} catch (Exception e) {
throw new RuntimeException(String.format("Failed to create new test ingestion connection request %s", input.toString()), e);
}
});
}
}
16 changes: 16 additions & 0 deletions datahub-graphql-core/src/main/resources/ingestion.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ extend type Mutation {
Cancel a running execution request, provided the urn of the original execution request
"""
cancelIngestionExecutionRequest(input: CancelIngestionExecutionRequestInput!): String

"""
Create a request to execute a test ingestion connection job
input: Input required for creating a test connection request
"""
createTestConnectionRequest(input: CreateTestConnectionRequestInput!): String
}

"""
Expand Down Expand Up @@ -160,6 +166,16 @@ input CreateIngestionExecutionRequestInput {
ingestionSourceUrn: String!
}

"""
Input for creating a test connection request
"""
input CreateTestConnectionRequestInput {
"""
A JSON-encoded recipe
"""
recipe: String!
}

"""
Input for cancelling an execution request input
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.linkedin.datahub.graphql.resolvers.ingest.execution;

import com.datahub.authentication.Authentication;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.CreateTestConnectionRequestInput;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.config.IngestionConfiguration;
import com.linkedin.mxe.MetadataChangeProposal;
import graphql.schema.DataFetchingEnvironment;
import org.mockito.Mockito;
import org.testng.annotations.Test;

import static com.linkedin.datahub.graphql.resolvers.ingest.IngestTestUtils.*;
import static org.testng.Assert.*;


public class CreateTestConnectionRequestResolverTest {

private static final CreateTestConnectionRequestInput TEST_INPUT = new CreateTestConnectionRequestInput(
"test recipe"
);

@Test
public void testGetSuccess() throws Exception {
// Create resolver
EntityClient mockClient = Mockito.mock(EntityClient.class);
IngestionConfiguration ingestionConfiguration = new IngestionConfiguration();
ingestionConfiguration.setDefaultCliVersion("default");
CreateTestConnectionRequestResolver resolver = new CreateTestConnectionRequestResolver(mockClient, ingestionConfiguration);

// Execute resolver
QueryContext mockContext = getMockAllowContext();
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(TEST_INPUT);
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);

resolver.get(mockEnv).get();

Mockito.verify(mockClient, Mockito.times(1)).ingestProposal(
Mockito.any(MetadataChangeProposal.class),
Mockito.any(Authentication.class)
);
}

@Test
public void testGetUnauthorized() throws Exception {
// Create resolver
EntityClient mockClient = Mockito.mock(EntityClient.class);
IngestionConfiguration ingestionConfiguration = new IngestionConfiguration();
ingestionConfiguration.setDefaultCliVersion("default");
CreateTestConnectionRequestResolver resolver = new CreateTestConnectionRequestResolver(mockClient, ingestionConfiguration);

// Execute resolver
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
QueryContext mockContext = getMockDenyContext();
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(TEST_INPUT);
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);

assertThrows(RuntimeException.class, () -> resolver.get(mockEnv).join());
Mockito.verify(mockClient, Mockito.times(0)).ingestProposal(
Mockito.any(),
Mockito.any(Authentication.class));
}
}

0 comments on commit 1054a4a

Please sign in to comment.