Skip to content

Commit

Permalink
[Backport stable/8.4] Fix identity-sdk concurrency issue (#24269)
Browse files Browse the repository at this point in the history
# Description
Backport of #24196 to `stable/8.4`.

relates to #23853
original author: @koevskinikola
  • Loading branch information
koevskinikola authored Nov 1, 2024
2 parents 1453eb2 + 1801b9a commit c3b1901
Show file tree
Hide file tree
Showing 13 changed files with 828 additions and 29 deletions.
32 changes: 31 additions & 1 deletion dist/src/main/config/broker.standalone.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,37 @@
# - id: example-interceptor
# className: com.acme.ExampleInterceptor
# jarPath: /path/to/interceptor/example-interceptor.jar

# experimental:
# Be aware that all configuration's which are part of the experimental section
# are subject to change and can be dropped at any time.
# It might be that also some of them are actually dangerous so be aware when you change one of these!
#
# identityRequest:
# Enables the Identity Request wrapper feature, for caching and throttling large numbers of concurrent
# requests to the Identity service. This feature will cache an Identity JWT token and a list of tenant IDs
# that are associated with it. The feature will also reject requests if the tenant cache is unable to reduce
# the number of concurrent requests.This feature can be enabled if you start to see UNAVAILABLE errors
# due to the Identity service being overloaded in your Zeebe client.
#
# enabled: false
# Enables the Identity Request wrapper feature.
# This setting can also be overridden using the environment variable ZEEBE_GATEWAY_EXPERIMENTAL_IDENTITYREQUEST_ENABLED.
#
# tenantCacheTtl: 5000
# The time-to-live in milliseconds for each entry the tenant cache.
# This setting can also be overridden using the environment variable ZEEBE_GATEWAY_EXPERIMENTAL_IDENTITYREQUEST_TENANTCACHETTL.
#
# tenantCacheSize: 200
# The maximum number of entries the tenant cache can hold.
# This setting can also be overridden using the environment variable ZEEBE_GATEWAY_EXPERIMENTAL_IDENTITYREQUEST_TENANTCACHESIZE.
#
# tenantRequestCapacity: 300
# The maximum number of concurrent Identity requests. Additional concurrent Identity requests will be rejected after a timeout period.
# This setting can also be overridden using the environment variable ZEEBE_GATEWAY_EXPERIMENTAL_IDENTITYREQUEST_TENANTREQUESTCAPACITY.
#
# tenantRequestTimeout: 1000
# The timeout in milliseconds for the Identity Request wrapper feature.
# This setting can also be overridden using the environment variable ZEEBE_GATEWAY_EXPERIMENTAL_IDENTITYREQUEST_TENANTREQUESTTIMEOUT.
# network:
# This section contains the network configuration. Particularly, it allows to
# configure the hosts and ports the broker should bind to. The broker exposes two sockets:
Expand Down
31 changes: 31 additions & 0 deletions dist/src/main/config/gateway.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -253,3 +253,34 @@
# - id: example-interceptor
# className: com.acme.ExampleInterceptor
# jarPath: /path/to/interceptor/example-interceptor.jar
# experimental:
# Be aware that all configuration's which are part of the experimental section
# are subject to change and can be dropped at any time.
# It might be that also some of them are actually dangerous so be aware when you change one of these!
#
# identityRequest:
# Enables the Identity Request wrapper feature, for caching and throttling large numbers of concurrent
# requests to the Identity service. This feature will cache an Identity JWT token and a list of tenant IDs
# that are associated with it. The feature will also reject requests if the tenant cache is unable to reduce
# the number of concurrent requests.This feature can be enabled if you start to see UNAVAILABLE errors
# due to the Identity service being overloaded in your Zeebe client.
#
# enabled: false
# Enables the Identity Request wrapper feature.
# This setting can also be overridden using the environment variable ZEEBE_GATEWAY_EXPERIMENTAL_IDENTITYREQUEST_ENABLED.
#
# tenantCacheTtl: 5000
# The time-to-live in milliseconds for each entry the tenant cache.
# This setting can also be overridden using the environment variable ZEEBE_GATEWAY_EXPERIMENTAL_IDENTITYREQUEST_TENANTCACHETTL.
#
# tenantCacheSize: 200
# The maximum number of entries the tenant cache can hold.
# This setting can also be overridden using the environment variable ZEEBE_GATEWAY_EXPERIMENTAL_IDENTITYREQUEST_TENANTCACHESIZE.
#
# tenantRequestCapacity: 300
# The maximum number of concurrent Identity requests. Additional concurrent Identity requests will be rejected after a timeout period.
# This setting can also be overridden using the environment variable ZEEBE_GATEWAY_EXPERIMENTAL_IDENTITYREQUEST_TENANTREQUESTCAPACITY.
#
# tenantRequestTimeout: 1000
# The timeout in milliseconds for the Identity Request wrapper feature.
# This setting can also be overridden using the environment variable ZEEBE_GATEWAY_EXPERIMENTAL_IDENTITYREQUEST_TENANTREQUESTTIMEOUT.
5 changes: 5 additions & 0 deletions gateway/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

<dependency>
<groupId>org.agrona</groupId>
<artifactId>agrona</artifactId>
Expand Down
4 changes: 2 additions & 2 deletions gateway/src/main/java/io/camunda/zeebe/gateway/Gateway.java
Original file line number Diff line number Diff line change
Expand Up @@ -364,13 +364,13 @@ private ServerServiceDefinition applyInterceptors(final BindableService service)
if (AuthMode.IDENTITY == gatewayCfg.getSecurity().getAuthentication().getMode()) {
final var zeebeIdentityCfg = gatewayCfg.getSecurity().getAuthentication().getIdentity();
if (isZeebeIdentityConfigurationNotNull(zeebeIdentityCfg)) {
interceptors.add(new IdentityInterceptor(zeebeIdentityCfg, gatewayCfg.getMultiTenancy()));
interceptors.add(new IdentityInterceptor(zeebeIdentityCfg, gatewayCfg));
LOG.warn(
"These Zeebe configuration properties for Camunda Identity are deprecated! Please use the "
+ "corresponding Camunda Identity properties or the environment variables defined here: "
+ "https://docs.camunda.io/docs/self-managed/identity/deployment/configuration-variables/");
} else {
interceptors.add(new IdentityInterceptor(identityCfg, gatewayCfg.getMultiTenancy()));
interceptors.add(new IdentityInterceptor(identityCfg, gatewayCfg));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.gateway.cmd;

import java.util.concurrent.TimeUnit;

public class ConcurrentRequestException extends ClientException {

private static final String MESSAGE_FORMAT =
"Expected to fetch tenants from Identity within %d%s, but there are too many concurrent requests; either increase the tenant request capacity, or scale Identity to complete requests faster.";

public ConcurrentRequestException(final long timeout, final TimeUnit timeoutUnit) {
super(String.format(MESSAGE_FORMAT, timeout, timeoutUnit));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.gateway.impl.configuration;

import java.util.Objects;

/**
* Be aware that all configuration which are part of this class are experimental, which means they
* are subject to change and to drop. It might be that also some of them are actually dangerous so
* be aware when you change one of these!
*/
public class ExperimentalCfg {

private IdentityServiceCfg identityRequest = new IdentityServiceCfg();

public IdentityServiceCfg getIdentityRequest() {
return identityRequest;
}

public void setIdentityRequest(final IdentityServiceCfg identityRequest) {
this.identityRequest = identityRequest;
}

@Override
public int hashCode() {
return Objects.hash(identityRequest);
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final ExperimentalCfg that = (ExperimentalCfg) o;
return Objects.equals(identityRequest, that.identityRequest);
}

@Override
public String toString() {
return "ExperimentalCfg{" + "identityRequest=" + identityRequest + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public class GatewayCfg {
private List<InterceptorCfg> interceptors = new ArrayList<>();
private MultiTenancyCfg multiTenancy = new MultiTenancyCfg();

private ExperimentalCfg experimental = new ExperimentalCfg();

public void init() {
init(ConfigurationDefaults.DEFAULT_HOST);
}
Expand Down Expand Up @@ -90,10 +92,18 @@ public void setMultiTenancy(final MultiTenancyCfg multiTenancy) {
this.multiTenancy = multiTenancy;
}

public ExperimentalCfg getExperimental() {
return experimental;
}

public void setExperimental(final ExperimentalCfg experimental) {
this.experimental = experimental;
}

@Override
public int hashCode() {
return Objects.hash(
network, cluster, threads, security, longPolling, interceptors, multiTenancy);
network, cluster, threads, security, longPolling, interceptors, multiTenancy, experimental);
}

@Override
Expand All @@ -111,7 +121,8 @@ public boolean equals(final Object o) {
&& Objects.equals(security, that.security)
&& Objects.equals(longPolling, that.longPolling)
&& Objects.equals(interceptors, that.interceptors)
&& Objects.equals(multiTenancy, that.multiTenancy);
&& Objects.equals(multiTenancy, that.multiTenancy)
&& Objects.equals(experimental, that.experimental);
}

@Override
Expand All @@ -131,6 +142,8 @@ public String toString() {
+ interceptors
+ ", multiTenancy="
+ multiTenancy
+ ", experimental="
+ experimental
+ '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.gateway.impl.configuration;

import java.util.Objects;

public class IdentityServiceCfg {

private boolean enabled = false;
private long tenantCacheTtl = 5000;
private long tenantCacheSize = 200;
private int tenantRequestCapacity = 300;
private long tenantRequestTimeout = 1000;

public boolean isEnabled() {
return enabled;
}

public void setEnabled(final boolean enabled) {
this.enabled = enabled;
}

public long getTenantCacheTtl() {
return tenantCacheTtl;
}

public void setTenantCacheTtl(final long tenantCacheTtl) {
this.tenantCacheTtl = tenantCacheTtl;
}

public long getTenantCacheSize() {
return tenantCacheSize;
}

public void setTenantCacheSize(final long tenantCacheSize) {
this.tenantCacheSize = tenantCacheSize;
}

public int getTenantRequestCapacity() {
return tenantRequestCapacity;
}

public void setTenantRequestCapacity(final int tenantRequestCapacity) {
this.tenantRequestCapacity = tenantRequestCapacity;
}

public long getTenantRequestTimeout() {
return tenantRequestTimeout;
}

public void setTenantRequestTimeout(final long tenantRequestTimeout) {
this.tenantRequestTimeout = tenantRequestTimeout;
}

@Override
public int hashCode() {
return Objects.hash(
enabled, tenantCacheTtl, tenantCacheSize, tenantRequestCapacity, tenantRequestTimeout);
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final IdentityServiceCfg that = (IdentityServiceCfg) o;
return enabled == that.enabled
&& tenantCacheTtl == that.tenantCacheTtl
&& tenantCacheSize == that.tenantCacheSize
&& tenantRequestCapacity == that.tenantRequestCapacity
&& tenantRequestTimeout == that.tenantRequestTimeout;
}

@Override
public String toString() {
return "IdentityRequestCfg{"
+ "enabled="
+ enabled
+ ", tenantCacheTtl="
+ tenantCacheTtl
+ ", tenantCacheSize="
+ tenantCacheSize
+ ", tenantRequestCapacity="
+ tenantRequestCapacity
+ ", tenantRequestTimeout="
+ tenantRequestTimeout
+ '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.gateway.impl.identity;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.camunda.identity.sdk.Identity;
import io.camunda.identity.sdk.tenants.dto.Tenant;
import io.camunda.zeebe.gateway.cmd.ConcurrentRequestException;
import io.camunda.zeebe.gateway.impl.configuration.IdentityServiceCfg;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class IdentityTenantService {

private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS;

private final LoadingCache<String, List<Tenant>> tenantCache;
private final Semaphore semaphore;
private final Identity identity;

private final boolean isCachingEnabled;
private final long tenantRequestTimeout;

public IdentityTenantService(final Identity identity, final IdentityServiceCfg config) {
this.identity = identity;
isCachingEnabled = config.isEnabled();
tenantRequestTimeout = config.getTenantRequestTimeout();
semaphore = new Semaphore(config.getTenantRequestCapacity());
tenantCache =
CacheBuilder.newBuilder()
.expireAfterWrite(config.getTenantCacheTtl(), TIME_UNIT)
.maximumSize(config.getTenantCacheSize())
.build(
new CacheLoader<>() {
@Override
public List<Tenant> load(final String token) {
return getTenantsForTokenThrottled(token);
}
});
}

public List<Tenant> getTenantsForToken(final String token) throws ExecutionException {
if (!isCachingEnabled) {
return getTenantsForTokenInternal(token);
}
return tenantCache.get(token);
}

private List<Tenant> getTenantsForTokenThrottled(final String token) {
try {
if (!semaphore.tryAcquire(tenantRequestTimeout, TIME_UNIT)) {
throw new ConcurrentRequestException(tenantRequestTimeout, TIME_UNIT);
}
return getTenantsForTokenInternal(token);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(
"Expected to fetch tenants from Identity, but the request was interrupted", e);
} finally {
semaphore.release();
}
}

private List<Tenant> getTenantsForTokenInternal(final String token) {
return identity.tenants().forToken(token);
}
}
Loading

0 comments on commit c3b1901

Please sign in to comment.