Skip to content

Commit

Permalink
Move expensive role building off transport thread (elastic#113020)
Browse files Browse the repository at this point in the history
This PR moves role building off the transport thread to the generic
thread pool, since role building can be expensive depending on role
structure.

Role building is CPU bound so this PR uses a `ThrottledTaskRunner` to
limit the number of concurrent requests. I will explore adding a max
queue limit in a follow up.

Resolves: ES-9505
(cherry picked from commit 6cdd59b)
  • Loading branch information
n1v0lg committed Nov 19, 2024
1 parent 771bea1 commit 51fdb8f
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,15 @@
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeMetadata;
import org.elasticsearch.features.FeatureService;
Expand Down Expand Up @@ -1037,6 +1040,7 @@ Collection<Object> createComponents(
serviceAccountService,
dlsBitsetCache.get(),
restrictedIndices,
buildRoleBuildingExecutor(threadPool, settings),
new DeprecationRoleDescriptorConsumer(clusterService, threadPool)
);
systemIndices.getMainIndexManager().addStateListener(allRolesStore::onSecurityIndexStateChange);
Expand Down Expand Up @@ -1268,6 +1272,29 @@ private void submitPersistentMigrationTask(int migrationsVersion, boolean securi
);
}

private static Executor buildRoleBuildingExecutor(ThreadPool threadPool, Settings settings) {
final int allocatedProcessors = EsExecutors.allocatedProcessors(settings);
final ThrottledTaskRunner throttledTaskRunner = new ThrottledTaskRunner("build_roles", allocatedProcessors, threadPool.generic());
return r -> throttledTaskRunner.enqueueTask(new ActionListener<>() {
@Override
public void onResponse(Releasable releasable) {
try (releasable) {
r.run();
}
}

@Override
public void onFailure(Exception e) {
if (r instanceof AbstractRunnable abstractRunnable) {
abstractRunnable.onFailure(e);
}
// should be impossible, GENERIC pool doesn't reject anything
logger.error("unexpected failure running " + r, e);
assert false : new AssertionError("unexpected failure running " + r, e);
}
});
}

private AuthorizationEngine getAuthorizationEngine() {
return findValueFromExtensions("authorization engine", extension -> extension.getAuthorizationEngine(settings));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.cache.Cache;
Expand Down Expand Up @@ -65,6 +66,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand All @@ -91,6 +93,11 @@ public class CompositeRolesStore {
Property.NodeScope
);
private static final Logger logger = LogManager.getLogger(CompositeRolesStore.class);
/**
* See {@link #shouldForkRoleBuilding(Set)}
*/
private static final int ROLE_DESCRIPTOR_FORK_THRESHOLD = 100;
private static final int INDEX_PRIVILEGE_FORK_THRESHOLD = 1000;

private final RoleProviders roleProviders;
private final NativePrivilegeStore privilegeStore;
Expand All @@ -106,6 +113,7 @@ public class CompositeRolesStore {
private final Map<String, Role> internalUserRoles;
private final RestrictedIndices restrictedIndices;
private final ThreadContext threadContext;
private final Executor roleBuildingExecutor;

public CompositeRolesStore(
Settings settings,
Expand All @@ -118,6 +126,7 @@ public CompositeRolesStore(
ServiceAccountService serviceAccountService,
DocumentSubsetBitsetCache dlsBitsetCache,
RestrictedIndices restrictedIndices,
Executor roleBuildingExecutor,
Consumer<Collection<RoleDescriptor>> effectiveRoleDescriptorsConsumer
) {
this.roleProviders = roleProviders;
Expand Down Expand Up @@ -179,6 +188,7 @@ public void providersChanged() {
);
this.anonymousUser = new AnonymousUser(settings);
this.threadContext = threadContext;
this.roleBuildingExecutor = roleBuildingExecutor;
}

public void getRoles(Authentication authentication, ActionListener<Tuple<Role, Role>> roleActionListener) {
Expand Down Expand Up @@ -276,21 +286,70 @@ public void buildRoleFromRoleReference(RoleReference roleReference, ActionListen
} else if (RolesRetrievalResult.SUPERUSER == rolesRetrievalResult) {
roleActionListener.onResponse(superuserRole);
} else {
buildThenMaybeCacheRole(
roleKey,
rolesRetrievalResult.getRoleDescriptors(),
rolesRetrievalResult.getMissingRoles(),
rolesRetrievalResult.isSuccess(),
invalidationCounter,
ActionListener.wrap(roleActionListener::onResponse, failureHandler)
);
final ActionListener<Role> wrapped = ActionListener.wrap(roleActionListener::onResponse, failureHandler);
if (shouldForkRoleBuilding(rolesRetrievalResult.getRoleDescriptors())) {
roleBuildingExecutor.execute(
ActionRunnable.wrap(
wrapped,
l -> buildThenMaybeCacheRole(
roleKey,
rolesRetrievalResult.getRoleDescriptors(),
rolesRetrievalResult.getMissingRoles(),
rolesRetrievalResult.isSuccess(),
invalidationCounter,
l
)
)
);
} else {
buildThenMaybeCacheRole(
roleKey,
rolesRetrievalResult.getRoleDescriptors(),
rolesRetrievalResult.getMissingRoles(),
rolesRetrievalResult.isSuccess(),
invalidationCounter,
wrapped
);
}
}
}, failureHandler));
} else {
roleActionListener.onResponse(existing);
}
}

/**
* Uses heuristics such as presence of application privileges to determine if role building will be expensive
* and therefore warrants forking.
* Package-private for testing.
*/
boolean shouldForkRoleBuilding(Set<RoleDescriptor> roleDescriptors) {
// A role with many role descriptors is likely expensive to build
if (roleDescriptors.size() > ROLE_DESCRIPTOR_FORK_THRESHOLD) {
return true;
}
int totalIndexPrivileges = 0;
int totalRemoteIndexPrivileges = 0;
for (RoleDescriptor roleDescriptor : roleDescriptors) {
// Application privileges can also result in big automata; it's difficult to determine how big application privileges
// are so err on the side of caution
if (roleDescriptor.hasApplicationPrivileges()) {
return true;
}
// Index privilege names or remote index privilege names can result in big and complex automata
totalIndexPrivileges += roleDescriptor.getIndicesPrivileges().length;
totalRemoteIndexPrivileges += roleDescriptor.getRemoteIndicesPrivileges().length;
if (totalIndexPrivileges > INDEX_PRIVILEGE_FORK_THRESHOLD || totalRemoteIndexPrivileges > INDEX_PRIVILEGE_FORK_THRESHOLD) {
return true;
}
// Likewise for FLS/DLS
if (roleDescriptor.isUsingDocumentOrFieldLevelSecurity()) {
return true;
}
}
return false;
}

private static boolean includesSuperuserRole(RoleReference roleReference) {
if (roleReference instanceof RoleReference.NamedRoleReference namedRoles) {
return Arrays.asList(namedRoles.getRoleNames()).contains(ReservedRolesStore.SUPERUSER_ROLE_DESCRIPTOR.getName());
Expand All @@ -313,10 +372,11 @@ private void buildThenMaybeCacheRole(
ActionListener<Role> listener
) {
logger.trace(
"Building role from descriptors [{}] for names [{}] from source [{}]",
"Building role from descriptors [{}] for names [{}] from source [{}] on [{}]",
roleDescriptors,
roleKey.getNames(),
roleKey.getSource()
roleKey.getSource(),
Thread.currentThread().getName()
);
buildRoleFromDescriptors(
roleDescriptors,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -242,6 +243,7 @@ public void setup() {
mock(ServiceAccountService.class),
new DocumentSubsetBitsetCache(Settings.EMPTY, mock(ThreadPool.class)),
RESTRICTED_INDICES,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
rds -> {}
)
);
Expand Down
Loading

0 comments on commit 51fdb8f

Please sign in to comment.