Skip to content

Commit

Permalink
Move expensive role building off transport thread (#113020) (#117002)
Browse files Browse the repository at this point in the history
# Backport

This will backport the following commits from `main` to `8.16`:
 - [Move expensive role building off transport thread (#113020)](#113020)
  • Loading branch information
n1v0lg authored Nov 19, 2024
1 parent 771bea1 commit 33b8fb1
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,15 @@
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeMetadata;
import org.elasticsearch.features.FeatureService;
Expand Down Expand Up @@ -1037,6 +1040,7 @@ Collection<Object> createComponents(
serviceAccountService,
dlsBitsetCache.get(),
restrictedIndices,
buildRoleBuildingExecutor(threadPool, settings),
new DeprecationRoleDescriptorConsumer(clusterService, threadPool)
);
systemIndices.getMainIndexManager().addStateListener(allRolesStore::onSecurityIndexStateChange);
Expand Down Expand Up @@ -1268,6 +1272,29 @@ private void submitPersistentMigrationTask(int migrationsVersion, boolean securi
);
}

private static Executor buildRoleBuildingExecutor(ThreadPool threadPool, Settings settings) {
final int allocatedProcessors = EsExecutors.allocatedProcessors(settings);
final ThrottledTaskRunner throttledTaskRunner = new ThrottledTaskRunner("build_roles", allocatedProcessors, threadPool.generic());
return r -> throttledTaskRunner.enqueueTask(new ActionListener<>() {
@Override
public void onResponse(Releasable releasable) {
try (releasable) {
r.run();
}
}

@Override
public void onFailure(Exception e) {
if (r instanceof AbstractRunnable abstractRunnable) {
abstractRunnable.onFailure(e);
}
// should be impossible, GENERIC pool doesn't reject anything
logger.error("unexpected failure running " + r, e);
assert false : new AssertionError("unexpected failure running " + r, e);
}
});
}

private AuthorizationEngine getAuthorizationEngine() {
return findValueFromExtensions("authorization engine", extension -> extension.getAuthorizationEngine(settings));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.cache.Cache;
Expand Down Expand Up @@ -65,6 +66,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand All @@ -91,6 +93,11 @@ public class CompositeRolesStore {
Property.NodeScope
);
private static final Logger logger = LogManager.getLogger(CompositeRolesStore.class);
/**
* See {@link #shouldForkRoleBuilding(Set)}
*/
private static final int ROLE_DESCRIPTOR_FORK_THRESHOLD = 100;
private static final int INDEX_PRIVILEGE_FORK_THRESHOLD = 1000;

private final RoleProviders roleProviders;
private final NativePrivilegeStore privilegeStore;
Expand All @@ -106,6 +113,7 @@ public class CompositeRolesStore {
private final Map<String, Role> internalUserRoles;
private final RestrictedIndices restrictedIndices;
private final ThreadContext threadContext;
private final Executor roleBuildingExecutor;

public CompositeRolesStore(
Settings settings,
Expand All @@ -118,6 +126,7 @@ public CompositeRolesStore(
ServiceAccountService serviceAccountService,
DocumentSubsetBitsetCache dlsBitsetCache,
RestrictedIndices restrictedIndices,
Executor roleBuildingExecutor,
Consumer<Collection<RoleDescriptor>> effectiveRoleDescriptorsConsumer
) {
this.roleProviders = roleProviders;
Expand Down Expand Up @@ -179,6 +188,7 @@ public void providersChanged() {
);
this.anonymousUser = new AnonymousUser(settings);
this.threadContext = threadContext;
this.roleBuildingExecutor = roleBuildingExecutor;
}

public void getRoles(Authentication authentication, ActionListener<Tuple<Role, Role>> roleActionListener) {
Expand Down Expand Up @@ -276,21 +286,70 @@ public void buildRoleFromRoleReference(RoleReference roleReference, ActionListen
} else if (RolesRetrievalResult.SUPERUSER == rolesRetrievalResult) {
roleActionListener.onResponse(superuserRole);
} else {
buildThenMaybeCacheRole(
roleKey,
rolesRetrievalResult.getRoleDescriptors(),
rolesRetrievalResult.getMissingRoles(),
rolesRetrievalResult.isSuccess(),
invalidationCounter,
ActionListener.wrap(roleActionListener::onResponse, failureHandler)
);
final ActionListener<Role> wrapped = ActionListener.wrap(roleActionListener::onResponse, failureHandler);
if (shouldForkRoleBuilding(rolesRetrievalResult.getRoleDescriptors())) {
roleBuildingExecutor.execute(
ActionRunnable.wrap(
wrapped,
l -> buildThenMaybeCacheRole(
roleKey,
rolesRetrievalResult.getRoleDescriptors(),
rolesRetrievalResult.getMissingRoles(),
rolesRetrievalResult.isSuccess(),
invalidationCounter,
l
)
)
);
} else {
buildThenMaybeCacheRole(
roleKey,
rolesRetrievalResult.getRoleDescriptors(),
rolesRetrievalResult.getMissingRoles(),
rolesRetrievalResult.isSuccess(),
invalidationCounter,
wrapped
);
}
}
}, failureHandler));
} else {
roleActionListener.onResponse(existing);
}
}

/**
* Uses heuristics such as presence of application privileges to determine if role building will be expensive
* and therefore warrants forking.
* Package-private for testing.
*/
boolean shouldForkRoleBuilding(Set<RoleDescriptor> roleDescriptors) {
// A role with many role descriptors is likely expensive to build
if (roleDescriptors.size() > ROLE_DESCRIPTOR_FORK_THRESHOLD) {
return true;
}
int totalIndexPrivileges = 0;
int totalRemoteIndexPrivileges = 0;
for (RoleDescriptor roleDescriptor : roleDescriptors) {
// Application privileges can also result in big automata; it's difficult to determine how big application privileges
// are so err on the side of caution
if (roleDescriptor.hasApplicationPrivileges()) {
return true;
}
// Index privilege names or remote index privilege names can result in big and complex automata
totalIndexPrivileges += roleDescriptor.getIndicesPrivileges().length;
totalRemoteIndexPrivileges += roleDescriptor.getRemoteIndicesPrivileges().length;
if (totalIndexPrivileges > INDEX_PRIVILEGE_FORK_THRESHOLD || totalRemoteIndexPrivileges > INDEX_PRIVILEGE_FORK_THRESHOLD) {
return true;
}
// Likewise for FLS/DLS
if (roleDescriptor.isUsingDocumentOrFieldLevelSecurity()) {
return true;
}
}
return false;
}

private static boolean includesSuperuserRole(RoleReference roleReference) {
if (roleReference instanceof RoleReference.NamedRoleReference namedRoles) {
return Arrays.asList(namedRoles.getRoleNames()).contains(ReservedRolesStore.SUPERUSER_ROLE_DESCRIPTOR.getName());
Expand All @@ -313,10 +372,11 @@ private void buildThenMaybeCacheRole(
ActionListener<Role> listener
) {
logger.trace(
"Building role from descriptors [{}] for names [{}] from source [{}]",
"Building role from descriptors [{}] for names [{}] from source [{}] on [{}]",
roleDescriptors,
roleKey.getNames(),
roleKey.getSource()
roleKey.getSource(),
Thread.currentThread().getName()
);
buildRoleFromDescriptors(
roleDescriptors,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -242,6 +243,7 @@ public void setup() {
mock(ServiceAccountService.class),
new DocumentSubsetBitsetCache(Settings.EMPTY, mock(ThreadPool.class)),
RESTRICTED_INDICES,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
rds -> {}
)
);
Expand Down
Loading

0 comments on commit 33b8fb1

Please sign in to comment.