Skip to content

Commit

Permalink
[fix][functions] Fix the download of builtin Functions (apache#17877)
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet committed Nov 24, 2022
1 parent 918c618 commit 67dc69c
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -102,7 +101,7 @@
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
import org.apache.pulsar.functions.utils.functions.FunctionArchive;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
Expand Down Expand Up @@ -1436,14 +1435,22 @@ public StreamingOutput downloadFunction(String tenant, String namespace, String
String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
}

String pkgPath = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName)
.getPackageLocation().getPackagePath();
FunctionMetaData functionMetaData =
functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
String pkgPath = functionMetaData.getPackageLocation().getPackagePath();

FunctionDetails.ComponentType componentType =
InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails());

return getStreamingOutput(pkgPath);
return getStreamingOutput(pkgPath, componentType);
}

private StreamingOutput getStreamingOutput(String pkgPath) {
final StreamingOutput streamingOutput = output -> {
return getStreamingOutput(pkgPath, null);
}

private StreamingOutput getStreamingOutput(String pkgPath, FunctionDetails.ComponentType componentType) {
return output -> {
if (pkgPath.startsWith(Utils.HTTP)) {
URL url = URI.create(pkgPath).toURL();
try (InputStream inputStream = url.openStream()) {
Expand All @@ -1455,15 +1462,7 @@ private StreamingOutput getStreamingOutput(String pkgPath) {
Files.copy(file.toPath(), output);
} else if (pkgPath.startsWith(Utils.BUILTIN)
&& !worker().getWorkerConfig().getUploadBuiltinSinksSources()) {
String sType = pkgPath.replaceFirst("^builtin://", "");
final String connectorsDir = worker().getWorkerConfig().getConnectorsDirectory();
log.warn("Processing package {} ; looking at the dir {}", pkgPath, connectorsDir);
TreeMap<String, FunctionArchive> sinksOrSources =
FunctionUtils.searchForFunctions(connectorsDir, true);
Path narPath = sinksOrSources.get(sType).getArchivePath();
if (narPath == null) {
throw new IllegalStateException("Didn't find " + pkgPath + " in " + connectorsDir);
}
Path narPath = getBuiltinArchivePath(pkgPath, componentType);
log.info("Loading {} from {}", pkgPath, narPath);
try (InputStream in = new FileInputStream(narPath.toString())) {
IOUtils.copy(in, output, 1024);
Expand All @@ -1477,14 +1476,34 @@ private StreamingOutput getStreamingOutput(String pkgPath) {
output.flush();
}
} catch (Exception e) {
log.error("Failed download package {} from packageMangment Service", pkgPath, e);
log.error("Failed download package {} from packageManagement Service", pkgPath, e);

}
} else {
WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), output, pkgPath);
}
};
return streamingOutput;
}

private Path getBuiltinArchivePath(String pkgPath, FunctionDetails.ComponentType componentType) {
String type = pkgPath.replaceFirst("^builtin://", "");
if (!FunctionDetails.ComponentType.FUNCTION.equals(componentType)) {
Connector connector = worker().getConnectorsManager().getConnector(type);
if (connector != null) {
return connector.getArchivePath();
}
if (componentType != null) {
throw new IllegalStateException("Didn't find " + type + " in built-in connectors");
}
}
FunctionArchive function = worker().getFunctionsManager().getFunction(type);
if (function != null) {
return function.getArchivePath();
}
if (componentType != null) {
throw new IllegalStateException("Didn't find " + type + " in built-in functions");
}
throw new IllegalStateException("Didn't find " + type + " in built-in connectors or functions");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.functions.worker.rest.api.v3;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
Expand All @@ -29,7 +28,6 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.FileInputStream;
Expand All @@ -44,7 +42,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.UUID;
import java.util.function.Consumer;
import javax.ws.rs.core.Response;
Expand Down Expand Up @@ -78,7 +75,8 @@
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.functions.FunctionArchive;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.FunctionsManager;
Expand Down Expand Up @@ -1604,20 +1602,13 @@ public void testDownloadFunctionHttpUrl() throws Exception {
String jarHttpUrl =
"https://repo1.maven.org/maven2/org/apache/pulsar/pulsar-common/2.4.2/pulsar-common-2.4.2.jar";
String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
PulsarWorkerService worker = mock(PulsarWorkerService.class);
doReturn(true).when(worker).isInitialized();
WorkerConfig config = mock(WorkerConfig.class);
when(config.isAuthorizationEnabled()).thenReturn(false);
when(worker.getWorkerConfig()).thenReturn(config);
FunctionsImpl function = new FunctionsImpl(() -> worker);
StreamingOutput streamOutput = function.downloadFunction(jarHttpUrl, null, null);

StreamingOutput streamOutput = resource.downloadFunction(jarHttpUrl, null, null);
File pkgFile = new File(testDir, UUID.randomUUID().toString());
OutputStream output = new FileOutputStream(pkgFile);
streamOutput.write(output);
Assert.assertTrue(pkgFile.exists());
if (pkgFile.exists()) {
pkgFile.delete();
}
pkgFile.delete();
}

@Test
Expand All @@ -1626,66 +1617,133 @@ public void testDownloadFunctionFile() throws Exception {
File file = Paths.get(fileUrl.toURI()).toFile();
String fileLocation = file.getAbsolutePath().replace('\\', '/');
String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
PulsarWorkerService worker = mock(PulsarWorkerService.class);
doReturn(true).when(worker).isInitialized();
WorkerConfig config = mock(WorkerConfig.class);
when(config.isAuthorizationEnabled()).thenReturn(false);
when(worker.getWorkerConfig()).thenReturn(config);
FunctionsImpl function = new FunctionsImpl(() -> worker);
StreamingOutput streamOutput = function.downloadFunction("file:///" + fileLocation, null, null);

StreamingOutput streamOutput = resource.downloadFunction("file:///" + fileLocation, null, null);
File pkgFile = new File(testDir, UUID.randomUUID().toString());
OutputStream output = new FileOutputStream(pkgFile);
streamOutput.write(output);
Assert.assertTrue(pkgFile.exists());
if (pkgFile.exists()) {
pkgFile.delete();
}
Assert.assertEquals(file.length(), pkgFile.length());
pkgFile.delete();
}

@Test
public void testDownloadFunctionBuiltin() throws Exception {
mockStatic(WorkerUtils.class, ctx -> {
});

public void testDownloadFunctionBuiltinConnector() throws Exception {
URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
File file = Paths.get(fileUrl.toURI()).toFile();
String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();

PulsarWorkerService worker = mock(PulsarWorkerService.class);
doReturn(true).when(worker).isInitialized();
WorkerConfig config = new WorkerConfig()
.setUploadBuiltinSinksSources(false);
when(mockedWorkerService.getWorkerConfig()).thenReturn(config);

WorkerConfig config = mock(WorkerConfig.class);
when(config.isAuthorizationEnabled()).thenReturn(false);
when(config.getUploadBuiltinSinksSources()).thenReturn(false);
when(config.getConnectorsDirectory()).thenReturn("/connectors");
Connector connector = Connector.builder().archivePath(file.toPath()).build();
ConnectorsManager connectorsManager = mock(ConnectorsManager.class);
when(connectorsManager.getConnector("cassandra")).thenReturn(connector);
when(mockedWorkerService.getConnectorsManager()).thenReturn(connectorsManager);

when(worker.getDlogNamespace()).thenReturn(mock(Namespace.class));
when(worker.getWorkerConfig()).thenReturn(config);
FunctionsImpl function = new FunctionsImpl(() -> worker);
StreamingOutput streamOutput = resource.downloadFunction("builtin://cassandra", null, null);

TreeMap<String, FunctionArchive> functions = new TreeMap<>();
FunctionArchive functionArchive = FunctionArchive.builder().archivePath(file.toPath()).build();
functions.put("cassandra", functionArchive);
File pkgFile = new File(testDir, UUID.randomUUID().toString());
OutputStream output = new FileOutputStream(pkgFile);
streamOutput.write(output);
output.flush();
output.close();
Assert.assertTrue(pkgFile.exists());
Assert.assertTrue(pkgFile.exists());
Assert.assertEquals(file.length(), pkgFile.length());
pkgFile.delete();
}

mockStatic(FunctionUtils.class, ctx -> {
ctx.when(() -> FunctionUtils.searchForFunctions(anyString(), anyBoolean())).thenReturn(functions);
@Test
public void testDownloadFunctionBuiltinFunction() throws Exception {
URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
File file = Paths.get(fileUrl.toURI()).toFile();
String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();

});
WorkerConfig config = new WorkerConfig()
.setUploadBuiltinSinksSources(false);
when(mockedWorkerService.getWorkerConfig()).thenReturn(config);

FunctionsManager functionsManager = mock(FunctionsManager.class);
FunctionArchive functionArchive = FunctionArchive.builder().archivePath(file.toPath()).build();
when(functionsManager.getFunction("exclamation")).thenReturn(functionArchive);
when(mockedWorkerService.getConnectorsManager()).thenReturn(mock(ConnectorsManager.class));
when(mockedWorkerService.getFunctionsManager()).thenReturn(functionsManager);

StreamingOutput streamOutput = function.downloadFunction("builtin://cassandra", null, null);
StreamingOutput streamOutput = resource.downloadFunction("builtin://exclamation", null, null);

File pkgFile = new File(testDir, UUID.randomUUID().toString());
OutputStream output = new FileOutputStream(pkgFile);
streamOutput.write(output);
output.flush();
output.close();
Assert.assertTrue(pkgFile.exists());
if (pkgFile.exists()) {
Assert.assertEquals(file.length(), pkgFile.length());
pkgFile.delete();
} else {
fail("expected file");
}
Assert.assertEquals(file.length(), pkgFile.length());
pkgFile.delete();
}

@Test
public void testDownloadFunctionBuiltinConnectorByName() throws Exception {
URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
File file = Paths.get(fileUrl.toURI()).toFile();
String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
WorkerConfig config = new WorkerConfig()
.setUploadBuiltinSinksSources(false);
when(mockedWorkerService.getWorkerConfig()).thenReturn(config);

when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);

FunctionMetaData metaData = FunctionMetaData.newBuilder()
.setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("builtin://cassandra"))
.setFunctionDetails(FunctionDetails.newBuilder().setComponentType(FunctionDetails.ComponentType.SINK))
.build();
when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(function))).thenReturn(metaData);

Connector connector = Connector.builder().archivePath(file.toPath()).build();
ConnectorsManager connectorsManager = mock(ConnectorsManager.class);
when(connectorsManager.getConnector("cassandra")).thenReturn(connector);
when(mockedWorkerService.getConnectorsManager()).thenReturn(connectorsManager);

StreamingOutput streamOutput = resource.downloadFunction(tenant, namespace, function, null, null);
File pkgFile = new File(testDir, UUID.randomUUID().toString());
OutputStream output = new FileOutputStream(pkgFile);
streamOutput.write(output);
Assert.assertTrue(pkgFile.exists());
Assert.assertEquals(file.length(), pkgFile.length());
pkgFile.delete();
}

@Test
public void testDownloadFunctionBuiltinFunctionByName() throws Exception {
URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
File file = Paths.get(fileUrl.toURI()).toFile();
String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
WorkerConfig config = new WorkerConfig()
.setUploadBuiltinSinksSources(false);
when(mockedWorkerService.getWorkerConfig()).thenReturn(config);

when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);

FunctionMetaData metaData = FunctionMetaData.newBuilder()
.setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("builtin://exclamation"))
.setFunctionDetails(FunctionDetails.newBuilder().setComponentType(FunctionDetails.ComponentType.FUNCTION))
.build();
when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(function))).thenReturn(metaData);

FunctionsManager functionsManager = mock(FunctionsManager.class);
FunctionArchive functionArchive = FunctionArchive.builder().archivePath(file.toPath()).build();
when(functionsManager.getFunction("exclamation")).thenReturn(functionArchive);
when(mockedWorkerService.getConnectorsManager()).thenReturn(mock(ConnectorsManager.class));
when(mockedWorkerService.getFunctionsManager()).thenReturn(functionsManager);

StreamingOutput streamOutput = resource.downloadFunction(tenant, namespace, function, null, null);
File pkgFile = new File(testDir, UUID.randomUUID().toString());
OutputStream output = new FileOutputStream(pkgFile);
streamOutput.write(output);
Assert.assertTrue(pkgFile.exists());
Assert.assertEquals(file.length(), pkgFile.length());
pkgFile.delete();
}

@Test
Expand Down

0 comments on commit 67dc69c

Please sign in to comment.