Skip to content

Commit

Permalink
[PIP-193] [feature][connectors] Add support for a transform Function …
Browse files Browse the repository at this point in the history
…in Sinks (#16740)
  • Loading branch information
cbornet authored Aug 31, 2022
1 parent b457d71 commit dab0d1f
Show file tree
Hide file tree
Showing 48 changed files with 1,890 additions and 581 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -711,9 +711,12 @@ public StreamingOutput downloadFunction(
@ApiParam(value = "The namespace of a Pulsar Function")
final @PathParam("namespace") String namespace,
@ApiParam(value = "The name of a Pulsar Function")
final @PathParam("functionName") String functionName) {
final @PathParam("functionName") String functionName,
@ApiParam(value = "Whether to download the transform-function")
final @QueryParam("transform-function") boolean transformFunction) {

return functions().downloadFunction(tenant, namespace, functionName, clientAppId(), clientAuthData());
return functions()
.downloadFunction(tenant, namespace, functionName, clientAppId(), clientAuthData(), transformFunction);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,42 @@ void downloadFunction(String destinationFile, String tenant, String namespace, S
CompletableFuture<Void> downloadFunctionAsync(
String destinationFile, String tenant, String namespace, String function);

/**
* Download Function Code.
*
* @param destinationFile
* file where data should be downloaded to
* @param tenant
* Tenant name
* @param namespace
* Namespace name
* @param function
* Function name
* @param transformFunction
* Whether to download the transform function (for sources and sinks)
* @throws PulsarAdminException
*/
void downloadFunction(String destinationFile, String tenant, String namespace, String function,
boolean transformFunction) throws PulsarAdminException;

/**
* Download Function Code asynchronously.
*
* @param destinationFile
* file where data should be downloaded to
* @param tenant
* Tenant name
* @param namespace
* Namespace name
* @param function
* Function name
* @param transformFunction
* Whether to download the transform function (for sources and sinks)
*/
CompletableFuture<Void> downloadFunctionAsync(
String destinationFile, String tenant, String namespace, String function, boolean transformFunction);


/**
* Deprecated in favor of getting sources and sinks for their own APIs.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,7 @@ public class SinkConfig {
// to change behavior at runtime. Currently, this primarily used by the KubernetesManifestCustomizer
// interface
private String customRuntimeOptions;
private String transformFunction;
private String transformFunctionClassName;
private String transformFunctionConfig;
}
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,22 @@ public CompletableFuture<Void> downloadFunctionAsync(
functions.path(tenant).path(namespace).path(functionName).path("download"));
}

@Override
public void downloadFunction(String destinationPath, String tenant, String namespace, String functionName,
boolean transformFunction) throws PulsarAdminException {
downloadFile(destinationPath, functions.path(tenant).path(namespace).path(functionName).path("download")
.queryParam("transform-function", transformFunction));
}

@Override
public CompletableFuture<Void> downloadFunctionAsync(
String destinationPath, String tenant, String namespace, String functionName, boolean transformFunction) {
return downloadFileAsync(destinationPath,
functions.path(tenant).path(namespace).path(functionName).path("download")
.queryParam("transform-function", transformFunction));
}


@Override
public void downloadFunction(String destinationPath, String path) throws PulsarAdminException {
downloadFile(destinationPath, functions.path("download").queryParam("path", path));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,44 @@ public void testUpdateAuthData() throws Exception {
verify(functions, times(1)).updateFunctionWithUrl(any(FunctionConfig.class), anyString(), eq(updateOptions));
}

@Test
public void testDownloadFunction() throws Exception {
cmd.run(new String[] {
"download",
"--destination-file", JAR_NAME,
"--name", FN_NAME,
"--tenant", TENANT,
"--namespace", NAMESPACE
});
verify(functions, times(1))
.downloadFunction(JAR_NAME, TENANT, NAMESPACE, FN_NAME, false);
}

@Test
public void testDownloadFunctionByPath() throws Exception {
cmd.run(new String[] {
"download",
"--destination-file", JAR_NAME,
"--path", PACKAGE_URL
});
verify(functions, times(1))
.downloadFunction(JAR_NAME, PACKAGE_URL);
}

@Test
public void testDownloadTransformFunction() throws Exception {
cmd.run(new String[] {
"download",
"--destination-file", JAR_NAME,
"--name", FN_NAME,
"--tenant", TENANT,
"--namespace", NAMESPACE,
"--transform-function"
});
verify(functions, times(1))
.downloadFunction(JAR_NAME, TENANT, NAMESPACE, FN_NAME, true);
}


public static class ConsoleOutputCapturer {
private ByteArrayOutputStream stdout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,10 @@ class DownloadFunction extends FunctionCommand {
description = "Path or functionPkgUrl to store the content",
listConverter = StringConverter.class, required = false, hidden = true)
protected String path;
@Parameter(
names = "--transform-function",
description = "Download the transform Function of the connector")
protected Boolean transformFunction = false;

private void mergeArgs() {
if (isBlank(destinationFile) && !isBlank(deprecatedDestinationFile)) {
Expand All @@ -1195,7 +1199,8 @@ void runCmd() throws Exception {
if (path != null) {
getAdmin().functions().downloadFunction(destinationFile, path);
} else {
getAdmin().functions().downloadFunction(destinationFile, tenant, namespace, functionName);
getAdmin().functions()
.downloadFunction(destinationFile, tenant, namespace, functionName, transformFunction);
}
print("Downloaded successfully");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,13 @@ abstract class SinkDetailsCommand extends BaseCommand {
@Parameter(names = "--secrets", description = "The map of secretName to an object that encapsulates "
+ "how the secret is fetched by the underlying secrets provider")
protected String secretsString;
@Parameter(names = "--transform-function", description = "Transform function applied before the Sink")
protected String transformFunction;
@Parameter(names = "--transform-function-classname", description = "The transform function class name")
protected String transformFunctionClassName;
@Parameter(names = "--transform-function-config", description = "Configuration of the transform function "
+ "applied before the Sink")
protected String transformFunctionConfig;

protected SinkConfig sinkConfig;

Expand Down Expand Up @@ -578,6 +585,18 @@ void processArguments() throws Exception {
sinkConfig.setSecrets(secretsMap);
}

if (transformFunction != null) {
sinkConfig.setTransformFunction(transformFunction);
}

if (transformFunctionClassName != null) {
sinkConfig.setTransformFunctionClassName(transformFunctionClassName);
}

if (transformFunctionConfig != null) {
sinkConfig.setTransformFunctionConfig(transformFunctionConfig);
}

// check if configs are valid
validateSinkConfigs(sinkConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ public class TestCmdSinks {
private static final Long RAM = 1024L * 1024L;
private static final Long DISK = 1024L * 1024L * 1024L;
private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon Jul 02 00:33:15 +0000 2018\",\"int\":1000,\"int_string\":\"1000\",\"float\":1000.0,\"float_string\":\"1000.0\"}";
private static final String TRANSFORM_FUNCTION = "transform";
private static final String TRANSFORM_FUNCTION_CLASSNAME = "TransformFunction";
private static final String TRANSFORM_FUNCTION_CONFIG = "{\"test_function_config\": \"\"}";

private PulsarAdmin pulsarAdmin;
private Sinks sink;
Expand Down Expand Up @@ -146,6 +149,11 @@ public SinkConfig getSinkConfig() throws JsonProcessingException {
sinkConfig.setArchive(JAR_FILE_PATH);
sinkConfig.setResources(new Resources(CPU, RAM, DISK));
sinkConfig.setConfigs(createSink.parseConfigs(SINK_CONFIG_STRING));

sinkConfig.setTransformFunction(TRANSFORM_FUNCTION);
sinkConfig.setTransformFunctionClassName(TRANSFORM_FUNCTION_CLASSNAME);
sinkConfig.setTransformFunctionConfig(TRANSFORM_FUNCTION_CONFIG);

return sinkConfig;
}

Expand All @@ -166,6 +174,9 @@ public void testCliCorrect() throws Exception {
RAM,
DISK,
SINK_CONFIG_STRING,
TRANSFORM_FUNCTION,
TRANSFORM_FUNCTION_CLASSNAME,
TRANSFORM_FUNCTION_CONFIG,
sinkConfig
);
}
Expand All @@ -188,6 +199,9 @@ public void testMissingInput() throws Exception {
RAM,
DISK,
SINK_CONFIG_STRING,
TRANSFORM_FUNCTION,
TRANSFORM_FUNCTION_CLASSNAME,
TRANSFORM_FUNCTION_CONFIG,
sinkConfig
);
}
Expand All @@ -211,6 +225,9 @@ public void testMissingCustomSerdeInput() throws Exception {
RAM,
DISK,
SINK_CONFIG_STRING,
TRANSFORM_FUNCTION,
TRANSFORM_FUNCTION_CLASSNAME,
TRANSFORM_FUNCTION_CONFIG,
sinkConfig
);
}
Expand All @@ -233,6 +250,9 @@ public void testMissingTopicPattern() throws Exception {
RAM,
DISK,
SINK_CONFIG_STRING,
TRANSFORM_FUNCTION,
TRANSFORM_FUNCTION_CLASSNAME,
TRANSFORM_FUNCTION_CONFIG,
sinkConfig
);
}
Expand All @@ -255,6 +275,9 @@ public void testMissingProcessingGuarantees() throws Exception {
RAM,
DISK,
SINK_CONFIG_STRING,
TRANSFORM_FUNCTION,
TRANSFORM_FUNCTION_CLASSNAME,
TRANSFORM_FUNCTION_CONFIG,
sinkConfig
);
}
Expand All @@ -277,6 +300,9 @@ public void testMissingArchive() throws Exception {
RAM,
DISK,
SINK_CONFIG_STRING,
TRANSFORM_FUNCTION,
TRANSFORM_FUNCTION_CLASSNAME,
TRANSFORM_FUNCTION_CONFIG,
sinkConfig
);
}
Expand All @@ -301,6 +327,9 @@ public void testInvalidJar() throws Exception {
RAM,
DISK,
SINK_CONFIG_STRING,
TRANSFORM_FUNCTION,
TRANSFORM_FUNCTION_CLASSNAME,
TRANSFORM_FUNCTION_CONFIG,
sinkConfig
);
}
Expand All @@ -323,6 +352,9 @@ public void testMissingConfig() throws Exception {
RAM,
DISK,
null,
TRANSFORM_FUNCTION,
TRANSFORM_FUNCTION_CLASSNAME,
TRANSFORM_FUNCTION_CONFIG,
sinkConfig
);
}
Expand All @@ -341,6 +373,9 @@ private void testCmdSinkCliMissingArgs(
Long ram,
Long disk,
String sinkConfigString,
String transformFunction,
String transformFunctionClassName,
String transformFunctionConfig,
SinkConfig sinkConfig) throws Exception {

// test create sink
Expand All @@ -357,6 +392,9 @@ private void testCmdSinkCliMissingArgs(
createSink.ram = ram;
createSink.disk = disk;
createSink.sinkConfigString = sinkConfigString;
createSink.transformFunction = transformFunction;
createSink.transformFunctionClassName = transformFunctionClassName;
createSink.transformFunctionConfig = transformFunctionConfig;

createSink.processArguments();

Expand All @@ -376,6 +414,9 @@ private void testCmdSinkCliMissingArgs(
updateSink.ram = ram;
updateSink.disk = disk;
updateSink.sinkConfigString = sinkConfigString;
updateSink.transformFunction = transformFunction;
updateSink.transformFunctionClassName = transformFunctionClassName;
updateSink.transformFunctionConfig = transformFunctionConfig;

updateSink.processArguments();

Expand All @@ -395,6 +436,9 @@ private void testCmdSinkCliMissingArgs(
localSinkRunner.ram = ram;
localSinkRunner.disk = disk;
localSinkRunner.sinkConfigString = sinkConfigString;
localSinkRunner.transformFunction = transformFunction;
localSinkRunner.transformFunctionClassName = transformFunctionClassName;
localSinkRunner.transformFunctionConfig = transformFunctionConfig;

localSinkRunner.processArguments();

Expand Down Expand Up @@ -539,6 +583,9 @@ public void testCliOverwriteConfigFile() throws Exception {
testSinkConfig.setResources(new Resources(CPU + 1, RAM + 1, DISK + 1));
testSinkConfig.setConfigs(createSink.parseConfigs("{\"created_at-prime\":\"Mon Jul 02 00:33:15 +0000 2018\", \"otherConfigProperties\":{\"property1.value\":\"value1\",\"property2.value\":\"value2\"}}"));

testSinkConfig.setTransformFunction(TRANSFORM_FUNCTION + "-prime");
testSinkConfig.setTransformFunction(TRANSFORM_FUNCTION_CLASSNAME + "-prime");
testSinkConfig.setTransformFunction("{\"test_function_config\": \"prime\"}");

SinkConfig expectedSinkConfig = getSinkConfig();

Expand All @@ -563,6 +610,9 @@ public void testCliOverwriteConfigFile() throws Exception {
RAM,
DISK,
SINK_CONFIG_STRING,
TRANSFORM_FUNCTION,
TRANSFORM_FUNCTION_CLASSNAME,
TRANSFORM_FUNCTION_CONFIG,
file.getAbsolutePath(),
expectedSinkConfig
);
Expand All @@ -583,6 +633,9 @@ private void testMixCliAndConfigFile(
Long ram,
Long disk,
String sinkConfigString,
String transformFunction,
String transformFunctionClassName,
String transformFunctionConfig,
String sinkConfigFile,
SinkConfig sinkConfig
) throws Exception {
Expand All @@ -602,6 +655,9 @@ private void testMixCliAndConfigFile(
createSink.ram = ram;
createSink.disk = disk;
createSink.sinkConfigString = sinkConfigString;
createSink.transformFunction = transformFunction;
createSink.transformFunctionClassName = transformFunctionClassName;
createSink.transformFunctionConfig = transformFunctionConfig;
createSink.sinkConfigFile = sinkConfigFile;

createSink.processArguments();
Expand All @@ -622,6 +678,9 @@ private void testMixCliAndConfigFile(
updateSink.ram = ram;
updateSink.disk = disk;
updateSink.sinkConfigString = sinkConfigString;
updateSink.transformFunction = transformFunction;
updateSink.transformFunctionClassName = transformFunctionClassName;
updateSink.transformFunctionConfig = transformFunctionConfig;
updateSink.sinkConfigFile = sinkConfigFile;

updateSink.processArguments();
Expand All @@ -642,6 +701,9 @@ private void testMixCliAndConfigFile(
localSinkRunner.ram = ram;
localSinkRunner.disk = disk;
localSinkRunner.sinkConfigString = sinkConfigString;
localSinkRunner.transformFunction = transformFunction;
localSinkRunner.transformFunctionClassName = transformFunctionClassName;
localSinkRunner.transformFunctionConfig = transformFunctionConfig;
localSinkRunner.sinkConfigFile = sinkConfigFile;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
public class InstanceConfig {
private int instanceId;
private String functionId;
private String transformFunctionId;
private String functionVersion;
private FunctionDetails functionDetails;
private int maxBufferedTuples;
Expand Down
Loading

0 comments on commit dab0d1f

Please sign in to comment.