From e529077379e49338d9b2f09d95e7b80a25697fa3 Mon Sep 17 00:00:00 2001 From: Andre Strothmann Date: Sat, 16 Mar 2024 14:25:00 +0100 Subject: [PATCH 01/14] wip --- element-templates/kubeflow-connector.json | 56 ++++++++++++++++--- .../entities/input/Configuration.java | 5 +- .../kubeflow/entities/input/KubeflowApi.java | 2 + .../enums/KubeflowApiOperationsEnum.java | 12 ++-- .../kubeflow/enums/TypeOfUserModeEnum.java | 16 ++++++ .../services/KubeflowConnectorExecutor.java | 28 +++++----- ...flowConnectorExecutorCreateExperiment.java | 6 +- src/test/resources/application.properties | 2 +- 8 files changed, 94 insertions(+), 33 deletions(-) create mode 100644 src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/enums/TypeOfUserModeEnum.java diff --git a/element-templates/kubeflow-connector.json b/element-templates/kubeflow-connector.json index 2c434ed..06d0d85 100644 --- a/element-templates/kubeflow-connector.json +++ b/element-templates/kubeflow-connector.json @@ -74,17 +74,29 @@ } }, { - "id": "configuration.multiusernamespace", - "label": "Kubeflow Namespace", - "description": "If Kubeflow is running in a multiuser mode, define namespace to use.", - "type": "String", + "id": "configuration.typeOfUserMode", + "label": "Type of User Mode", + "description": "Specify the type of user mode in use.", + "type": "Dropdown", "group": "configuration", - "feel": "optional", - "optional": true, - "tooltip": "If left empty, namespace is read from environment variable 'KF_CONNECTOR_MULTIUSER_NS'.", + "value": "singleUserMode", + "optional": false, + "constraints": { + "notEmpty": true + }, + "choices": [ + { + "name": "Single-User Mode", + "value": "singleUserMode" + }, + { + "name": "Multi-User Mode", + "value": "multiUserMode" + } + ], "binding": { "type": "zeebe:input", - "name": "configuration.multiusernamespace" + "name": "configuration.typeOfUserMode" } }, { @@ -411,6 +423,34 @@ ] } }, + { + "id": "kubeflowapi.namespace", + "label": "Namespace", + "description": "Specify the name of the namespace.", + "type": "String", + "group": "kubeflowapi", + "feel": "optional", + "optional": false, + "binding": { + "type": "zeebe:input", + "name": "kubeflowapi.namespace" + }, + "constraints": { + "notEmpty": true + }, + "condition": { + "allMatch": [ + { + "property": "configuration.typeOfUserMode", + "equals": "multiUserMode" + }, + { + "property": "kubeflowapi.operation", + "oneOf": ["get_experiments", "get_runs", "get_run_by_name", "create_experiment"] + } + ] + } + }, { "id": "kubeflowapi.pipelineId", "label": "Pipeline ID", diff --git a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/entities/input/Configuration.java b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/entities/input/Configuration.java index 466bc62..9afc420 100644 --- a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/entities/input/Configuration.java +++ b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/entities/input/Configuration.java @@ -1,6 +1,9 @@ package de.viadee.bpm.camunda.connectors.kubeflow.entities.input; +import jakarta.validation.constraints.NotEmpty; + public record Configuration( String kubeflowUrl, - String multiusernamespace) { + @NotEmpty + String typeOfUserMode) { } diff --git a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/entities/input/KubeflowApi.java b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/entities/input/KubeflowApi.java index f7e7817..178e0f7 100644 --- a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/entities/input/KubeflowApi.java +++ b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/entities/input/KubeflowApi.java @@ -11,6 +11,8 @@ public record KubeflowApi( @NotEmpty String operation, + String namespace, + String runId, String runName, diff --git a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/enums/KubeflowApiOperationsEnum.java b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/enums/KubeflowApiOperationsEnum.java index 31cee5c..16df081 100644 --- a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/enums/KubeflowApiOperationsEnum.java +++ b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/enums/KubeflowApiOperationsEnum.java @@ -17,18 +17,18 @@ public enum KubeflowApiOperationsEnum { "/pipeline/apis/%s/runs"), START_RUN_AND_MONITOR("start_run_and_monitor","POST", false, "/pipeline/apis/%s/runs"), - CREATE_EXPERIMENT("create_experiment", "POST", false, + CREATE_EXPERIMENT("create_experiment", "POST", true, "/pipeline/apis/%s/experiments"); private final String value; private final String httpMethod; - private final boolean requiresMultiuserFilter; + private final boolean isNamespaceFilterRequired; private final String apiUrl; - KubeflowApiOperationsEnum(String value, String httpMethod, boolean requiresMultiuserFilter, String apiUrl) { + KubeflowApiOperationsEnum(String value, String httpMethod, boolean isNamespaceFilterRequired, String apiUrl) { this.value = value; this.httpMethod = httpMethod; - this.requiresMultiuserFilter = requiresMultiuserFilter; + this.isNamespaceFilterRequired = isNamespaceFilterRequired; this.apiUrl = apiUrl; } @@ -40,8 +40,8 @@ public String getHttpMethod() { return httpMethod; } - public boolean requiresMultiuserFilter() { - return requiresMultiuserFilter; + public boolean isNamespaceFilterRequired() { + return isNamespaceFilterRequired; } public String getApiUrl() { diff --git a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/enums/TypeOfUserModeEnum.java b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/enums/TypeOfUserModeEnum.java new file mode 100644 index 0000000..7335cf2 --- /dev/null +++ b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/enums/TypeOfUserModeEnum.java @@ -0,0 +1,16 @@ +package de.viadee.bpm.camunda.connectors.kubeflow.enums; + +public enum TypeOfUserModeEnum { + SINGLE_USER_MODE("singleUserMode"), + MULTI_USER_MODE("multiUserMode"); + + private final String value; + + TypeOfUserModeEnum(String value) { + this.value = value; + } + + public String getValue() { + return value; + } +} diff --git a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutor.java b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutor.java index 1e78e99..ab7ae05 100644 --- a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutor.java +++ b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutor.java @@ -1,5 +1,6 @@ package de.viadee.bpm.camunda.connectors.kubeflow.services; +import de.viadee.bpm.camunda.connectors.kubeflow.enums.TypeOfUserModeEnum; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.URI; @@ -38,7 +39,6 @@ public class KubeflowConnectorExecutor { private static final String KUBEFLOW_URL_ENV = "KF_CONNECTOR_URL"; - private static final String KUBEFLOW_NAMESPACE_ENV = "KF_CONNECTOR_MULTIUSER_NS"; private static final String URI_PARAMETER_FILTER = "filter"; private static final Pair URI_PARAMETER_PAIR_V1_TYPE_NS = Pair.of("resource_reference_key.type", "NAMESPACE"); @@ -51,7 +51,7 @@ public class KubeflowConnectorExecutor { protected KubeflowApiOperationsEnum kubeflowApiOperationsEnum; protected HttpRequest httpRequest; protected HttpClient httpClient; - protected String kubeflowMultiNs; + protected boolean isMultiUserMode; protected String kubeflowUrl; public KubeflowConnectorExecutor(KubeflowConnectorRequest connectorRequest, long processInstanceKey, @@ -103,16 +103,14 @@ private void setConfigurationParameters() { var configPropertyGroup = connectorRequest.getConfiguration(); kubeflowUrl = System.getenv(KUBEFLOW_URL_ENV); - kubeflowMultiNs = System.getenv(KUBEFLOW_NAMESPACE_ENV); if (configPropertyGroup != null) { kubeflowUrl = StringUtils.isBlank(configPropertyGroup.kubeflowUrl()) ? kubeflowUrl : configPropertyGroup.kubeflowUrl(); - kubeflowMultiNs = StringUtils.isBlank(configPropertyGroup.multiusernamespace()) ? kubeflowMultiNs - : configPropertyGroup.multiusernamespace(); + isMultiUserMode = TypeOfUserModeEnum.MULTI_USER_MODE.equals(configPropertyGroup.typeOfUserMode()) ? true : false; } - if (kubeflowUrl == null || kubeflowMultiNs == null) { - throw new RuntimeException("Configuration parameters not found: url, cookie, and/or namespace null."); + if (kubeflowUrl == null) { + throw new RuntimeException("Configuration parameters not found: kubeflow url is null."); } } @@ -190,18 +188,20 @@ private void addFilter(URIBuilder uriBuilder) throws UnsupportedEncodingExceptio } protected void addNamespaceFilter(URIBuilder uriBuilder) { - if (kubeflowApiOperationsEnum.requiresMultiuserFilter()) { - if (KubeflowApisEnum.PIPELINES_V1.equals(kubeflowApisEnum)) { - uriBuilder.addParameter(URI_PARAMETER_PAIR_V1_TYPE_NS.getKey(), + if (isMultiUserMode) { + if (kubeflowApiOperationsEnum.isNamespaceFilterRequired()) { + var namespace = connectorRequest.getKubeflowapi().namespace(); + if (KubeflowApisEnum.PIPELINES_V1.equals(kubeflowApisEnum)) { + uriBuilder.addParameter(URI_PARAMETER_PAIR_V1_TYPE_NS.getKey(), URI_PARAMETER_PAIR_V1_TYPE_NS.getValue()); - uriBuilder.addParameter(URI_PARAMETER_V1_ID, kubeflowMultiNs); - } else { - uriBuilder.addParameter(URI_PARAMETER_V2_NS, kubeflowMultiNs); + uriBuilder.addParameter(URI_PARAMETER_V1_ID, namespace); + } else { + uriBuilder.addParameter(URI_PARAMETER_V2_NS, namespace); + } } } } - // Sample: 'password=123&custom=secret&username=abc&ts=1570704369823' public static HttpRequest.BodyPublisher ofFormData(Map data) { var builder = new StringBuilder(); for (Map.Entry entry : data.entrySet()) { diff --git a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorCreateExperiment.java b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorCreateExperiment.java index 69299b6..0fb6106 100644 --- a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorCreateExperiment.java +++ b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorCreateExperiment.java @@ -43,10 +43,10 @@ protected BodyPublisher buildPayloadForKubeflowEndpoint() { } private V1ApiExperiment getPayloadForEndpointV1() { - var v1ApiResourceReference = new V1ApiResourceReference() + var v1ApiResourceReference = new V1ApiResourceReference() // TODO in singleUserMode no namespace is needed .key(new V1ApiResourceKey() .type(V1ApiResourceType.NAMESPACE) - .id(super.kubeflowMultiNs)); + .id(connectorRequest.getKubeflowapi().namespace())); var v1ApiExperiment = new V1ApiExperiment() .name(getName()) @@ -60,7 +60,7 @@ private Map getPayloadForEndpointV2() { var v2Beta1Experiment = new V2beta1Experiment() .displayName(getName()) .description(getDescription()) - .namespace(super.kubeflowMultiNs); + .namespace(connectorRequest.getKubeflowapi().namespace()); return JsonHelper.objectMapper.convertValue(v2Beta1Experiment, new TypeReference<>() {}); } diff --git a/src/test/resources/application.properties b/src/test/resources/application.properties index c445359..630c9e7 100644 --- a/src/test/resources/application.properties +++ b/src/test/resources/application.properties @@ -1,6 +1,6 @@ # Configuration for running connectors locally in bundle with connector-runtime server.port=9898 -zeebe.client.broker.gateway-address=192.168.1.2:26500 +zeebe.client.broker.gateway-address=localhost:26500 zeebe.client.security.plaintext=true camunda.connector.polling.enabled=false zeebe.client.worker.max-jobs-active=5 From 10577ec7b5de7630be80d11356bab8109044c968 Mon Sep 17 00:00:00 2001 From: Andre Strothmann Date: Sat, 16 Mar 2024 16:05:55 +0100 Subject: [PATCH 02/14] wip --- .../integration/BaseIntegrationTest.java | 7 +- .../kubeflow/integration/RunsIT.java | 2 +- .../kubeflow/entities/input/KubeflowApi.java | 7 +- ...flowConnectorExecutorCreateExperiment.java | 17 +++-- .../KubeflowConnectorExecutorStartRun.java | 70 ++++++++++++++++++- .../services/async/KubeflowCallable.java | 2 +- .../KubeflowConnectorFunctionTest.java | 6 +- .../kubeflow/async/ExecutionHandlerTest.java | 4 +- 8 files changed, 94 insertions(+), 21 deletions(-) diff --git a/src/it/de/viadee/bpm/camunda/connectors/kubeflow/integration/BaseIntegrationTest.java b/src/it/de/viadee/bpm/camunda/connectors/kubeflow/integration/BaseIntegrationTest.java index 292ccda..42d227e 100644 --- a/src/it/de/viadee/bpm/camunda/connectors/kubeflow/integration/BaseIntegrationTest.java +++ b/src/it/de/viadee/bpm/camunda/connectors/kubeflow/integration/BaseIntegrationTest.java @@ -1,5 +1,6 @@ package de.viadee.bpm.camunda.connectors.kubeflow.integration; +import de.viadee.bpm.camunda.connectors.kubeflow.enums.TypeOfUserModeEnum; import java.net.http.HttpResponse; import org.junit.jupiter.api.BeforeEach; @@ -47,8 +48,7 @@ protected void setUp() throws Exception { private Configuration createConfiguration() throws Exception { String kubeflowUrl = getKubeflowUrl(); - return new Configuration(kubeflowUrl, - getEnvOrDefault(KUBEFLOW_NAMESPACE_ENV_KEY, DEFAULT_KUBEFLOW_NAMESPACE)); + return new Configuration(kubeflowUrl, TypeOfUserModeEnum.MULTI_USER_MODE.getValue()); } private OAuthAuthenticationClientCredentialsFlow createOAuthAuthenticationClientCredentialsFlow() throws Exception { @@ -82,8 +82,9 @@ protected Configuration getConfiguration() { protected KubeflowConnectorExecutor getExecutor(String pipelineVersion, String operation, String experimentName, String pipelineId, String experimentId, String runName) throws Exception { + var namespace = getEnvOrDefault(KUBEFLOW_NAMESPACE_ENV_KEY, DEFAULT_KUBEFLOW_NAMESPACE); KubeflowApi kubeflowApi = new KubeflowApi(pipelineVersion, operation, null, runName, - null, pipelineId, experimentId, null, null, experimentName, null, null); + null, pipelineId, experimentId, null, null, experimentName, null, null, namespace); KubeflowConnectorRequest kubeflowConnectorRequest = new KubeflowConnectorRequest( this.createOAuthAuthenticationClientCredentialsFlow(), // Authentication via OAuth this.getConfiguration(), diff --git a/src/it/de/viadee/bpm/camunda/connectors/kubeflow/integration/RunsIT.java b/src/it/de/viadee/bpm/camunda/connectors/kubeflow/integration/RunsIT.java index 4f546fd..aa2bf33 100644 --- a/src/it/de/viadee/bpm/camunda/connectors/kubeflow/integration/RunsIT.java +++ b/src/it/de/viadee/bpm/camunda/connectors/kubeflow/integration/RunsIT.java @@ -22,7 +22,7 @@ public class RunsIT extends BaseIntegrationTest { private void createRun(String pipelineVersion, String runName, String pipelineId, String experimentId) throws Exception { - var test = getExecutor(pipelineVersion, "start_run", null, pipelineId, experimentId, runName).execute(); + getExecutor(pipelineVersion, "start_run", null, pipelineId, experimentId, runName).execute(); } private List getNamesOfRuns(String pipelineVersion) throws Exception { diff --git a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/entities/input/KubeflowApi.java b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/entities/input/KubeflowApi.java index 178e0f7..eafaa7f 100644 --- a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/entities/input/KubeflowApi.java +++ b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/entities/input/KubeflowApi.java @@ -11,8 +11,6 @@ public record KubeflowApi( @NotEmpty String operation, - String namespace, - String runId, String runName, @@ -31,5 +29,8 @@ public record KubeflowApi( String experimentDescription, - Map httpHeaders + Map httpHeaders, + + String namespace + ) { } diff --git a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorCreateExperiment.java b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorCreateExperiment.java index 0fb6106..763fcf0 100644 --- a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorCreateExperiment.java +++ b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorCreateExperiment.java @@ -43,24 +43,27 @@ protected BodyPublisher buildPayloadForKubeflowEndpoint() { } private V1ApiExperiment getPayloadForEndpointV1() { - var v1ApiResourceReference = new V1ApiResourceReference() // TODO in singleUserMode no namespace is needed - .key(new V1ApiResourceKey() - .type(V1ApiResourceType.NAMESPACE) - .id(connectorRequest.getKubeflowapi().namespace())); - + var v1ApiResourceReference = super.isMultiUserMode ? createV1ApiResourceReference() : null; var v1ApiExperiment = new V1ApiExperiment() .name(getName()) .description(getDescription()) .addResourceReferencesItem(v1ApiResourceReference); - return v1ApiExperiment; } + private V1ApiResourceReference createV1ApiResourceReference() { + return new V1ApiResourceReference().key( + new V1ApiResourceKey() + .type(V1ApiResourceType.NAMESPACE) + .id(connectorRequest.getKubeflowapi().namespace())); + } + private Map getPayloadForEndpointV2() { + var namespace = super.isMultiUserMode ? connectorRequest.getKubeflowapi().namespace() : null; var v2Beta1Experiment = new V2beta1Experiment() .displayName(getName()) .description(getDescription()) - .namespace(connectorRequest.getKubeflowapi().namespace()); + .namespace(namespace); return JsonHelper.objectMapper.convertValue(v2Beta1Experiment, new TypeReference<>() {}); } diff --git a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorStartRun.java b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorStartRun.java index 493c0ec..040f0d6 100644 --- a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorStartRun.java +++ b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorStartRun.java @@ -1,5 +1,13 @@ package de.viadee.bpm.camunda.connectors.kubeflow.services; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.module.SimpleModule; +import de.viadee.bpm.camunda.connectors.kubeflow.utils.OffsetDateTimeDeserializer; +import io.swagger.client.model.V1ApiListExperimentsResponse; +import io.swagger.client.model.V2beta1Experiment; +import io.swagger.client.model.V2beta1ListExperimentsResponse; import java.io.IOException; import java.net.http.HttpRequest; import java.net.http.HttpRequest.BodyPublisher; @@ -31,6 +39,7 @@ import io.swagger.client.model.V2beta1Run; import io.swagger.client.model.V2beta1RuntimeConfig; import io.swagger.client.model.V2beta1RuntimeState; +import org.threeten.bp.OffsetDateTime; public class KubeflowConnectorExecutorStartRun extends KubeflowConnectorExecutor { @@ -160,10 +169,24 @@ private V2beta1Run getPayloadForEndpointV2() { private String getIdOfAlreadyStartedRunByName(String runName) throws InstantiationException, IllegalAccessException, IOException { + + /* + the namespace is a required param. when trying to find a run by its name. However, + when starting the operation "Start Run and Monitor" the properties panel does not + ask for the namespace to be specified. This is, because Kubeflow can derive the + namespace with the help of the experiment id (an experiment is always uniquely + assigned to a namespace). In order to start the operation "Get Run By Name", + we, thus, need to first find the experiment via the ID from the properties + panel and extract the namespace from the resulting object if it exists. + */ + var namespaceInWhichRunPotentiallyStarted = getNamespaceByExperimentId( + connectorRequest.getKubeflowapi().experimentId()); + + // use previously determined namespace to search for run by its name KubeflowApi kubeflowApi = new KubeflowApi(kubeflowApisEnum.getValue(), KubeflowApiOperationsEnum.GET_RUN_BY_NAME.getValue(), null, null, null, null, null, runName, null, null, null, connectorRequest.getKubeflowapi() - .httpHeaders()); + .httpHeaders(), namespaceInWhichRunPotentiallyStarted); KubeflowConnectorRequest getRunByNameConnectorRequest = new KubeflowConnectorRequest( connectorRequest.getAuthentication(), @@ -182,6 +205,51 @@ private String getIdOfAlreadyStartedRunByName(String runName) return id; } + private String getNamespaceByExperimentId(String experimentId) throws JsonProcessingException { + KubeflowApi kubeflowApi = new KubeflowApi(kubeflowApisEnum.getValue(), + KubeflowApiOperationsEnum.GET_EXPERIMENTS.getValue(), null, + null, null, null, null, null, null, null, null, connectorRequest.getKubeflowapi() + .httpHeaders(), null); + + KubeflowConnectorRequest getExperimentsConnectorRequest = new KubeflowConnectorRequest( + connectorRequest.getAuthentication(), + connectorRequest.getConfiguration(), kubeflowApi, + connectorRequest.getTimeout()); + + var response = ExecutionHandler.getExecutor(getExperimentsConnectorRequest, processInstanceKey).execute(); + + var objectMapper = new ObjectMapper() + .registerModule(new SimpleModule().addDeserializer(OffsetDateTime.class, + new OffsetDateTimeDeserializer())) + .setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE) + .enable(DeserializationFeature.READ_ENUMS_USING_TO_STRING); + + if (KubeflowApisEnum.PIPELINES_V1.getValue().equals(kubeflowApisEnum.getValue())) { + var experiment = objectMapper.readValue(response.body(), V1ApiListExperimentsResponse.class) + .getExperiments() + .stream() + .filter(exp -> exp.getId().equals(experimentId)) + .findFirst() + .orElse(null); + + return experiment == null ? null : + experiment.getResourceReferences().stream() + .filter(refs -> refs.getKey().getType().equals("NAMESPACE")) + .findFirst() + .map(V1ApiResourceReference::getKey) + .map(V1ApiResourceKey::getId) + .orElse(null); + } else { + return objectMapper.readValue(response.body(), V2beta1ListExperimentsResponse.class) + .getExperiments() + .stream() + .filter(exp -> exp.getExperimentId().equals(experimentId)) + .findFirst() + .map(V2beta1Experiment::getNamespace) + .orElse(null); + } + } + private HttpResponse retrieveRunStatusWithDelay(KubeflowCallable kubeflowCallable, long delay, boolean isPerformPreCheck) { HttpResponse httpResponse = null; diff --git a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/async/KubeflowCallable.java b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/async/KubeflowCallable.java index 73bc255..6453f0e 100644 --- a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/async/KubeflowCallable.java +++ b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/async/KubeflowCallable.java @@ -32,7 +32,7 @@ private HttpResponse getStatusOfRunById(String runId) throws InstantiationException, IllegalAccessException, IOException { KubeflowApi kubeflowApi = new KubeflowApi(connectorRequest.getKubeflowapi().api(), KubeflowApiOperationsEnum.GET_RUN_BY_ID.getValue(), runId, null, null, null, null, null, null, null, null, connectorRequest.getKubeflowapi() - .httpHeaders()); + .httpHeaders(), null); KubeflowConnectorRequest getRunByIdConnectorRequest = new KubeflowConnectorRequest(connectorRequest.getAuthentication(), connectorRequest.getConfiguration(), kubeflowApi, connectorRequest.getTimeout()); KubeflowConnectorExecutorGetRunById getRunByIdExecutor = (KubeflowConnectorExecutorGetRunById) ExecutionHandler.getExecutor( diff --git a/src/test/java/de/viadee/bpm/camunda/connectors/kubeflow/KubeflowConnectorFunctionTest.java b/src/test/java/de/viadee/bpm/camunda/connectors/kubeflow/KubeflowConnectorFunctionTest.java index 0ed2755..32a9637 100644 --- a/src/test/java/de/viadee/bpm/camunda/connectors/kubeflow/KubeflowConnectorFunctionTest.java +++ b/src/test/java/de/viadee/bpm/camunda/connectors/kubeflow/KubeflowConnectorFunctionTest.java @@ -42,7 +42,7 @@ void failOnExecuteWhenApiAndOperationEmpty(String api, String operation) throws new NoAuthentication(), new Configuration("http://localhost:8281", "testNamespace"), new KubeflowApi(api, operation, null, null, - null, null, null, null, null, null, null, null), + null, null, null, null, null, null, null, null, null), new Timeout(20) ); var context = OutboundConnectorContextBuilder.create() @@ -63,7 +63,7 @@ void failOnExecuteWhenApiAndOperationUnknown(String api, String operation) throw new NoAuthentication(), new Configuration("http://localhost:8281", "testNamespace"), new KubeflowApi(api, operation, null, null, - null, null, null, null, null, null, null, null), + null, null, null, null, null, null, null, null, null), new Timeout(20) ); var context = OutboundConnectorContextBuilder.create() @@ -85,7 +85,7 @@ void failOnExecuteWhenConfigurationHasInvalidURL(String url) throws Exception { new NoAuthentication(), new Configuration(url, "testNamespace"), new KubeflowApi("pipelinesV1", "get_pipelines", null, null, - null, null, null, null, null, null, null, null), + null, null, null, null, null, null, null, null, null), new Timeout(20) ); var context = OutboundConnectorContextBuilder.create() diff --git a/src/test/java/de/viadee/bpm/camunda/connectors/kubeflow/async/ExecutionHandlerTest.java b/src/test/java/de/viadee/bpm/camunda/connectors/kubeflow/async/ExecutionHandlerTest.java index 980ab54..7a9215e 100644 --- a/src/test/java/de/viadee/bpm/camunda/connectors/kubeflow/async/ExecutionHandlerTest.java +++ b/src/test/java/de/viadee/bpm/camunda/connectors/kubeflow/async/ExecutionHandlerTest.java @@ -24,9 +24,9 @@ void getExecutor(String pipelineOperation, String className) { // given KubeflowConnectorRequest kubeflowConnectorRequest = new KubeflowConnectorRequest( new NoAuthentication(), - new Configuration("http://localhost:8281", "testNamespace"), + new Configuration("http://localhost:8281", "multiUserMode"), new KubeflowApi("pipelinesV1", pipelineOperation, null, null, - null, null, null, null, null, null, null, null), + null, null, null, null, null, null, null, null, null), new Timeout(20) ); // when From 54d568f1fd88dd89e145aac682d3b4e493e0eddd Mon Sep 17 00:00:00 2001 From: Andre Strothmann Date: Sat, 16 Mar 2024 16:10:26 +0100 Subject: [PATCH 03/14] wip --- pom.xml | 1 - .../connectors/kubeflow/KubeflowConnectorFunctionTest.java | 6 +++--- .../kubeflow/services/KubeflowConnectorExecutorTest.java | 7 +++---- .../camunda/connectors/kubeflow/utils/AuthUtilTest.java | 2 +- 4 files changed, 7 insertions(+), 9 deletions(-) diff --git a/pom.xml b/pom.xml index cc7033d..6f370fe 100644 --- a/pom.xml +++ b/pom.xml @@ -255,7 +255,6 @@ http://localhost:8080 - testNamespace oauth_cc https//idProviderEndpoint.de diff --git a/src/test/java/de/viadee/bpm/camunda/connectors/kubeflow/KubeflowConnectorFunctionTest.java b/src/test/java/de/viadee/bpm/camunda/connectors/kubeflow/KubeflowConnectorFunctionTest.java index 32a9637..5fa988d 100644 --- a/src/test/java/de/viadee/bpm/camunda/connectors/kubeflow/KubeflowConnectorFunctionTest.java +++ b/src/test/java/de/viadee/bpm/camunda/connectors/kubeflow/KubeflowConnectorFunctionTest.java @@ -40,7 +40,7 @@ void failOnExecuteWhenApiAndOperationEmpty(String api, String operation) throws // given var kubeflowConnectorRequest = new KubeflowConnectorRequest( new NoAuthentication(), - new Configuration("http://localhost:8281", "testNamespace"), + new Configuration("http://localhost:8281", "multiUserMode"), new KubeflowApi(api, operation, null, null, null, null, null, null, null, null, null, null, null), new Timeout(20) @@ -61,7 +61,7 @@ void failOnExecuteWhenApiAndOperationUnknown(String api, String operation) throw // given var kubeflowConnectorRequest = new KubeflowConnectorRequest( new NoAuthentication(), - new Configuration("http://localhost:8281", "testNamespace"), + new Configuration("http://localhost:8281", "multiUserMode"), new KubeflowApi(api, operation, null, null, null, null, null, null, null, null, null, null, null), new Timeout(20) @@ -83,7 +83,7 @@ void failOnExecuteWhenConfigurationHasInvalidURL(String url) throws Exception { // given var kubeflowConnectorRequest = new KubeflowConnectorRequest( new NoAuthentication(), - new Configuration(url, "testNamespace"), + new Configuration(url, "multiUserMode"), new KubeflowApi("pipelinesV1", "get_pipelines", null, null, null, null, null, null, null, null, null, null, null), new Timeout(20) diff --git a/src/test/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorTest.java b/src/test/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorTest.java index 7b4511d..1b5763c 100644 --- a/src/test/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorTest.java +++ b/src/test/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorTest.java @@ -27,12 +27,12 @@ public void setup() { } @ParameterizedTest - @CsvSource(value = {"null, null", "'', ''", "' ', ' '"}, nullValues = {"null"}) - void readConfigurationParameterFromSystemEnvWhenBlank(String kubeflowUrl, String multiusernamespace) { + @CsvSource(value = {"null", "''", "' '"}, nullValues = {"null"}) + void readConfigurationParameterFromSystemEnvWhenBlank(String kubeflowUrl) { // given var kubeflowConnectorRequest = new KubeflowConnectorRequest( new NoAuthentication(), - new Configuration(kubeflowUrl, multiusernamespace), + new Configuration(kubeflowUrl, "multiUserMode"), kubeflowApiMock, new Timeout(20) ); @@ -41,6 +41,5 @@ void readConfigurationParameterFromSystemEnvWhenBlank(String kubeflowUrl, String KubeflowApisEnum.PIPELINES_V1, KubeflowApiOperationsEnum.GET_PIPELINES); // then assertEquals("http://localhost:8080", kubeflowConnectorExecutor.kubeflowUrl); // env. configured in pom.xml via surefire plugin - assertEquals("testNamespace", kubeflowConnectorExecutor.kubeflowMultiNs); // env. configured in pom.xml via surefire plugin } } \ No newline at end of file diff --git a/src/test/java/de/viadee/bpm/camunda/connectors/kubeflow/utils/AuthUtilTest.java b/src/test/java/de/viadee/bpm/camunda/connectors/kubeflow/utils/AuthUtilTest.java index 45825d7..9cc456c 100644 --- a/src/test/java/de/viadee/bpm/camunda/connectors/kubeflow/utils/AuthUtilTest.java +++ b/src/test/java/de/viadee/bpm/camunda/connectors/kubeflow/utils/AuthUtilTest.java @@ -28,7 +28,7 @@ void setAuthenticationParameters() { // given var kubeflowConnectorRequest = new KubeflowConnectorRequest( new EnvironmentAuthentication(), - new Configuration("http://localhost:8080", "testNamespace"), + new Configuration("http://localhost:8080", "multiUserMode"), kubeflowApiMock, new Timeout(20) ); From 090af52317b05b20fe0d7eea390e298c2144b36b Mon Sep 17 00:00:00 2001 From: Andre Strothmann Date: Sat, 16 Mar 2024 16:21:57 +0100 Subject: [PATCH 04/14] wip --- .../connectors/kubeflow/integration/BaseIntegrationTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/it/de/viadee/bpm/camunda/connectors/kubeflow/integration/BaseIntegrationTest.java b/src/it/de/viadee/bpm/camunda/connectors/kubeflow/integration/BaseIntegrationTest.java index 42d227e..a1344f4 100644 --- a/src/it/de/viadee/bpm/camunda/connectors/kubeflow/integration/BaseIntegrationTest.java +++ b/src/it/de/viadee/bpm/camunda/connectors/kubeflow/integration/BaseIntegrationTest.java @@ -83,6 +83,7 @@ protected KubeflowConnectorExecutor getExecutor(String pipelineVersion, String o String pipelineId, String experimentId, String runName) throws Exception { var namespace = getEnvOrDefault(KUBEFLOW_NAMESPACE_ENV_KEY, DEFAULT_KUBEFLOW_NAMESPACE); + System.out.println(namespace); // TODO KubeflowApi kubeflowApi = new KubeflowApi(pipelineVersion, operation, null, runName, null, pipelineId, experimentId, null, null, experimentName, null, null, namespace); KubeflowConnectorRequest kubeflowConnectorRequest = new KubeflowConnectorRequest( From 2c6ee858a88edaed0e27a974585b2620d0861c80 Mon Sep 17 00:00:00 2001 From: Andre Strothmann Date: Sat, 16 Mar 2024 16:46:44 +0100 Subject: [PATCH 05/14] wip --- .../kubeflow/integration/BaseIntegrationTest.java | 1 - .../kubeflow/enums/TypeOfUserModeEnum.java | 12 ++++++++++++ .../kubeflow/services/KubeflowConnectorExecutor.java | 3 ++- .../resources/kubeflow-environment/oauth2_proxy.yml | 2 +- 4 files changed, 15 insertions(+), 3 deletions(-) diff --git a/src/it/de/viadee/bpm/camunda/connectors/kubeflow/integration/BaseIntegrationTest.java b/src/it/de/viadee/bpm/camunda/connectors/kubeflow/integration/BaseIntegrationTest.java index a1344f4..42d227e 100644 --- a/src/it/de/viadee/bpm/camunda/connectors/kubeflow/integration/BaseIntegrationTest.java +++ b/src/it/de/viadee/bpm/camunda/connectors/kubeflow/integration/BaseIntegrationTest.java @@ -83,7 +83,6 @@ protected KubeflowConnectorExecutor getExecutor(String pipelineVersion, String o String pipelineId, String experimentId, String runName) throws Exception { var namespace = getEnvOrDefault(KUBEFLOW_NAMESPACE_ENV_KEY, DEFAULT_KUBEFLOW_NAMESPACE); - System.out.println(namespace); // TODO KubeflowApi kubeflowApi = new KubeflowApi(pipelineVersion, operation, null, runName, null, pipelineId, experimentId, null, null, experimentName, null, null, namespace); KubeflowConnectorRequest kubeflowConnectorRequest = new KubeflowConnectorRequest( diff --git a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/enums/TypeOfUserModeEnum.java b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/enums/TypeOfUserModeEnum.java index 7335cf2..fd92af7 100644 --- a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/enums/TypeOfUserModeEnum.java +++ b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/enums/TypeOfUserModeEnum.java @@ -1,5 +1,7 @@ package de.viadee.bpm.camunda.connectors.kubeflow.enums; +import java.util.Arrays; + public enum TypeOfUserModeEnum { SINGLE_USER_MODE("singleUserMode"), MULTI_USER_MODE("multiUserMode"); @@ -13,4 +15,14 @@ public enum TypeOfUserModeEnum { public String getValue() { return value; } + + public static TypeOfUserModeEnum fromValue(String value) { + return Arrays + .stream(values()) + .filter(typeOfUserModeEnum -> typeOfUserModeEnum.value.equals(value)) + .findFirst() + .orElseThrow( + () -> new IllegalArgumentException("Unbekannter Wert: " + value) + ); + } } diff --git a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutor.java b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutor.java index ddee238..b0de691 100644 --- a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutor.java +++ b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutor.java @@ -101,12 +101,13 @@ protected String getFilterString() { private void setConfigurationParameters() { var configPropertyGroup = connectorRequest.getConfiguration(); + var typeOfUserMode = TypeOfUserModeEnum.fromValue(configPropertyGroup.typeOfUserMode()); kubeflowUrl = System.getenv(KUBEFLOW_URL_ENV); if (configPropertyGroup != null) { kubeflowUrl = StringUtils.isBlank(configPropertyGroup.kubeflowUrl()) ? kubeflowUrl : configPropertyGroup.kubeflowUrl(); - isMultiUserMode = TypeOfUserModeEnum.MULTI_USER_MODE.equals(configPropertyGroup.typeOfUserMode()) ? true : false; + isMultiUserMode = TypeOfUserModeEnum.MULTI_USER_MODE.equals(typeOfUserMode) ? true : false; } if (kubeflowUrl == null) { diff --git a/src/test/resources/kubeflow-environment/oauth2_proxy.yml b/src/test/resources/kubeflow-environment/oauth2_proxy.yml index f9d2d84..420d1e4 100644 --- a/src/test/resources/kubeflow-environment/oauth2_proxy.yml +++ b/src/test/resources/kubeflow-environment/oauth2_proxy.yml @@ -24,7 +24,7 @@ providers: - aud emailClaim: email groupsClaim: groups - issuerURL: http://${KUBEFLOW_HOST}:30000/auth/realms/kubeflow + issuerURL: http://localhost:30000/auth/realms/kubeflow jwksURL: http://keycloak.keycloak.svc.cluster.local:8080/auth/realms/kubeflow/protocol/openid-connect/certs skipDiscovery: true userIDClaim: email From d747bbbac7869cf01f2f1236ccdbd90ee8b83005 Mon Sep 17 00:00:00 2001 From: Andre Strothmann Date: Sat, 16 Mar 2024 16:57:29 +0100 Subject: [PATCH 06/14] wip --- src/test/resources/kubeflow-environment/oauth2_proxy.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/resources/kubeflow-environment/oauth2_proxy.yml b/src/test/resources/kubeflow-environment/oauth2_proxy.yml index 420d1e4..f9d2d84 100644 --- a/src/test/resources/kubeflow-environment/oauth2_proxy.yml +++ b/src/test/resources/kubeflow-environment/oauth2_proxy.yml @@ -24,7 +24,7 @@ providers: - aud emailClaim: email groupsClaim: groups - issuerURL: http://localhost:30000/auth/realms/kubeflow + issuerURL: http://${KUBEFLOW_HOST}:30000/auth/realms/kubeflow jwksURL: http://keycloak.keycloak.svc.cluster.local:8080/auth/realms/kubeflow/protocol/openid-connect/certs skipDiscovery: true userIDClaim: email From 8c7abce3cdb3409527575837e79642a6eb8f4f28 Mon Sep 17 00:00:00 2001 From: Andre Strothmann Date: Sat, 16 Mar 2024 20:38:10 +0100 Subject: [PATCH 07/14] working version with getExpById --- element-templates/kubeflow-connector.json | 26 ++++++- .../enums/KubeflowApiOperationsEnum.java | 2 + ...lowConnectorExecutorGetExperimentById.java | 25 +++++++ .../KubeflowConnectorExecutorGetRunById.java | 12 --- .../KubeflowConnectorExecutorStartRun.java | 75 ++++++++++--------- .../services/async/ExecutionHandler.java | 4 + .../connectors/kubeflow/utils/RunUtil.java | 7 +- 7 files changed, 99 insertions(+), 52 deletions(-) create mode 100644 src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorGetExperimentById.java diff --git a/element-templates/kubeflow-connector.json b/element-templates/kubeflow-connector.json index 06d0d85..70f34d0 100644 --- a/element-templates/kubeflow-connector.json +++ b/element-templates/kubeflow-connector.json @@ -390,6 +390,10 @@ "name": "Get Runs", "value": "get_runs" }, + { + "name": "Get Experiment by ID", + "value": "get_experiment_by_id" + }, { "name": "Get Run by ID", "value": "get_run_by_id" @@ -587,6 +591,27 @@ ] } }, + { + "id": "kubeflowapi.experimentId", + "label": "Experiment ID", + "description": "Specify the id of the experiment.", + "type": "String", + "group": "kubeflowapi", + "feel": "optional", + "constraints": { + "notEmpty": true + }, + "binding": { + "type": "zeebe:input", + "name": "kubeflowapi.experimentId" + }, + "condition": { + "property": "kubeflowapi.operation", + "oneOf": [ + "get_experiment_by_id" + ] + } + }, { "id": "kubeflowapi.runName", "label": "Run Name", @@ -681,7 +706,6 @@ "type": "String", "group": "output", "feel": "required", - "value": "={result: state}", "binding": { "type": "zeebe:taskHeader", "key": "resultExpression" diff --git a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/enums/KubeflowApiOperationsEnum.java b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/enums/KubeflowApiOperationsEnum.java index 16df081..307ef14 100644 --- a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/enums/KubeflowApiOperationsEnum.java +++ b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/enums/KubeflowApiOperationsEnum.java @@ -18,6 +18,8 @@ public enum KubeflowApiOperationsEnum { START_RUN_AND_MONITOR("start_run_and_monitor","POST", false, "/pipeline/apis/%s/runs"), CREATE_EXPERIMENT("create_experiment", "POST", true, + "/pipeline/apis/%s/experiments"), + GET_EXPERIMENT_BY_ID("get_experiment_by_id", "GET", false, "/pipeline/apis/%s/experiments"); private final String value; diff --git a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorGetExperimentById.java b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorGetExperimentById.java new file mode 100644 index 0000000..1f12d6f --- /dev/null +++ b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorGetExperimentById.java @@ -0,0 +1,25 @@ +package de.viadee.bpm.camunda.connectors.kubeflow.services; + +import com.fasterxml.jackson.core.JsonProcessingException; +import de.viadee.bpm.camunda.connectors.kubeflow.entities.KubeflowConnectorRequest; +import de.viadee.bpm.camunda.connectors.kubeflow.enums.KubeflowApiOperationsEnum; +import de.viadee.bpm.camunda.connectors.kubeflow.enums.KubeflowApisEnum; +import de.viadee.bpm.camunda.connectors.kubeflow.utils.RunUtil; +import java.net.http.HttpResponse; +import org.apache.http.client.utils.URIBuilder; + +public class KubeflowConnectorExecutorGetExperimentById extends KubeflowConnectorExecutor { + + public KubeflowConnectorExecutorGetExperimentById(KubeflowConnectorRequest connectorRequest, long processInstanceKey, KubeflowApisEnum kubeflowApisEnum, + KubeflowApiOperationsEnum kubeflowApiOperationsEnum) { + super(connectorRequest, processInstanceKey, kubeflowApisEnum, kubeflowApiOperationsEnum); + } + + @Override + protected void addKubeflowUrlPath(URIBuilder uriBuilder) { + var kubeflowUrlPath = String.format("%s/%s", + String.format(kubeflowApiOperationsEnum.getApiUrl(), kubeflowApisEnum.getUrlPathVersion()), + connectorRequest.getKubeflowapi().experimentId()); + uriBuilder.setPath(kubeflowUrlPath); + } +} diff --git a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorGetRunById.java b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorGetRunById.java index 421adaa..c29b5f2 100644 --- a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorGetRunById.java +++ b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorGetRunById.java @@ -52,16 +52,4 @@ protected void addKubeflowUrlPath(URIBuilder uriBuilder) { connectorRequest.getKubeflowapi().runId()); uriBuilder.setPath(kubeflowUrlPath); } - - public V1ApiRun getRunByIdV1Typed(HttpClient httpClient) - throws IOException, InstantiationException, IllegalAccessException { - var httpResponse = this.execute(); - return runUtil.readV1RunAsTypedResponse(httpResponse); - } - - public V2beta1Run getRunByIdV2Typed(HttpClient httpClient) - throws IOException, InstantiationException, IllegalAccessException { - var httpResponse = this.execute(); - return runUtil.readV2RunAsTypedResponse(httpResponse); - } } diff --git a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorStartRun.java b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorStartRun.java index 040f0d6..c7afd20 100644 --- a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorStartRun.java +++ b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorStartRun.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.PropertyNamingStrategies; import com.fasterxml.jackson.databind.module.SimpleModule; import de.viadee.bpm.camunda.connectors.kubeflow.utils.OffsetDateTimeDeserializer; +import io.swagger.client.model.V1ApiExperiment; import io.swagger.client.model.V1ApiListExperimentsResponse; import io.swagger.client.model.V2beta1Experiment; import io.swagger.client.model.V2beta1ListExperimentsResponse; @@ -39,6 +40,7 @@ import io.swagger.client.model.V2beta1Run; import io.swagger.client.model.V2beta1RuntimeConfig; import io.swagger.client.model.V2beta1RuntimeState; +import java.util.stream.Collectors; import org.threeten.bp.OffsetDateTime; public class KubeflowConnectorExecutorStartRun extends KubeflowConnectorExecutor { @@ -58,6 +60,12 @@ public class KubeflowConnectorExecutorStartRun extends KubeflowConnectorExecutor private RunUtil runUtil; private String runName; + private static ObjectMapper experimentObjectMapper = new ObjectMapper() + .registerModule(new SimpleModule().addDeserializer(OffsetDateTime.class, + new OffsetDateTimeDeserializer())) + .setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE) + .enable(DeserializationFeature.READ_ENUMS_USING_TO_STRING); + public KubeflowConnectorExecutorStartRun(KubeflowConnectorRequest connectorRequest, long processInstanceKey, KubeflowApisEnum kubeflowApisEnum, KubeflowApiOperationsEnum kubeflowApiOperationsEnum) { @@ -179,6 +187,7 @@ namespace with the help of the experiment id (an experiment is always uniquely we, thus, need to first find the experiment via the ID from the properties panel and extract the namespace from the resulting object if it exists. */ + // TODO only relevant for multi user mode var namespaceInWhichRunPotentiallyStarted = getNamespaceByExperimentId( connectorRequest.getKubeflowapi().experimentId()); @@ -206,48 +215,42 @@ namespace with the help of the experiment id (an experiment is always uniquely } private String getNamespaceByExperimentId(String experimentId) throws JsonProcessingException { - KubeflowApi kubeflowApi = new KubeflowApi(kubeflowApisEnum.getValue(), - KubeflowApiOperationsEnum.GET_EXPERIMENTS.getValue(), null, - null, null, null, null, null, null, null, null, connectorRequest.getKubeflowapi() + + // get experiment + KubeflowApi getExperimentKubeflowApi = new KubeflowApi(kubeflowApisEnum.getValue(), + KubeflowApiOperationsEnum.GET_EXPERIMENT_BY_ID.getValue(), null, + null, null, null, experimentId, null, null, null, null, connectorRequest.getKubeflowapi() .httpHeaders(), null); - KubeflowConnectorRequest getExperimentsConnectorRequest = new KubeflowConnectorRequest( + KubeflowConnectorRequest getExperimentConnectorRequest = new KubeflowConnectorRequest( connectorRequest.getAuthentication(), - connectorRequest.getConfiguration(), kubeflowApi, + connectorRequest.getConfiguration(), getExperimentKubeflowApi, connectorRequest.getTimeout()); - var response = ExecutionHandler.getExecutor(getExperimentsConnectorRequest, processInstanceKey).execute(); - - var objectMapper = new ObjectMapper() - .registerModule(new SimpleModule().addDeserializer(OffsetDateTime.class, - new OffsetDateTimeDeserializer())) - .setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE) - .enable(DeserializationFeature.READ_ENUMS_USING_TO_STRING); - - if (KubeflowApisEnum.PIPELINES_V1.getValue().equals(kubeflowApisEnum.getValue())) { - var experiment = objectMapper.readValue(response.body(), V1ApiListExperimentsResponse.class) - .getExperiments() - .stream() - .filter(exp -> exp.getId().equals(experimentId)) - .findFirst() - .orElse(null); - - return experiment == null ? null : - experiment.getResourceReferences().stream() - .filter(refs -> refs.getKey().getType().equals("NAMESPACE")) - .findFirst() - .map(V1ApiResourceReference::getKey) - .map(V1ApiResourceKey::getId) - .orElse(null); - } else { - return objectMapper.readValue(response.body(), V2beta1ListExperimentsResponse.class) - .getExperiments() - .stream() - .filter(exp -> exp.getExperimentId().equals(experimentId)) - .findFirst() - .map(V2beta1Experiment::getNamespace) - .orElse(null); + var experimentHttpResponse = ExecutionHandler.getExecutor(getExperimentConnectorRequest, processInstanceKey).execute(); + + // get namespace + String namespace; + try { + if (KubeflowApisEnum.PIPELINES_V1.getValue().equals(kubeflowApisEnum.getValue())) { + var v1Experiment = experimentObjectMapper.readValue(experimentHttpResponse.body(), V1ApiExperiment.class); + namespace = v1Experiment.getResourceReferences() == null ? null : + v1Experiment.getResourceReferences() + .stream() + .filter(refs -> refs.getKey().getType().getValue().equals("NAMESPACE")) + .findFirst() + .map(V1ApiResourceReference::getKey) + .map(V1ApiResourceKey::getId) + .orElse(null); + } else { + var v2Experiment = experimentObjectMapper.readValue(experimentHttpResponse.body(), V2beta1Experiment.class); + namespace = v2Experiment.getNamespace(); + } + } catch (Exception e) { + namespace = null; } + + return namespace; } private HttpResponse retrieveRunStatusWithDelay(KubeflowCallable kubeflowCallable, long delay, diff --git a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/async/ExecutionHandler.java b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/async/ExecutionHandler.java index 784f265..10f0859 100644 --- a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/async/ExecutionHandler.java +++ b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/async/ExecutionHandler.java @@ -1,5 +1,6 @@ package de.viadee.bpm.camunda.connectors.kubeflow.services.async; +import de.viadee.bpm.camunda.connectors.kubeflow.services.KubeflowConnectorExecutorGetExperimentById; import java.net.http.HttpResponse; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -55,6 +56,9 @@ public static KubeflowConnectorExecutor getExecutor(KubeflowConnectorRequest con case CREATE_EXPERIMENT: return new KubeflowConnectorExecutorCreateExperiment(connectorRequest, processInstanceKey, selectedApi, KubeflowApiOperationsEnum.CREATE_EXPERIMENT); + case GET_EXPERIMENT_BY_ID: + return new KubeflowConnectorExecutorGetExperimentById(connectorRequest, processInstanceKey, selectedApi, + KubeflowApiOperationsEnum.GET_EXPERIMENT_BY_ID); default: // OTHER throw new RuntimeException("Selected operation is not supported"); } diff --git a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/utils/RunUtil.java b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/utils/RunUtil.java index 4c8286b..cdbb16c 100644 --- a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/utils/RunUtil.java +++ b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/utils/RunUtil.java @@ -1,5 +1,6 @@ package de.viadee.bpm.camunda.connectors.kubeflow.utils; +import io.swagger.client.model.V1ApiRunDetail; import java.net.http.HttpResponse; import java.util.ArrayList; import java.util.List; @@ -37,12 +38,12 @@ public V1ApiListRunsResponse readV1RunListAsTypedResponse(HttpResponse r public V1ApiRun readV1RunAsTypedResponse(HttpResponse runResponse) throws JsonProcessingException { - V1ApiRun v1ApiRunResponse = null; + V1ApiRunDetail v1ApiRunResponse = null; if (!JsonHelper.getAsJsonElement(runResponse.body(), new ObjectMapper()).isEmpty()) { v1ApiRunResponse = runMapper - .readValue(runResponse.body(), V1ApiRun.class); + .readValue(runResponse.body(), V1ApiRunDetail.class); } - return v1ApiRunResponse; + return v1ApiRunResponse.getRun(); } public String extractIdFromV1RunResponse(HttpResponse runResponse) throws JsonProcessingException { From cfd0717a0a6164b6e3726d23fa0b7c1afccb909a Mon Sep 17 00:00:00 2001 From: Andre Strothmann Date: Sun, 17 Mar 2024 10:01:27 +0100 Subject: [PATCH 08/14] fix: unit tests --- .../bpm/camunda/connectors/kubeflow/utils/RunUtil.java | 6 +++--- .../bpm/camunda/connectors/kubeflow/utils/RunUtilTest.java | 4 ++-- src/test/resources/kubeflow-environment/oauth2_proxy.yml | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/utils/RunUtil.java b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/utils/RunUtil.java index cdbb16c..56ef9ed 100644 --- a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/utils/RunUtil.java +++ b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/utils/RunUtil.java @@ -38,12 +38,12 @@ public V1ApiListRunsResponse readV1RunListAsTypedResponse(HttpResponse r public V1ApiRun readV1RunAsTypedResponse(HttpResponse runResponse) throws JsonProcessingException { - V1ApiRunDetail v1ApiRunResponse = null; + V1ApiRun v1ApiRunResponse = null; if (!JsonHelper.getAsJsonElement(runResponse.body(), new ObjectMapper()).isEmpty()) { v1ApiRunResponse = runMapper - .readValue(runResponse.body(), V1ApiRunDetail.class); + .readValue(runResponse.body(), V1ApiRunDetail.class).getRun(); } - return v1ApiRunResponse.getRun(); + return v1ApiRunResponse; } public String extractIdFromV1RunResponse(HttpResponse runResponse) throws JsonProcessingException { diff --git a/src/test/java/de/viadee/bpm/camunda/connectors/kubeflow/utils/RunUtilTest.java b/src/test/java/de/viadee/bpm/camunda/connectors/kubeflow/utils/RunUtilTest.java index 99b0d0f..a007591 100644 --- a/src/test/java/de/viadee/bpm/camunda/connectors/kubeflow/utils/RunUtilTest.java +++ b/src/test/java/de/viadee/bpm/camunda/connectors/kubeflow/utils/RunUtilTest.java @@ -33,7 +33,7 @@ void readV1RunListAsTypedResponse() throws JsonProcessingException { @Test void readV1RunAsTypedResponse() throws JsonProcessingException { // given - when(httpResponseMock.body()).thenReturn("{\"id\":\"1\",\"name\":\"test\"}"); + when(httpResponseMock.body()).thenReturn("{\"run\":{\"id\":\"1\",\"name\":\"test\"}}"); // when var result = runUtil.readV1RunAsTypedResponse(httpResponseMock); // then @@ -45,7 +45,7 @@ void readV1RunAsTypedResponse() throws JsonProcessingException { @Test void extractIdFromV1RunResponse() throws JsonProcessingException { // given - when(httpResponseMock.body()).thenReturn("{\"id\":\"1\",\"name\":\"test\"}"); + when(httpResponseMock.body()).thenReturn("{\"run\":{\"id\":\"1\",\"name\":\"test\"}}"); // when var result = runUtil.extractIdFromV1RunResponse(httpResponseMock); // then diff --git a/src/test/resources/kubeflow-environment/oauth2_proxy.yml b/src/test/resources/kubeflow-environment/oauth2_proxy.yml index f9d2d84..420d1e4 100644 --- a/src/test/resources/kubeflow-environment/oauth2_proxy.yml +++ b/src/test/resources/kubeflow-environment/oauth2_proxy.yml @@ -24,7 +24,7 @@ providers: - aud emailClaim: email groupsClaim: groups - issuerURL: http://${KUBEFLOW_HOST}:30000/auth/realms/kubeflow + issuerURL: http://localhost:30000/auth/realms/kubeflow jwksURL: http://keycloak.keycloak.svc.cluster.local:8080/auth/realms/kubeflow/protocol/openid-connect/certs skipDiscovery: true userIDClaim: email From 3a83a095b6e5993a30505339fd4b5e3712755811 Mon Sep 17 00:00:00 2001 From: Andre Strothmann Date: Sun, 17 Mar 2024 10:02:34 +0100 Subject: [PATCH 09/14] fix: oauth2_proxy --- src/test/resources/kubeflow-environment/oauth2_proxy.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/resources/kubeflow-environment/oauth2_proxy.yml b/src/test/resources/kubeflow-environment/oauth2_proxy.yml index 420d1e4..f9d2d84 100644 --- a/src/test/resources/kubeflow-environment/oauth2_proxy.yml +++ b/src/test/resources/kubeflow-environment/oauth2_proxy.yml @@ -24,7 +24,7 @@ providers: - aud emailClaim: email groupsClaim: groups - issuerURL: http://localhost:30000/auth/realms/kubeflow + issuerURL: http://${KUBEFLOW_HOST}:30000/auth/realms/kubeflow jwksURL: http://keycloak.keycloak.svc.cluster.local:8080/auth/realms/kubeflow/protocol/openid-connect/certs skipDiscovery: true userIDClaim: email From 8d34ab28bb3ec52af03305961dbb0ede10fabc03 Mon Sep 17 00:00:00 2001 From: Andre Strothmann Date: Sun, 17 Mar 2024 10:15:29 +0100 Subject: [PATCH 10/14] add exception handling --- .../services/KubeflowConnectorExecutorStartRun.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorStartRun.java b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorStartRun.java index c7afd20..2f43b9e 100644 --- a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorStartRun.java +++ b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorStartRun.java @@ -187,9 +187,8 @@ namespace with the help of the experiment id (an experiment is always uniquely we, thus, need to first find the experiment via the ID from the properties panel and extract the namespace from the resulting object if it exists. */ - // TODO only relevant for multi user mode - var namespaceInWhichRunPotentiallyStarted = getNamespaceByExperimentId( - connectorRequest.getKubeflowapi().experimentId()); + var namespaceInWhichRunPotentiallyStarted = super.isMultiUserMode? getNamespaceByExperimentId( + connectorRequest.getKubeflowapi().experimentId()) : null; // use previously determined namespace to search for run by its name KubeflowApi kubeflowApi = new KubeflowApi(kubeflowApisEnum.getValue(), @@ -214,7 +213,7 @@ namespace with the help of the experiment id (an experiment is always uniquely return id; } - private String getNamespaceByExperimentId(String experimentId) throws JsonProcessingException { + private String getNamespaceByExperimentId(String experimentId) { // get experiment KubeflowApi getExperimentKubeflowApi = new KubeflowApi(kubeflowApisEnum.getValue(), @@ -247,7 +246,7 @@ private String getNamespaceByExperimentId(String experimentId) throws JsonProces namespace = v2Experiment.getNamespace(); } } catch (Exception e) { - namespace = null; + throw new RuntimeException("Could not derive namespace from experiment ID: " + e); } return namespace; From 2dd3fd08274a7dd667dfacb830f08a4342e7590b Mon Sep 17 00:00:00 2001 From: Andre Strothmann Date: Sun, 17 Mar 2024 10:18:28 +0100 Subject: [PATCH 11/14] chore: commenting --- .../KubeflowConnectorExecutorStartRun.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorStartRun.java b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorStartRun.java index 2f43b9e..f1f8d93 100644 --- a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorStartRun.java +++ b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorStartRun.java @@ -179,13 +179,14 @@ private String getIdOfAlreadyStartedRunByName(String runName) throws InstantiationException, IllegalAccessException, IOException { /* - the namespace is a required param. when trying to find a run by its name. However, - when starting the operation "Start Run and Monitor" the properties panel does not - ask for the namespace to be specified. This is, because Kubeflow can derive the - namespace with the help of the experiment id (an experiment is always uniquely - assigned to a namespace). In order to start the operation "Get Run By Name", - we, thus, need to first find the experiment via the ID from the properties - panel and extract the namespace from the resulting object if it exists. + in case of multi-user mode, the namespace is a required param. when trying to find + a run by its name. However, when starting the operation "Start Run and Monitor" + the properties panel does not ask for the namespace to be specified. This is, + because Kubeflow can derive the namespace with the help of the experiment id + (an experiment is always uniquely assigned to a namespace). In order to + start the operation "Get Run By Name", we, thus, need to first find + the experiment via the ID from the properties panel and extract + the namespace from the resulting object if it exists. */ var namespaceInWhichRunPotentiallyStarted = super.isMultiUserMode? getNamespaceByExperimentId( connectorRequest.getKubeflowapi().experimentId()) : null; From 0ce1fba72f2fd59f8f4b2dbb3cd02c5577d18df2 Mon Sep 17 00:00:00 2001 From: Andre Strothmann Date: Sun, 17 Mar 2024 10:55:56 +0100 Subject: [PATCH 12/14] fix: runName in 'Get Run By Name' vs in 'Start Run and Monitor' --- element-templates/kubeflow-connector.json | 23 +++++++++++++++++- ...KubeflowConnectorExecutorGetRunByName.java | 8 +++---- .../KubeflowConnectorExecutorStartRun.java | 24 ++++++++----------- 3 files changed, 36 insertions(+), 19 deletions(-) diff --git a/element-templates/kubeflow-connector.json b/element-templates/kubeflow-connector.json index 70f34d0..aeb0b62 100644 --- a/element-templates/kubeflow-connector.json +++ b/element-templates/kubeflow-connector.json @@ -630,12 +630,33 @@ "condition": { "property": "kubeflowapi.operation", "oneOf": [ - "get_run_by_name", "start_run", "start_run_and_monitor" ] } }, + { + "id": "kubeflowapi.runName", + "label": "Run Name", + "description": "Specify the name of the run.", + "type": "String", + "group": "kubeflowapi", + "feel": "optional", + "optional": false, + "constraints": { + "notEmpty": true + }, + "binding": { + "type": "zeebe:input", + "name": "kubeflowapi.runName" + }, + "condition": { + "property": "kubeflowapi.operation", + "oneOf": [ + "get_run_by_name" + ] + } + }, { "id": "kubeflowapi.runParameters", "label": "Run parameters", diff --git a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorGetRunByName.java b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorGetRunByName.java index 1cfc642..c4cbb90 100644 --- a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorGetRunByName.java +++ b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorGetRunByName.java @@ -67,7 +67,7 @@ public HttpResponse execute() { } public Optional getRunByNameV1Typed() - throws IOException, InstantiationException, IllegalAccessException { + throws IOException { var httpResponse = this.execute(); if (JsonHelper.getAsJsonElement(httpResponse.body(), new ObjectMapper()).isEmpty()) { return Optional.ofNullable(null); @@ -78,7 +78,7 @@ public Optional getRunByNameV1Typed() } public Optional getRunByNameV2Typed() - throws IOException, InstantiationException, IllegalAccessException { + throws IOException { var httpResponse = this.execute(); if (JsonHelper.getAsJsonElement(httpResponse.body(), new ObjectMapper()).isEmpty()) { return Optional.ofNullable(null); @@ -92,7 +92,7 @@ private V1Filter getV1Filter() { var predicate = new V1FilterPredicate() .op(V2beta1PredicateOperation.EQUALS) .key(FILTER_BY_KEY) - .stringValue(connectorRequest.getKubeflowapi().filter()); + .stringValue(connectorRequest.getKubeflowapi().runName()); return new V1Filter() .addPredicatesItem(predicate); @@ -102,7 +102,7 @@ private V2beta1Filter getV2Filter() { var predicate = new V2beta1Predicate() .operation(V2beta1PredicateOperation.EQUALS) .key(FILTER_BY_KEY) - .stringValue(connectorRequest.getKubeflowapi().filter()); + .stringValue(connectorRequest.getKubeflowapi().runName()); return new V2beta1Filter() .addPredicatesItem(predicate); diff --git a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorStartRun.java b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorStartRun.java index f1f8d93..3f9012e 100644 --- a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorStartRun.java +++ b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutorStartRun.java @@ -6,9 +6,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule; import de.viadee.bpm.camunda.connectors.kubeflow.utils.OffsetDateTimeDeserializer; import io.swagger.client.model.V1ApiExperiment; -import io.swagger.client.model.V1ApiListExperimentsResponse; import io.swagger.client.model.V2beta1Experiment; -import io.swagger.client.model.V2beta1ListExperimentsResponse; import java.io.IOException; import java.net.http.HttpRequest; import java.net.http.HttpRequest.BodyPublisher; @@ -40,7 +38,6 @@ import io.swagger.client.model.V2beta1Run; import io.swagger.client.model.V2beta1RuntimeConfig; import io.swagger.client.model.V2beta1RuntimeState; -import java.util.stream.Collectors; import org.threeten.bp.OffsetDateTime; public class KubeflowConnectorExecutorStartRun extends KubeflowConnectorExecutor { @@ -58,7 +55,7 @@ public class KubeflowConnectorExecutorStartRun extends KubeflowConnectorExecutor V2beta1RuntimeState.CANCELED.getValue()); private RunUtil runUtil; - private String runName; + private String runNameWithKeyAndSuffix; private static ObjectMapper experimentObjectMapper = new ObjectMapper() .registerModule(new SimpleModule().addDeserializer(OffsetDateTime.class, @@ -75,8 +72,8 @@ public KubeflowConnectorExecutorStartRun(KubeflowConnectorRequest connectorReque @Override protected BodyPublisher buildPayloadForKubeflowEndpoint() { - //define runName - runName = processInstanceKey+"_"+connectorRequest.getKubeflowapi().runName(); + // derive complete name of run using process instance key and suffix from prop. panel + runNameWithKeyAndSuffix = processInstanceKey+"_"+connectorRequest.getKubeflowapi().runName(); try { if (KubeflowApisEnum.PIPELINES_V1.equals(kubeflowApisEnum)) { @@ -99,7 +96,7 @@ public HttpResponse execute() { } else if (kubeflowApiOperationsEnum.equals(KubeflowApiOperationsEnum.START_RUN_AND_MONITOR)) { String idOfAlreadyStartedRun; try { - idOfAlreadyStartedRun = getIdOfAlreadyStartedRunByName(this.runName); + idOfAlreadyStartedRun = getIdOfAlreadyStartedRunByName(); } catch (InstantiationException | IllegalAccessException | IOException e) { throw new RuntimeException(e); } @@ -152,7 +149,7 @@ private V1ApiRun getPayloadForEndpointV1() { .id(connectorRequest.getKubeflowapi().experimentId())); var v1ApiRun = new V1ApiRun() - .name(this.runName) + .name(this.runNameWithKeyAndSuffix) .pipelineSpec(v1ApiPipelineSpec) .addResourceReferencesItem(v1ApiResourceReference); @@ -167,7 +164,7 @@ private V2beta1Run getPayloadForEndpointV2() { .parameters(connectorRequest.getKubeflowapi().runParameters()); var v2ApiRun = new V2beta1Run() - .displayName(this.runName) + .displayName(this.runNameWithKeyAndSuffix) .runtimeConfig(v2beta1RuntimeConfig) .pipelineVersionReference(v2beta1PipelineVersionReference) .experimentId(connectorRequest.getKubeflowapi().experimentId()); @@ -175,7 +172,7 @@ private V2beta1Run getPayloadForEndpointV2() { return v2ApiRun; } - private String getIdOfAlreadyStartedRunByName(String runName) + private String getIdOfAlreadyStartedRunByName() throws InstantiationException, IllegalAccessException, IOException { /* @@ -194,7 +191,7 @@ private String getIdOfAlreadyStartedRunByName(String runName) // use previously determined namespace to search for run by its name KubeflowApi kubeflowApi = new KubeflowApi(kubeflowApisEnum.getValue(), KubeflowApiOperationsEnum.GET_RUN_BY_NAME.getValue(), null, - null, null, null, null, runName, null, null, null, connectorRequest.getKubeflowapi() + runNameWithKeyAndSuffix, null, null, null, null, null, null, null, connectorRequest.getKubeflowapi() .httpHeaders(), namespaceInWhichRunPotentiallyStarted); KubeflowConnectorRequest getRunByNameConnectorRequest = new KubeflowConnectorRequest( @@ -203,9 +200,8 @@ private String getIdOfAlreadyStartedRunByName(String runName) connectorRequest.getTimeout()); KubeflowConnectorExecutorGetRunByName getRunByNameExecutor = (KubeflowConnectorExecutorGetRunByName) ExecutionHandler - .getExecutor( - getRunByNameConnectorRequest, - processInstanceKey); + .getExecutor(getRunByNameConnectorRequest, processInstanceKey); + String id = KubeflowApisEnum.PIPELINES_V1.equals(kubeflowApisEnum) ? getRunByNameExecutor.getRunByNameV1Typed().map(V1ApiRun::getId).orElse(null) : getRunByNameExecutor.getRunByNameV2Typed().map(V2beta1Run::getRunId) From ed5a9797313ed65ed95e23c1161bfb4d8f598161 Mon Sep 17 00:00:00 2001 From: Andre Strothmann Date: Sun, 17 Mar 2024 11:02:36 +0100 Subject: [PATCH 13/14] chore: naming von run name label in template --- element-templates/kubeflow-connector.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/element-templates/kubeflow-connector.json b/element-templates/kubeflow-connector.json index aeb0b62..f78f873 100644 --- a/element-templates/kubeflow-connector.json +++ b/element-templates/kubeflow-connector.json @@ -614,7 +614,7 @@ }, { "id": "kubeflowapi.runName", - "label": "Run Name", + "label": "Suffix of Run Name", "description": "Specify the suffix of the name of the run.", "type": "String", "group": "kubeflowapi", From 76c36e6e4aa33dc18af8431a393e77dd11c8247f Mon Sep 17 00:00:00 2001 From: Andre Strothmann Date: Tue, 19 Mar 2024 18:16:50 +0100 Subject: [PATCH 14/14] chore: only of multi user mode --- element-templates/kubeflow-connector.json | 7 +------ .../connectors/kubeflow/enums/TypeOfUserModeEnum.java | 1 - .../kubeflow/services/KubeflowConnectorExecutor.java | 2 +- 3 files changed, 2 insertions(+), 8 deletions(-) diff --git a/element-templates/kubeflow-connector.json b/element-templates/kubeflow-connector.json index f78f873..52bf60e 100644 --- a/element-templates/kubeflow-connector.json +++ b/element-templates/kubeflow-connector.json @@ -76,19 +76,14 @@ { "id": "configuration.typeOfUserMode", "label": "Type of User Mode", - "description": "Specify the type of user mode in use.", "type": "Dropdown", "group": "configuration", - "value": "singleUserMode", + "value": "multiUserMode", "optional": false, "constraints": { "notEmpty": true }, "choices": [ - { - "name": "Single-User Mode", - "value": "singleUserMode" - }, { "name": "Multi-User Mode", "value": "multiUserMode" diff --git a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/enums/TypeOfUserModeEnum.java b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/enums/TypeOfUserModeEnum.java index fd92af7..921e831 100644 --- a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/enums/TypeOfUserModeEnum.java +++ b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/enums/TypeOfUserModeEnum.java @@ -3,7 +3,6 @@ import java.util.Arrays; public enum TypeOfUserModeEnum { - SINGLE_USER_MODE("singleUserMode"), MULTI_USER_MODE("multiUserMode"); private final String value; diff --git a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutor.java b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutor.java index b0de691..8cf7602 100644 --- a/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutor.java +++ b/src/main/java/de/viadee/bpm/camunda/connectors/kubeflow/services/KubeflowConnectorExecutor.java @@ -107,7 +107,7 @@ private void setConfigurationParameters() { if (configPropertyGroup != null) { kubeflowUrl = StringUtils.isBlank(configPropertyGroup.kubeflowUrl()) ? kubeflowUrl : configPropertyGroup.kubeflowUrl(); - isMultiUserMode = TypeOfUserModeEnum.MULTI_USER_MODE.equals(typeOfUserMode) ? true : false; + isMultiUserMode = TypeOfUserModeEnum.MULTI_USER_MODE.equals(typeOfUserMode); } if (kubeflowUrl == null) {