Skip to content

Commit

Permalink
Merge pull request #16 from viadee-internal/feat-typeOfUserMode
Browse files Browse the repository at this point in the history
Feat: type of user mode
  • Loading branch information
micudaj authored Mar 20, 2024
2 parents 337cc66 + 76c36e6 commit c779e98
Show file tree
Hide file tree
Showing 23 changed files with 294 additions and 91 deletions.
102 changes: 91 additions & 11 deletions element-templates/kubeflow-connector.json
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,24 @@
}
},
{
"id": "configuration.multiusernamespace",
"label": "Kubeflow Namespace",
"description": "If Kubeflow is running in a multiuser mode, define namespace to use.",
"type": "String",
"id": "configuration.typeOfUserMode",
"label": "Type of User Mode",
"type": "Dropdown",
"group": "configuration",
"feel": "optional",
"optional": true,
"tooltip": "If left empty, namespace is read from environment variable 'KF_CONNECTOR_MULTIUSER_NS'.",
"value": "multiUserMode",
"optional": false,
"constraints": {
"notEmpty": true
},
"choices": [
{
"name": "Multi-User Mode",
"value": "multiUserMode"
}
],
"binding": {
"type": "zeebe:input",
"name": "configuration.multiusernamespace"
"name": "configuration.typeOfUserMode"
}
},
{
Expand Down Expand Up @@ -378,6 +385,10 @@
"name": "Get Runs",
"value": "get_runs"
},
{
"name": "Get Experiment by ID",
"value": "get_experiment_by_id"
},
{
"name": "Get Run by ID",
"value": "get_run_by_id"
Expand Down Expand Up @@ -411,6 +422,34 @@
]
}
},
{
"id": "kubeflowapi.namespace",
"label": "Namespace",
"description": "Specify the name of the namespace.",
"type": "String",
"group": "kubeflowapi",
"feel": "optional",
"optional": false,
"binding": {
"type": "zeebe:input",
"name": "kubeflowapi.namespace"
},
"constraints": {
"notEmpty": true
},
"condition": {
"allMatch": [
{
"property": "configuration.typeOfUserMode",
"equals": "multiUserMode"
},
{
"property": "kubeflowapi.operation",
"oneOf": ["get_experiments", "get_runs", "get_run_by_name", "create_experiment"]
}
]
}
},
{
"id": "kubeflowapi.pipelineId",
"label": "Pipeline ID",
Expand Down Expand Up @@ -547,9 +586,30 @@
]
}
},
{
"id": "kubeflowapi.experimentId",
"label": "Experiment ID",
"description": "Specify the id of the experiment.",
"type": "String",
"group": "kubeflowapi",
"feel": "optional",
"constraints": {
"notEmpty": true
},
"binding": {
"type": "zeebe:input",
"name": "kubeflowapi.experimentId"
},
"condition": {
"property": "kubeflowapi.operation",
"oneOf": [
"get_experiment_by_id"
]
}
},
{
"id": "kubeflowapi.runName",
"label": "Run Name",
"label": "Suffix of Run Name",
"description": "Specify the suffix of the name of the run.",
"type": "String",
"group": "kubeflowapi",
Expand All @@ -565,12 +625,33 @@
"condition": {
"property": "kubeflowapi.operation",
"oneOf": [
"get_run_by_name",
"start_run",
"start_run_and_monitor"
]
}
},
{
"id": "kubeflowapi.runName",
"label": "Run Name",
"description": "Specify the name of the run.",
"type": "String",
"group": "kubeflowapi",
"feel": "optional",
"optional": false,
"constraints": {
"notEmpty": true
},
"binding": {
"type": "zeebe:input",
"name": "kubeflowapi.runName"
},
"condition": {
"property": "kubeflowapi.operation",
"oneOf": [
"get_run_by_name"
]
}
},
{
"id": "kubeflowapi.runParameters",
"label": "Run parameters",
Expand Down Expand Up @@ -641,7 +722,6 @@
"type": "String",
"group": "output",
"feel": "required",
"value": "={result: state}",
"binding": {
"type": "zeebe:taskHeader",
"key": "resultExpression"
Expand Down
1 change: 0 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,6 @@
<environmentVariables>
<!--set env vars for KubeflowConnectorExecutorTest.java-->
<KF_CONNECTOR_URL>http://localhost:8080</KF_CONNECTOR_URL>
<KF_CONNECTOR_MULTIUSER_NS>testNamespace</KF_CONNECTOR_MULTIUSER_NS>
<!--set env vars for AuthUtilTest.java-->
<KF_AUTH_MODE>oauth_cc</KF_AUTH_MODE>
<KF_AUTH_OAUTH_TOKEN_ENDPOINT>https//idProviderEndpoint.de</KF_AUTH_OAUTH_TOKEN_ENDPOINT>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.viadee.bpm.camunda.connectors.kubeflow.integration;

import de.viadee.bpm.camunda.connectors.kubeflow.enums.TypeOfUserModeEnum;
import java.net.http.HttpResponse;

import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -47,8 +48,7 @@ protected void setUp() throws Exception {
private Configuration createConfiguration() throws Exception {
String kubeflowUrl = getKubeflowUrl();

return new Configuration(kubeflowUrl,
getEnvOrDefault(KUBEFLOW_NAMESPACE_ENV_KEY, DEFAULT_KUBEFLOW_NAMESPACE));
return new Configuration(kubeflowUrl, TypeOfUserModeEnum.MULTI_USER_MODE.getValue());
}

private OAuthAuthenticationClientCredentialsFlow createOAuthAuthenticationClientCredentialsFlow() throws Exception {
Expand Down Expand Up @@ -82,8 +82,9 @@ protected Configuration getConfiguration() {
protected KubeflowConnectorExecutor getExecutor(String pipelineVersion, String operation, String experimentName,
String pipelineId, String experimentId, String runName)
throws Exception {
var namespace = getEnvOrDefault(KUBEFLOW_NAMESPACE_ENV_KEY, DEFAULT_KUBEFLOW_NAMESPACE);
KubeflowApi kubeflowApi = new KubeflowApi(pipelineVersion, operation, null, runName,
null, pipelineId, experimentId, null, null, experimentName, null, null);
null, pipelineId, experimentId, null, null, experimentName, null, null, namespace);
KubeflowConnectorRequest kubeflowConnectorRequest = new KubeflowConnectorRequest(
this.createOAuthAuthenticationClientCredentialsFlow(), // Authentication via OAuth
this.getConfiguration(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class RunsIT extends BaseIntegrationTest {

private void createRun(String pipelineVersion, String runName, String pipelineId, String experimentId)
throws Exception {
var test = getExecutor(pipelineVersion, "start_run", null, pipelineId, experimentId, runName).execute();
getExecutor(pipelineVersion, "start_run", null, pipelineId, experimentId, runName).execute();
}

private List<String> getNamesOfRuns(String pipelineVersion) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package de.viadee.bpm.camunda.connectors.kubeflow.entities.input;

import jakarta.validation.constraints.NotEmpty;

public record Configuration(
String kubeflowUrl,
String multiusernamespace) {
@NotEmpty
String typeOfUserMode) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,8 @@ public record KubeflowApi(

String experimentDescription,

Map<String, String> httpHeaders
Map<String, String> httpHeaders,

String namespace

) { }
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,20 @@ public enum KubeflowApiOperationsEnum {
"/pipeline/apis/%s/runs"),
START_RUN_AND_MONITOR("start_run_and_monitor","POST", false,
"/pipeline/apis/%s/runs"),
CREATE_EXPERIMENT("create_experiment", "POST", false,
CREATE_EXPERIMENT("create_experiment", "POST", true,
"/pipeline/apis/%s/experiments"),
GET_EXPERIMENT_BY_ID("get_experiment_by_id", "GET", false,
"/pipeline/apis/%s/experiments");

private final String value;
private final String httpMethod;
private final boolean requiresMultiuserFilter;
private final boolean isNamespaceFilterRequired;
private final String apiUrl;

KubeflowApiOperationsEnum(String value, String httpMethod, boolean requiresMultiuserFilter, String apiUrl) {
KubeflowApiOperationsEnum(String value, String httpMethod, boolean isNamespaceFilterRequired, String apiUrl) {
this.value = value;
this.httpMethod = httpMethod;
this.requiresMultiuserFilter = requiresMultiuserFilter;
this.isNamespaceFilterRequired = isNamespaceFilterRequired;
this.apiUrl = apiUrl;
}

Expand All @@ -40,8 +42,8 @@ public String getHttpMethod() {
return httpMethod;
}

public boolean requiresMultiuserFilter() {
return requiresMultiuserFilter;
public boolean isNamespaceFilterRequired() {
return isNamespaceFilterRequired;
}

public String getApiUrl() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package de.viadee.bpm.camunda.connectors.kubeflow.enums;

import java.util.Arrays;

public enum TypeOfUserModeEnum {
MULTI_USER_MODE("multiUserMode");

private final String value;

TypeOfUserModeEnum(String value) {
this.value = value;
}

public String getValue() {
return value;
}

public static TypeOfUserModeEnum fromValue(String value) {
return Arrays
.stream(values())
.filter(typeOfUserModeEnum -> typeOfUserModeEnum.value.equals(value))
.findFirst()
.orElseThrow(
() -> new IllegalArgumentException("Unbekannter Wert: " + value)
);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.viadee.bpm.camunda.connectors.kubeflow.services;

import de.viadee.bpm.camunda.connectors.kubeflow.enums.TypeOfUserModeEnum;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URI;
Expand Down Expand Up @@ -38,7 +39,6 @@
public class KubeflowConnectorExecutor {

private static final String KUBEFLOW_URL_ENV = "KF_CONNECTOR_URL";
private static final String KUBEFLOW_NAMESPACE_ENV = "KF_CONNECTOR_MULTIUSER_NS";
private static final String URI_PARAMETER_FILTER = "filter";
private static final Pair<String, String> URI_PARAMETER_PAIR_V1_TYPE_NS = Pair.of("resource_reference_key.type",
"NAMESPACE");
Expand All @@ -51,7 +51,7 @@ public class KubeflowConnectorExecutor {
protected KubeflowApiOperationsEnum kubeflowApiOperationsEnum;
protected HttpRequest httpRequest;
protected HttpClient httpClient;
protected String kubeflowMultiNs;
protected boolean isMultiUserMode;
protected String kubeflowUrl;

public KubeflowConnectorExecutor(KubeflowConnectorRequest connectorRequest, long processInstanceKey,
Expand Down Expand Up @@ -101,18 +101,17 @@ protected String getFilterString() {

private void setConfigurationParameters() {
var configPropertyGroup = connectorRequest.getConfiguration();
var typeOfUserMode = TypeOfUserModeEnum.fromValue(configPropertyGroup.typeOfUserMode());

kubeflowUrl = System.getenv(KUBEFLOW_URL_ENV);
kubeflowMultiNs = System.getenv(KUBEFLOW_NAMESPACE_ENV);

if (configPropertyGroup != null) {
kubeflowUrl = StringUtils.isBlank(configPropertyGroup.kubeflowUrl()) ? kubeflowUrl : configPropertyGroup.kubeflowUrl();
kubeflowMultiNs = StringUtils.isBlank(configPropertyGroup.multiusernamespace()) ? kubeflowMultiNs
: configPropertyGroup.multiusernamespace();
isMultiUserMode = TypeOfUserModeEnum.MULTI_USER_MODE.equals(typeOfUserMode);
}

if (kubeflowUrl == null || kubeflowMultiNs == null) {
throw new RuntimeException("Configuration parameters not found: url, cookie, and/or namespace null.");
if (kubeflowUrl == null) {
throw new RuntimeException("Configuration parameters not found: kubeflow url is null.");
}
}

Expand Down Expand Up @@ -190,18 +189,20 @@ private void addFilter(URIBuilder uriBuilder) throws UnsupportedEncodingExceptio
}

protected void addNamespaceFilter(URIBuilder uriBuilder) {
if (kubeflowApiOperationsEnum.requiresMultiuserFilter()) {
if (KubeflowApisEnum.PIPELINES_V1.equals(kubeflowApisEnum)) {
uriBuilder.addParameter(URI_PARAMETER_PAIR_V1_TYPE_NS.getKey(),
if (isMultiUserMode) {
if (kubeflowApiOperationsEnum.isNamespaceFilterRequired()) {
var namespace = connectorRequest.getKubeflowapi().namespace();
if (KubeflowApisEnum.PIPELINES_V1.equals(kubeflowApisEnum)) {
uriBuilder.addParameter(URI_PARAMETER_PAIR_V1_TYPE_NS.getKey(),
URI_PARAMETER_PAIR_V1_TYPE_NS.getValue());
uriBuilder.addParameter(URI_PARAMETER_V1_ID, kubeflowMultiNs);
} else {
uriBuilder.addParameter(URI_PARAMETER_V2_NS, kubeflowMultiNs);
uriBuilder.addParameter(URI_PARAMETER_V1_ID, namespace);
} else {
uriBuilder.addParameter(URI_PARAMETER_V2_NS, namespace);
}
}
}
}

// Sample: 'password=123&custom=secret&username=abc&ts=1570704369823'
public static HttpRequest.BodyPublisher ofFormData(Map<String, Object> data) {
var builder = new StringBuilder();
for (Map.Entry<String, Object> entry : data.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,24 +43,27 @@ protected BodyPublisher buildPayloadForKubeflowEndpoint() {
}

private V1ApiExperiment getPayloadForEndpointV1() {
var v1ApiResourceReference = new V1ApiResourceReference()
.key(new V1ApiResourceKey()
.type(V1ApiResourceType.NAMESPACE)
.id(super.kubeflowMultiNs));

var v1ApiResourceReference = super.isMultiUserMode ? createV1ApiResourceReference() : null;
var v1ApiExperiment = new V1ApiExperiment()
.name(getName())
.description(getDescription())
.addResourceReferencesItem(v1ApiResourceReference);

return v1ApiExperiment;
}

private V1ApiResourceReference createV1ApiResourceReference() {
return new V1ApiResourceReference().key(
new V1ApiResourceKey()
.type(V1ApiResourceType.NAMESPACE)
.id(connectorRequest.getKubeflowapi().namespace()));
}

private Map<String, Object> getPayloadForEndpointV2() {
var namespace = super.isMultiUserMode ? connectorRequest.getKubeflowapi().namespace() : null;
var v2Beta1Experiment = new V2beta1Experiment()
.displayName(getName())
.description(getDescription())
.namespace(super.kubeflowMultiNs);
.namespace(namespace);
return JsonHelper.objectMapper.convertValue(v2Beta1Experiment,
new TypeReference<>() {});
}
Expand Down
Loading

0 comments on commit c779e98

Please sign in to comment.