Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move expensive role building off transport thread #113020

Merged
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,15 @@
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeMetadata;
import org.elasticsearch.features.FeatureService;
Expand Down Expand Up @@ -1036,6 +1039,7 @@ Collection<Object> createComponents(
serviceAccountService,
dlsBitsetCache.get(),
restrictedIndices,
buildRoleBuildingExecutor(threadPool, settings),
new DeprecationRoleDescriptorConsumer(clusterService, threadPool)
);
systemIndices.getMainIndexManager().addStateListener(allRolesStore::onSecurityIndexStateChange);
Expand Down Expand Up @@ -1267,6 +1271,29 @@ private void submitPersistentMigrationTask(int migrationsVersion, boolean securi
);
}

private static Executor buildRoleBuildingExecutor(ThreadPool threadPool, Settings settings) {
final int allocatedProcessors = EsExecutors.allocatedProcessors(settings);
final ThrottledTaskRunner throttledTaskRunner = new ThrottledTaskRunner("build_roles", allocatedProcessors, threadPool.generic());
return r -> throttledTaskRunner.enqueueTask(new ActionListener<>() {
@Override
public void onResponse(Releasable releasable) {
try (releasable) {
r.run();
}
}

@Override
public void onFailure(Exception e) {
if (r instanceof AbstractRunnable abstractRunnable) {
abstractRunnable.onFailure(e);
}
// should be impossible, GENERIC pool doesn't reject anything
logger.error("unexpected failure running " + r, e);
assert false : new AssertionError("unexpected failure running " + r, e);
}
});
}

private AuthorizationEngine getAuthorizationEngine() {
return findValueFromExtensions("authorization engine", extension -> extension.getAuthorizationEngine(settings));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.cache.Cache;
Expand Down Expand Up @@ -65,6 +66,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand All @@ -91,6 +93,11 @@ public class CompositeRolesStore {
Property.NodeScope
);
private static final Logger logger = LogManager.getLogger(CompositeRolesStore.class);
/**
* See {@link #shouldForkRoleBuilding(Set)}
*/
private static final int ROLE_DESCRIPTOR_FORK_THRESHOLD = 100;
private static final int INDEX_PRIVILEGE_FORK_THRESHOLD = 1000;

private final RoleProviders roleProviders;
private final NativePrivilegeStore privilegeStore;
Expand All @@ -106,6 +113,7 @@ public class CompositeRolesStore {
private final Map<String, Role> internalUserRoles;
private final RestrictedIndices restrictedIndices;
private final ThreadContext threadContext;
private final Executor roleBuildingExecutor;

public CompositeRolesStore(
Settings settings,
Expand All @@ -118,6 +126,7 @@ public CompositeRolesStore(
ServiceAccountService serviceAccountService,
DocumentSubsetBitsetCache dlsBitsetCache,
RestrictedIndices restrictedIndices,
Executor roleBuildingExecutor,
Consumer<Collection<RoleDescriptor>> effectiveRoleDescriptorsConsumer
) {
this.roleProviders = roleProviders;
Expand Down Expand Up @@ -179,6 +188,7 @@ public void providersChanged() {
);
this.anonymousUser = new AnonymousUser(settings);
this.threadContext = threadContext;
this.roleBuildingExecutor = roleBuildingExecutor;
}

public void getRoles(Authentication authentication, ActionListener<Tuple<Role, Role>> roleActionListener) {
Expand Down Expand Up @@ -276,21 +286,70 @@ public void buildRoleFromRoleReference(RoleReference roleReference, ActionListen
} else if (RolesRetrievalResult.SUPERUSER == rolesRetrievalResult) {
roleActionListener.onResponse(superuserRole);
} else {
buildThenMaybeCacheRole(
roleKey,
rolesRetrievalResult.getRoleDescriptors(),
rolesRetrievalResult.getMissingRoles(),
rolesRetrievalResult.isSuccess(),
invalidationCounter,
ActionListener.wrap(roleActionListener::onResponse, failureHandler)
);
final ActionListener<Role> wrapped = ActionListener.wrap(roleActionListener::onResponse, failureHandler);
if (shouldForkRoleBuilding(rolesRetrievalResult.getRoleDescriptors())) {
roleBuildingExecutor.execute(
ActionRunnable.wrap(
wrapped,
l -> buildThenMaybeCacheRole(
roleKey,
rolesRetrievalResult.getRoleDescriptors(),
rolesRetrievalResult.getMissingRoles(),
rolesRetrievalResult.isSuccess(),
invalidationCounter,
l
)
)
);
} else {
buildThenMaybeCacheRole(
n1v0lg marked this conversation as resolved.
Show resolved Hide resolved
roleKey,
rolesRetrievalResult.getRoleDescriptors(),
rolesRetrievalResult.getMissingRoles(),
rolesRetrievalResult.isSuccess(),
invalidationCounter,
wrapped
);
}
}
}, failureHandler));
} else {
roleActionListener.onResponse(existing);
}
}

/**
* Uses heuristics such as presence of application privileges to determine if role building will be expensive
* and therefore warrants forking.
* Package-private for testing.
*/
boolean shouldForkRoleBuilding(Set<RoleDescriptor> roleDescriptors) {
n1v0lg marked this conversation as resolved.
Show resolved Hide resolved
// A role with many role descriptors is likely expensive to build
if (roleDescriptors.size() > ROLE_DESCRIPTOR_FORK_THRESHOLD) {
return true;
}
int totalIndexPrivileges = 0;
int totalRemoteIndexPrivileges = 0;
for (RoleDescriptor roleDescriptor : roleDescriptors) {
// Application privileges can also result in big automata; it's difficult to determine how big application privileges
// are so err on the side of caution
if (roleDescriptor.hasApplicationPrivileges()) {
return true;
}
// Index privilege names or remote index privilege names can result in big and complex automata
totalIndexPrivileges += roleDescriptor.getIndicesPrivileges().length;
totalRemoteIndexPrivileges += roleDescriptor.getRemoteIndicesPrivileges().length;
if (totalIndexPrivileges > INDEX_PRIVILEGE_FORK_THRESHOLD || totalRemoteIndexPrivileges > INDEX_PRIVILEGE_FORK_THRESHOLD) {
return true;
}
// Likewise for FLS/DLS
if (roleDescriptor.isUsingDocumentOrFieldLevelSecurity()) {
return true;
}
}
return false;
}

private static boolean includesSuperuserRole(RoleReference roleReference) {
if (roleReference instanceof RoleReference.NamedRoleReference namedRoles) {
return Arrays.asList(namedRoles.getRoleNames()).contains(ReservedRolesStore.SUPERUSER_ROLE_DESCRIPTOR.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -242,6 +243,7 @@ public void setup() {
mock(ServiceAccountService.class),
new DocumentSubsetBitsetCache(Settings.EMPTY, mock(ThreadPool.class)),
RESTRICTED_INDICES,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
rds -> {}
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.XContentHelper;
Expand Down Expand Up @@ -686,6 +687,52 @@ public void testNegativeLookupsCacheDisabled() {
verifyNoMoreInteractions(fileRolesStore, reservedRolesStore, nativeRolesStore);
}

public void testShouldForkRoleBuilding() {
final CompositeRolesStore compositeRolesStore = new CompositeRolesStore(
SECURITY_ENABLED_SETTINGS,
mock(RoleProviders.class),
mock(NativePrivilegeStore.class),
new ThreadContext(SECURITY_ENABLED_SETTINGS),
mock(),
cache,
mock(ApiKeyService.class),
mock(ServiceAccountService.class),
buildBitsetCache(),
TestRestrictedIndices.RESTRICTED_INDICES,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
mock()
);

n1v0lg marked this conversation as resolved.
Show resolved Hide resolved
assertFalse(compositeRolesStore.shouldForkRoleBuilding(Set.of()));
assertFalse(
compositeRolesStore.shouldForkRoleBuilding(
Set.of(
randomValueOtherThanMany(
rd -> rd.isUsingDocumentOrFieldLevelSecurity() || rd.hasApplicationPrivileges(),
RoleDescriptorTestHelper::randomRoleDescriptor
)
)
)
);
assertTrue(
compositeRolesStore.shouldForkRoleBuilding(
Set.of(
randomValueOtherThanMany(
rd -> false == rd.isUsingDocumentOrFieldLevelSecurity(),
RoleDescriptorTestHelper::randomRoleDescriptor
)
)
)
);
assertTrue(
compositeRolesStore.shouldForkRoleBuilding(
Set.of(
randomValueOtherThanMany(rd -> false == rd.hasApplicationPrivileges(), RoleDescriptorTestHelper::randomRoleDescriptor)
)
)
);
}

public void testNegativeLookupsAreNotCachedWithFailures() {
final FileRolesStore fileRolesStore = mock(FileRolesStore.class);
doCallRealMethod().when(fileRolesStore).accept(anySet(), anyActionListener());
Expand Down Expand Up @@ -715,6 +762,7 @@ public void testNegativeLookupsAreNotCachedWithFailures() {
mock(ServiceAccountService.class),
documentSubsetBitsetCache,
TestRestrictedIndices.RESTRICTED_INDICES,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
effectiveRoleDescriptors::set
);
verify(fileRolesStore).addListener(anyConsumer()); // adds a listener in ctor
Expand Down Expand Up @@ -2318,6 +2366,7 @@ public void testGetRoleForWorkflowWithRestriction() {
mock(ServiceAccountService.class),
buildBitsetCache(),
TestRestrictedIndices.RESTRICTED_INDICES,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
rds -> {}
);

Expand Down Expand Up @@ -2431,6 +2480,7 @@ public void testGetRoleForWorkflowWithoutRestriction() {
mock(ServiceAccountService.class),
buildBitsetCache(),
TestRestrictedIndices.RESTRICTED_INDICES,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
rds -> {}
);

Expand Down Expand Up @@ -3024,6 +3074,7 @@ private CompositeRolesStore buildCompositeRolesStore(
serviceAccountService,
documentSubsetBitsetCache,
TestRestrictedIndices.RESTRICTED_INDICES,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
roleConsumer
) {
@Override
Expand Down