Skip to content

Commit

Permalink
Fix downloading of builtin functions
Browse files Browse the repository at this point in the history
The download function was not considering the component type and only looking into the connectors dir. Also we were scanning the connectors dir instead of using the info from the ConnectorsManager. This change looks at the component type if the download function is called with tenant+ns+name to choose between the ConnectorsManager and the FunctionsManager to get the archive path.
If the download function is called with a direct path, we look first into the ConnectorsManager then into the FunctionsManager.
  • Loading branch information
cbornet committed Sep 29, 2022
1 parent f0b6348 commit e64931c
Showing 1 changed file with 33 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -102,7 +101,7 @@
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
import org.apache.pulsar.functions.utils.functions.FunctionArchive;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
Expand Down Expand Up @@ -1473,10 +1472,18 @@ public StreamingOutput downloadFunction(String tenant, String namespace, String
? functionMetaData.getTransformFunctionPackageLocation().getPackagePath()
: functionMetaData.getPackageLocation().getPackagePath();

return getStreamingOutput(pkgPath);
FunctionDetails.ComponentType componentType = transformFunction
? FunctionDetails.ComponentType.FUNCTION
: InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails());

return getStreamingOutput(pkgPath, componentType);
}

private StreamingOutput getStreamingOutput(String pkgPath) {
return getStreamingOutput(pkgPath, null);
}

private StreamingOutput getStreamingOutput(String pkgPath, FunctionDetails.ComponentType componentType) {
return output -> {
if (pkgPath.startsWith(Utils.HTTP)) {
URL url = URI.create(pkgPath).toURL();
Expand All @@ -1489,15 +1496,7 @@ private StreamingOutput getStreamingOutput(String pkgPath) {
Files.copy(file.toPath(), output);
} else if (pkgPath.startsWith(Utils.BUILTIN)
&& !worker().getWorkerConfig().getUploadBuiltinSinksSources()) {
String sType = pkgPath.replaceFirst("^builtin://", "");
final String connectorsDir = worker().getWorkerConfig().getConnectorsDirectory();
log.warn("Processing package {} ; looking at the dir {}", pkgPath, connectorsDir);
TreeMap<String, FunctionArchive> sinksOrSources =
FunctionUtils.searchForFunctions(connectorsDir, true);
Path narPath = sinksOrSources.get(sType).getArchivePath();
if (narPath == null) {
throw new IllegalStateException("Didn't find " + pkgPath + " in " + connectorsDir);
}
Path narPath = getBuiltinArchivePath(pkgPath, componentType);
log.info("Loading {} from {}", pkgPath, narPath);
try (InputStream in = new FileInputStream(narPath.toString())) {
IOUtils.copy(in, output, 1024);
Expand All @@ -1511,7 +1510,7 @@ private StreamingOutput getStreamingOutput(String pkgPath) {
output.flush();
}
} catch (Exception e) {
log.error("Failed download package {} from packageMangment Service", pkgPath, e);
log.error("Failed download package {} from packageManagement Service", pkgPath, e);

}
} else {
Expand All @@ -1520,6 +1519,27 @@ private StreamingOutput getStreamingOutput(String pkgPath) {
};
}

private Path getBuiltinArchivePath(String pkgPath, FunctionDetails.ComponentType componentType) {
String type = pkgPath.replaceFirst("^builtin://", "");
if (!FunctionDetails.ComponentType.FUNCTION.equals(componentType)) {
Connector connector = worker().getConnectorsManager().getConnector(type);
if (connector != null) {
return connector.getArchivePath();
}
if (componentType != null) {
throw new IllegalStateException("Didn't find " + type + " in built-in connectors");
}
}
FunctionArchive function = worker().getFunctionsManager().getFunction(type);
if (function != null) {
return function.getArchivePath();
}
if (componentType != null) {
throw new IllegalStateException("Didn't find " + type + " in built-in functions");
}
throw new IllegalStateException("Didn't find " + type + " in built-in connectors or functions");
}

@Override
public StreamingOutput downloadFunction(
final String path, String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {
Expand Down

0 comments on commit e64931c

Please sign in to comment.