diff --git a/agent/build.gradle b/agent/build.gradle
new file mode 100644
index 0000000..604835f
--- /dev/null
+++ b/agent/build.gradle
@@ -0,0 +1,124 @@
+plugins {
+ id 'com.github.johnrengelman.shadow' version '6.0.0'
+}
+
+dependencies {
+
+ implementation project(':needle-core')
+ testImplementation project(':needle-core').sourceSets.test.output
+
+ implementation 'net.bytebuddy:byte-buddy:1.10.14'
+
+ implementation 'com.fasterxml.jackson.core:jackson-core:2.11.2'
+ implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.11.2'
+}
+
+task sourcesJar(type: Jar, dependsOn: classes) {
+ classifier 'sources'
+ from sourceSets.main.allSource
+}
+
+task javadocJar(type: Jar, dependsOn: javadoc) {
+ classifier 'javadoc'
+ from javadoc.destinationDir
+}
+
+jar {
+ enabled = false
+}
+
+afterEvaluate {
+
+ task copyAgent(dependsOn: shadowJar, type: Copy) {
+ from "$buildDir/libs/"
+ into "$buildDir/resources/test"
+ include "*.jar"
+ }
+
+ shadowJar {
+
+ archiveClassifier.set('')
+
+ from sourceSets.main.output
+
+ dependencies {
+ exclude(dependency('org.jetbrains.kotlin:.*:.*'))
+ }
+
+ manifest {
+ attributes(
+ "Agent-Class": "org.sheinbergon.needle.agent.NeedleAgent",
+ "Can-Redefine-Classes": true,
+ "Can-Retransform-Classes": true,
+ "Boot-Class-Path": archiveFileName.get(),
+ "Premain-Class": "org.sheinbergon.needle.agent.NeedleAgent")
+ }
+
+ finalizedBy tasks.copyAgent
+ }
+
+ test {
+ forkEvery(1)
+ dependsOn tasks.copyAgent
+ systemProperties = [
+ "jdk.attach.allowAttachSelf": true,
+ "test.agent.jar.path" : "/${tasks.shadowJar.archiveFileName.get()}"
+ ]
+ }
+
+ publish.dependsOn shadowJar
+ assemble.dependsOn shadowJar
+ build.dependsOn shadowJar
+ uploadArchives.dependsOn shadowJar
+}
+
+publishing {
+ publications {
+ agent(MavenPublication) {
+ project.shadow.component(it)
+ artifactId = "needle-agent"
+ artifact tasks.javadocJar
+ artifact tasks.sourcesJar
+ pom {
+ name = project.name
+ description = 'Feature-rich CPU affinity for the JVM - Affinity Setting Agent'
+ url = 'https://github.com/sheinbergon/needle'
+ inceptionYear = '2020'
+
+ licenses {
+ license {
+ name = 'Apache License 2.0'
+ url = 'https://github.com/sheinbergon/needle/blob/master/LICENSE'
+ distribution = 'repo'
+ }
+ }
+
+ developers {
+ developer {
+ id = 'sheinbergon'
+ name = 'Idan Sheinberg'
+ email = 'ishinberg0@gmail.com'
+ }
+ }
+
+ scm {
+ url = 'https://github.com/sheinbergon/needle'
+ connection = 'scm:https://github.com/sheinbergon/needle.git'
+ developerConnection = 'scm:git@github.com:sheinbergon/needle.git'
+ }
+ }
+
+ repositories {
+ mavenLocal()
+ maven {
+ name "oss-sonatype-nexus"
+ url nexus.url
+ credentials {
+ username = nexus.username
+ password = nexus.password
+ }
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/agent/src/main/java/org/sheinbergon/needle/agent/AffinityAdvice.java b/agent/src/main/java/org/sheinbergon/needle/agent/AffinityAdvice.java
new file mode 100644
index 0000000..5cb4b93
--- /dev/null
+++ b/agent/src/main/java/org/sheinbergon/needle/agent/AffinityAdvice.java
@@ -0,0 +1,54 @@
+package org.sheinbergon.needle.agent;
+
+import lombok.val;
+import net.bytebuddy.asm.Advice;
+import org.sheinbergon.needle.Needle;
+import org.sheinbergon.needle.Pinned;
+import org.sheinbergon.needle.agent.util.AffinityGroupMatcher;
+import org.sheinbergon.needle.util.NeedleAffinity;
+
+import javax.annotation.Nonnull;
+
+public final class AffinityAdvice {
+
+ /**
+ * Empty utility class private constructor.
+ */
+ private AffinityAdvice() {
+ }
+
+ /**
+ * Byte-Buddy advice method, this is latched on to {@link Thread#run()}, running before the actual method executes.
+ *
+ * Simply put, it search for a matching affinity group, and sets the thread's affintiy according to the group's
+ * {@link org.sheinbergon.needle.AffinityDescriptor} specified values.
+ *
+ * @see NeedleAgentConfiguration
+ * @see org.sheinbergon.needle.AffinityDescriptor
+ */
+ @Advice.OnMethodEnter
+ public static void run() {
+ try {
+ val thread = Thread.currentThread();
+ if (!excluded(thread)) {
+ val group = AffinityGroupMatcher.forThread(thread);
+ Needle.affinity(group.affinity());
+ }
+ } catch (Throwable throwable) {
+ // Do nothing if any exception were thrown, for now.
+ // TODO - Expose these via JUL logging (meant to bridged to other loggers)
+ }
+ }
+
+ /**
+ * Indicates whether this given thread should be exempt from affinity-group matching and setting.
+ * This method is public to satisfy byte-buddy class-loading requirements.
+ *
+ * @param thread The thread to inspect for exclusion from affinity-group matching.
+ * @return Boolean value, indicating if this thread is to be applied affinity-group settings
+ */
+ public static boolean excluded(final @Nonnull Thread thread) {
+ val type = thread.getClass();
+ return Pinned.class.isAssignableFrom(type) || type.isAnnotationPresent(NeedleAffinity.class);
+ }
+}
diff --git a/agent/src/main/java/org/sheinbergon/needle/agent/NeedleAgent.java b/agent/src/main/java/org/sheinbergon/needle/agent/NeedleAgent.java
new file mode 100644
index 0000000..10bf1a2
--- /dev/null
+++ b/agent/src/main/java/org/sheinbergon/needle/agent/NeedleAgent.java
@@ -0,0 +1,129 @@
+package org.sheinbergon.needle.agent;
+
+import lombok.val;
+import net.bytebuddy.agent.builder.AgentBuilder;
+import net.bytebuddy.asm.Advice;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.dynamic.ClassFileLocator;
+import net.bytebuddy.dynamic.DynamicType;
+import net.bytebuddy.dynamic.loading.ClassInjector;
+import net.bytebuddy.utility.JavaModule;
+import org.sheinbergon.needle.Pinned;
+import org.sheinbergon.needle.agent.util.AffinityGroupMatcher;
+import org.sheinbergon.needle.agent.util.YamlCodec;
+import org.sheinbergon.needle.util.NeedleAffinity;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.File;
+import java.lang.instrument.Instrumentation;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import static net.bytebuddy.matcher.ElementMatchers.is;
+import static net.bytebuddy.matcher.ElementMatchers.isAnnotatedWith;
+import static net.bytebuddy.matcher.ElementMatchers.isSubTypeOf;
+import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.not;
+
+public final class NeedleAgent {
+
+ private NeedleAgent() {
+ }
+
+ /**
+ * Static agent loading endpoint.
+ *
+ * @param arguments Agent configuration string, if specified, must be a valid JVM URL string pointing to the
+ * agent configuration file path (i.e. file:///some/file.yml).
+ * @param instrumentation JVM instrumentation interface
+ * @throws Exception any execution error encountered during instrumentation setup
+ */
+ public static void premain(
+ final String arguments,
+ final Instrumentation instrumentation) throws Exception {
+ val storage = Files.createTempDirectory("needle-agent-instrumentation").toFile();
+ setupBootstrapInjection(storage, instrumentation);
+ agentConfiguration(arguments);
+ val builder = new AgentBuilder.Default()
+ .disableClassFormatChanges()
+ .ignore(nameStartsWith("net.bytebuddy."))
+ .with(AgentBuilder.TypeStrategy.Default.REDEFINE)
+ .with(AgentBuilder.RedefinitionStrategy.REDEFINITION)
+ .with(AgentBuilder.InitializationStrategy.NoOp.INSTANCE)
+ .with(new AgentBuilder.InjectionStrategy.UsingInstrumentation(instrumentation, storage));
+ val narrowable = matchers(builder);
+ narrowable.transform(NeedleAgent::premainTransform)
+ .installOn(instrumentation);
+ }
+
+ /**
+ * Dynamic agent loading endpoint.
+ *
+ * @param arguments Agent configuration string, if specified, must be a valid JVM URL string pointing to the
+ * agent configuration file path (i.e. file:///some/file.yml).
+ * @param instrumentation JVM instrumentation interface
+ * @throws Exception any execution error encountered during instrumentation setup
+ */
+ public static void agentmain(
+ final String arguments,
+ final Instrumentation instrumentation) throws Exception {
+ agentConfiguration(arguments);
+ val builder = new AgentBuilder.Default()
+ .disableClassFormatChanges()
+ .ignore(nameStartsWith("net.bytebuddy."))
+ .with(AgentBuilder.TypeStrategy.Default.REDEFINE)
+ .with(AgentBuilder.RedefinitionStrategy.REDEFINITION)
+ .with(AgentBuilder.InitializationStrategy.NoOp.INSTANCE);
+ val narrowable = matchers(builder);
+ narrowable.transform(agentmainTransform())
+ .installOn(instrumentation);
+
+ }
+
+ private static AgentBuilder.Identified.Narrowable matchers(
+ final @Nonnull AgentBuilder builder) {
+ return builder.type(not(isSubTypeOf(Pinned.class))
+ .and(not(isAnnotatedWith(NeedleAffinity.class)))
+ .and(isSubTypeOf(Thread.class).or(is(Thread.class))));
+ }
+
+ private static DynamicType.Builder> premainTransform(
+ final @Nonnull DynamicType.Builder> builder,
+ final @Nonnull TypeDescription typeDescription,
+ final @Nullable ClassLoader classLoader,
+ final @Nonnull JavaModule module) {
+ return builder.visit(Advice.to(AffinityAdvice.class).on(named("run")));
+ }
+
+ private static AgentBuilder.Transformer agentmainTransform() {
+ return new AgentBuilder.Transformer.ForAdvice()
+ .include(NeedleAgent.class.getClassLoader())
+ .advice(named("run"), AffinityAdvice.class.getName());
+ }
+
+ private static void setupBootstrapInjection(
+ final @Nonnull File storage,
+ final @Nonnull Instrumentation instrumentation) {
+ ClassInjector.UsingInstrumentation
+ .of(storage, ClassInjector.UsingInstrumentation.Target.BOOTSTRAP, instrumentation)
+ .inject(Map.of(
+ new TypeDescription.ForLoadedType(AffinityAdvice.class),
+ ClassFileLocator.ForClassLoader.read(AffinityAdvice.class)));
+ }
+
+ private static void agentConfiguration(final @Nullable String arguments) throws MalformedURLException {
+ Supplier supplier;
+ if (arguments != null) {
+ val url = new URL(arguments);
+ supplier = () -> YamlCodec.parseConfiguration(url);
+ } else {
+ supplier = () -> NeedleAgentConfiguration.DEFAULT;
+ }
+ AffinityGroupMatcher.setConfigurationSupplier(supplier);
+ }
+}
diff --git a/agent/src/main/java/org/sheinbergon/needle/agent/NeedleAgentConfiguration.java b/agent/src/main/java/org/sheinbergon/needle/agent/NeedleAgentConfiguration.java
new file mode 100644
index 0000000..3d99e0b
--- /dev/null
+++ b/agent/src/main/java/org/sheinbergon/needle/agent/NeedleAgentConfiguration.java
@@ -0,0 +1,144 @@
+package org.sheinbergon.needle.agent;
+
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.experimental.Accessors;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.sheinbergon.needle.AffinityDescriptor;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.regex.Pattern;
+
+@Data
+@NoArgsConstructor
+@Accessors(fluent = true)
+public class NeedleAgentConfiguration {
+
+ /**
+ * Default configuration constructs, implies no-op affinity descriptor to the Needle framework.
+ * To be used in the absence of needle-agent configuration specification.
+ */
+ public static final NeedleAgentConfiguration DEFAULT = new NeedleAgentConfiguration()
+ .defaultAffinity(AffinityDescriptor.from(NumberUtils.LONG_ZERO));
+
+ @Data
+ @NoArgsConstructor
+ @Accessors(fluent = true, chain = true)
+ public static final class AffinityGroup {
+
+ public enum Qualifier {
+
+ /**
+ * Thread name based matching qualifier, as provided by {@link Thread#getName()}.
+ */
+ NAME,
+ /**
+ * Thread class FQDN based matching qualifier, as provided by {@link Class#getName()}.
+ */
+ CLASS;
+ }
+
+ @Accessors(fluent = true, chain = true)
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
+ include = JsonTypeInfo.As.EXISTING_PROPERTY, property = "type")
+ @JsonSubTypes({
+ @JsonSubTypes.Type(value = Matcher.Prefix.class, name = "PREFIX"),
+ @JsonSubTypes.Type(value = Matcher.Regex.class, name = "REGEX")
+ })
+ public interface Matcher {
+
+ @Data
+ @NoArgsConstructor
+ @Accessors(fluent = true, chain = true)
+ @EqualsAndHashCode(callSuper = false)
+ final class Prefix implements Matcher {
+
+ /**
+ * Specifies the string prefix used to match given target strings.
+ */
+ @Nonnull
+ private String prefix;
+
+ @Override
+ public boolean matches(final @Nonnull String target) {
+ return target.startsWith(prefix);
+ }
+ }
+
+ @Data
+ @NoArgsConstructor
+ @Accessors(fluent = true, chain = true)
+ @EqualsAndHashCode(callSuper = false)
+ final class Regex implements Matcher {
+
+ /**
+ * Specifies the regex expression used to match given target strings.
+ */
+ @Nonnull
+ private Pattern pattern;
+
+ @Override
+ public boolean matches(final @Nonnull String target) {
+ return pattern.matcher(target).matches();
+ }
+ }
+
+ /**
+ * @param target the match target to be matched
+ * @return Boolean value indicating whether or not this {@code Matcher} implementation matches the given
+ * match target.
+ */
+ boolean matches(@Nonnull String target);
+ }
+
+ /**
+ * This group inflated {@link AffinityDescriptor}, to be used to apply affinity settings via {@code Needle}.
+ *
+ * @see org.sheinbergon.needle.Needle
+ */
+ @Nonnull
+ private AffinityDescriptor affinity;
+ /**
+ * The match target qualifier, used to extract the match target string from a given {@code Thread}.
+ */
+ @Nullable
+ private Qualifier qualifier;
+ /**
+ * The matching logic encapsulation (determined upon deserialization).
+ */
+ @Nullable
+ private Matcher matcher;
+ /**
+ * The affinity group's identifier, meant to be used for descriptive purposes only.
+ */
+ @Nonnull
+ private String identifier;
+
+ /**
+ * @param target the match target to be matched using this affinity group's {@link AffinityGroup#matcher}.
+ * @return A boolean value indicating whether or not this group matches the given target or not.
+ */
+ public boolean matches(final @Nonnull String target) {
+ return matcher.matches(target);
+ }
+ }
+
+ /**
+ * A collection of affinity group descriptors used to match different affinity descriptors to threads upon
+ * instantiation.
+ */
+ @Nullable
+ private List affinityGroups;
+
+ /**
+ * The default affinity to use for all threads without a precise {@link AffinityGroup} match.
+ */
+ @Nonnull
+ private AffinityDescriptor defaultAffinity;
+}
diff --git a/agent/src/main/java/org/sheinbergon/needle/agent/package-info.java b/agent/src/main/java/org/sheinbergon/needle/agent/package-info.java
new file mode 100644
index 0000000..3acbc4e
--- /dev/null
+++ b/agent/src/main/java/org/sheinbergon/needle/agent/package-info.java
@@ -0,0 +1,7 @@
+/**
+ * Needle Agent core package.
+ *
+ * @author sheinbergon
+ * @since 0.2.0
+ */
+package org.sheinbergon.needle.agent;
diff --git a/agent/src/main/java/org/sheinbergon/needle/agent/util/AffinityGroupMatcher.java b/agent/src/main/java/org/sheinbergon/needle/agent/util/AffinityGroupMatcher.java
new file mode 100644
index 0000000..7b613b8
--- /dev/null
+++ b/agent/src/main/java/org/sheinbergon/needle/agent/util/AffinityGroupMatcher.java
@@ -0,0 +1,147 @@
+package org.sheinbergon.needle.agent.util;
+
+import com.google.common.collect.Lists;
+import lombok.Setter;
+import lombok.val;
+import org.apache.commons.lang3.ObjectUtils;
+import org.sheinbergon.needle.agent.NeedleAgentConfiguration;
+import org.sheinbergon.needle.util.NeedleException;
+
+import javax.annotation.Nonnull;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+public final class AffinityGroupMatcher {
+
+ /**
+ * Generated default {@link org.sheinbergon.needle.agent.NeedleAgentConfiguration.AffinityGroup} identifier.
+ */
+ private static final String DEFAULT_AFFINITY_GROUP_IDENTIFIER = "default";
+
+ /**
+ * Thread Class FQDN based matching affinity groups.
+ */
+ private static final List CLASS_MATCHING_AFFINITY_GROUPS =
+ Lists.newArrayList();
+
+ /**
+ * Thread name based matching affinity groups.
+ */
+ private static final List NAME_MATCHING_AFFINITY_GROUPS =
+ Lists.newArrayList();
+
+ /**
+ * {@link NeedleAgentConfiguration} supplier.
+ *
+ * Note: Due to bootstrapping agent class loading concerns, we use this supplier to defer configuration
+ * deserialization to ongoing JVM {@code Thread} instantiation.
+ *
+ * @see YamlCodec
+ * @see org.sheinbergon.needle.agent.NeedleAgent
+ */
+ @Setter
+ @Nonnull
+ private static volatile Supplier configurationSupplier =
+ () -> NeedleAgentConfiguration.DEFAULT;
+
+ /**
+ * This flag is used to ensure configuration is deserialized only once, during the first instantiated thread
+ * startup.
+ */
+ private static volatile boolean initialized = false;
+
+
+ /**
+ * This variable contains the default (fallback) affinity group, used to match affinity to all threads without
+ * an precisely matched affinity group.
+ */
+ private static NeedleAgentConfiguration.AffinityGroup defaultAffinityGroup = null;
+
+ /**
+ * Match an affintiy group for a given {@code Thread} according to the following logic
+ *
+ * 1. Initialize affinity group constructors/data, if not previously initialized.
+ * 2. Try and find a thread-name based matching affinity group.
+ * 3. If no match was previously found, try and find a thread-class based matching affinity group.
+ * 4. If no match was previosuly found, use the defauly affinity group.
+ *
+ * @param thread The thread for which an affinity group should be matched.
+ * @return the matching {@link NeedleAgentConfiguration.AffinityGroup}.
+ */
+ @Nonnull
+ public static NeedleAgentConfiguration.AffinityGroup forThread(final @Nonnull Thread thread) {
+ if (!initialized) {
+ initialize();
+ initialized = true;
+ }
+ return forThreadName(thread)
+ .or(() -> forThreadClass(thread))
+ .orElse(defaultAffinityGroup);
+ }
+
+ @Nonnull
+ private static Optional forThreadName(final @Nonnull Thread thread) {
+ val target = thread.getName();
+ return forTarget(target, NAME_MATCHING_AFFINITY_GROUPS);
+ }
+
+ @Nonnull
+ private static Optional forThreadClass(final @Nonnull Thread thread) {
+ val target = thread.getClass().getName();
+ return forTarget(target, CLASS_MATCHING_AFFINITY_GROUPS);
+ }
+
+ @Nonnull
+ private static Optional forTarget(
+ final @Nonnull String target,
+ final @Nonnull Collection affinityGroups) {
+ NeedleAgentConfiguration.AffinityGroup matched = null;
+ for (NeedleAgentConfiguration.AffinityGroup group : affinityGroups) {
+ if (group.matches(target)) {
+ matched = group;
+ break;
+ }
+ }
+ return Optional.ofNullable(matched);
+ }
+
+ private static void initialize() {
+ val configuration = configurationSupplier.get();
+ defaultAffinityGroup = defaultAffinityGroup(configuration);
+ val affinityGroups = affinityGroups(configuration);
+ for (NeedleAgentConfiguration.AffinityGroup group : affinityGroups) {
+ val qualifier = group.qualifier();
+ switch (qualifier) {
+ case NAME:
+ NAME_MATCHING_AFFINITY_GROUPS.add(group);
+ break;
+ case CLASS:
+ CLASS_MATCHING_AFFINITY_GROUPS.add(group);
+ break;
+ default:
+ throw new NeedleException(
+ String.format("Unsupported affinity group qualifier '%s'",
+ qualifier));
+ }
+ }
+ }
+
+ @Nonnull
+ private static NeedleAgentConfiguration.AffinityGroup defaultAffinityGroup(
+ final @Nonnull NeedleAgentConfiguration configuration) {
+ return new NeedleAgentConfiguration.AffinityGroup()
+ .identifier(DEFAULT_AFFINITY_GROUP_IDENTIFIER)
+ .affinity(configuration.defaultAffinity());
+ }
+
+ @Nonnull
+ private static List affinityGroups(
+ final @Nonnull NeedleAgentConfiguration configuration) {
+ return ObjectUtils.defaultIfNull(configuration.affinityGroups(), List.of());
+ }
+
+ private AffinityGroupMatcher() {
+ }
+}
diff --git a/agent/src/main/java/org/sheinbergon/needle/agent/util/NeedleAgentException.java b/agent/src/main/java/org/sheinbergon/needle/agent/util/NeedleAgentException.java
new file mode 100644
index 0000000..6ebce95
--- /dev/null
+++ b/agent/src/main/java/org/sheinbergon/needle/agent/util/NeedleAgentException.java
@@ -0,0 +1,26 @@
+package org.sheinbergon.needle.agent.util;
+
+import org.sheinbergon.needle.util.NeedleException;
+
+import javax.annotation.Nonnull;
+
+public final class NeedleAgentException extends NeedleException {
+
+ /**
+ * Instantiate an {@link NeedleAgentException} using the given message.
+ *
+ * @param message the error message.
+ */
+ public NeedleAgentException(final @Nonnull String message) {
+ super(message);
+ }
+
+ /**
+ * Instantiate an {@link NeedleAgentException} using the given exception.
+ *
+ * @param x the exception
+ */
+ public NeedleAgentException(final @Nonnull Exception x) {
+ super(x);
+ }
+}
diff --git a/agent/src/main/java/org/sheinbergon/needle/agent/util/YamlCodec.java b/agent/src/main/java/org/sheinbergon/needle/agent/util/YamlCodec.java
new file mode 100644
index 0000000..2f1bf2d
--- /dev/null
+++ b/agent/src/main/java/org/sheinbergon/needle/agent/util/YamlCodec.java
@@ -0,0 +1,95 @@
+package org.sheinbergon.needle.agent.util;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import lombok.val;
+import org.sheinbergon.needle.AffinityDescriptor;
+import org.sheinbergon.needle.agent.NeedleAgentConfiguration;
+import org.sheinbergon.needle.util.NeedleException;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.net.URL;
+import java.util.regex.Pattern;
+
+public final class YamlCodec {
+
+ /**
+ * Configuration deserialization jackson {@link ObjectReader} settings.
+ */
+ private static final ObjectReader JACKSON = new ObjectMapper(new YAMLFactory())
+ .setSerializationInclusion(JsonInclude.Include.NON_ABSENT)
+ .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+ .setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.NONE)
+ .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
+ .addMixIn(AffinityDescriptor.class, AffinityDescriptorMixIn.class)
+ .addMixIn(Pattern.class, RegexPatternMixIn.class)
+ .readerFor(NeedleAgentConfiguration.class);
+
+ /**
+ * @param url Agent configuration file URL.
+ * @return Deserialized {@link NeedleAgentConfiguration} instance.
+ * @throws NeedleAgentException - in case of configuration yaml parsing error.
+ */
+ @Nonnull
+ public static NeedleAgentConfiguration parseConfiguration(final @Nonnull URL url) throws NeedleException {
+ try {
+ return JACKSON.readValue(url);
+ } catch (IOException iox) {
+ throw new NeedleAgentException(iox);
+ }
+ }
+
+ @JsonDeserialize(using = AffinityDescriptorDeserializer.class)
+ private interface AffinityDescriptorMixIn {
+ }
+
+ @JsonDeserialize(using = RegexPatternDeserializer.class)
+ private interface RegexPatternMixIn {
+ }
+
+ private static class AffinityDescriptorDeserializer extends JsonDeserializer {
+
+ @Override
+ public AffinityDescriptor deserialize(
+ final JsonParser parser,
+ final DeserializationContext context) throws IOException {
+ val codec = parser.getCodec();
+ val node = (JsonNode) codec.readTree(parser);
+ if (node.isTextual()) {
+ return AffinityDescriptor.from(node.asText());
+ } else if (node.isIntegralNumber()) {
+ return AffinityDescriptor.from(node.asLong());
+ } else {
+ throw new NeedleException(
+ String.format("Unsupported affinity descriptor value node type - %s",
+ node.getClass().getSimpleName()));
+ }
+ }
+ }
+
+ private static class RegexPatternDeserializer extends JsonDeserializer {
+
+ @Override
+ public Pattern deserialize(
+ final JsonParser parser,
+ final DeserializationContext context) throws IOException {
+ val codec = parser.getCodec();
+ val node = (JsonNode) codec.readTree(parser);
+ return Pattern.compile(node.asText());
+ }
+ }
+
+ private YamlCodec() {
+ }
+}
diff --git a/agent/src/main/java/org/sheinbergon/needle/agent/util/package-info.java b/agent/src/main/java/org/sheinbergon/needle/agent/util/package-info.java
new file mode 100644
index 0000000..510b668
--- /dev/null
+++ b/agent/src/main/java/org/sheinbergon/needle/agent/util/package-info.java
@@ -0,0 +1,8 @@
+/**
+ * Needle Agent Utilities.
+ *
+ * @author sheinbergon
+ * @since 0.2.0
+ */
+
+package org.sheinbergon.needle.agent.util;
diff --git a/agent/src/test/kotlin/org/sheinbergon/needle/agent/NeedleAgentTest.kt b/agent/src/test/kotlin/org/sheinbergon/needle/agent/NeedleAgentTest.kt
new file mode 100644
index 0000000..8256d39
--- /dev/null
+++ b/agent/src/test/kotlin/org/sheinbergon/needle/agent/NeedleAgentTest.kt
@@ -0,0 +1,100 @@
+package org.sheinbergon.needle.agent
+
+import com.sun.tools.attach.VirtualMachine
+import org.amshove.kluent.shouldBeEqualTo
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+import org.sheinbergon.needle.*
+import org.sheinbergon.needle.util.NeedleAffinity
+import java.nio.file.Paths
+
+class AffinityAgentTest {
+
+ companion object {
+ private val AGENT_PATH = System.getProperty("test.agent.jar.path")!!
+
+ private val CONFIGURATION_PATH = AffinityAgentTest::class.java
+ .getResource("/test-configuration.yml")
+ .toString()
+
+ private const val THREAD_NAME_PREFIX = "needle-agent-thread"
+
+ private val firstCoreAffinity = AffinityDescriptor.from(`1L`)
+
+ private val secondCoreAffinity = AffinityDescriptor.from(`2L`)
+
+ private val defaultAffinity = org.sheinbergon.needle.default
+ }
+
+ @BeforeEach
+ fun setup() {
+ val agentUrl = NeedleAgent::class.java.getResource(AGENT_PATH)
+ val agentFile = Paths.get(agentUrl.toURI()).toFile().absolutePath
+ val pid = ProcessHandle.current().pid()
+ val vm = VirtualMachine.attach(pid.toString())
+ vm.loadAgent(agentFile, CONFIGURATION_PATH);
+ vm.detach()
+ }
+
+ @Test
+ fun `Verify prefix, thread-name based agent configuration`() {
+ lateinit var affinity: AffinityDescriptor
+ val thread = Thread { affinity = Needle.affinity() }
+ thread.name = "$THREAD_NAME_PREFIX-0"
+ thread.start()
+ thread.join()
+ affinity.mask() shouldBeEqualTo firstCoreAffinity.mask()
+ affinity.toString() shouldBeEqualTo firstCoreAffinity.toString()
+ }
+
+ @Test
+ fun `Verify regex, thread-class based agent configuration`() {
+ lateinit var affinity: AffinityDescriptor
+ val thread = NeedleAgentThread { affinity = Needle.affinity() }
+ thread.start()
+ thread.join()
+ affinity.mask() shouldBeEqualTo secondCoreAffinity.mask()
+ affinity.toString() shouldBeEqualTo secondCoreAffinity.toString()
+ }
+
+ @Test
+ fun `Verify default agent configuration`() {
+ lateinit var affinity: AffinityDescriptor
+ val thread = Thread { affinity = Needle.affinity() }
+ thread.start()
+ thread.join()
+ affinity.mask() shouldBeEqualTo defaultAffinity.mask()
+ affinity.toString() shouldBeEqualTo defaultAffinity.toString()
+ }
+
+
+ @Test
+ fun `Verify NeedleAffinity annotation agent exclusion`() {
+ lateinit var affinity: AffinityDescriptor
+ val thread = ExcludedAnnotatedNeedleAgentThread { affinity = Needle.affinity() }
+ thread.start()
+ thread.join()
+ affinity.mask() shouldBeEqualTo defaultAffinity.mask()
+ affinity.toString() shouldBeEqualTo defaultAffinity.toString()
+ }
+
+ @Test
+ fun `Verify PinnedThread subclassing agent exclusion`() {
+ lateinit var affinity: AffinityDescriptor
+ val thread = ExcludedPinnedNeedleAgentThread { affinity = Needle.affinity() }
+ thread.start()
+ thread.join()
+ affinity.mask() shouldBeEqualTo defaultAffinity.mask()
+ affinity.toString() shouldBeEqualTo defaultAffinity.toString()
+ }
+}
+
+// Included for affinity group due to class <-> regex matching heuristic
+class NeedleAgentThread(runnable: () -> Unit) : Thread(runnable)
+
+// Should have the same rules as above applied, but is excluded due parent class (PinnedThread, which implements Pinned)
+class ExcludedPinnedNeedleAgentThread(runnable: () -> Unit) : PinnedThread(runnable)
+
+// Should have the same rules as above applied, but is excluded due class annotation @NeedleAffinity
+@NeedleAffinity
+class ExcludedAnnotatedNeedleAgentThread(runnable: () -> Unit) : Thread(runnable)
diff --git a/agent/src/test/resources/test-configuration.yml b/agent/src/test/resources/test-configuration.yml
new file mode 100644
index 0000000..8808f04
--- /dev/null
+++ b/agent/src/test/resources/test-configuration.yml
@@ -0,0 +1,14 @@
+defaultAffinity: 0
+affinityGroups:
+ - identifier: test-name-prefix
+ affinity: "0"
+ qualifier: NAME
+ matcher:
+ type: PREFIX
+ prefix: "needle-agent-thread"
+ - identifier: test-class-regex
+ affinity: 2
+ qualifier: CLASS
+ matcher:
+ type: REGEX
+ pattern: "org\\.sheinbergon\\..+NeedleAgentThread.*"
\ No newline at end of file
diff --git a/build.gradle b/build.gradle
index 9e0fb73..84e31f0 100644
--- a/build.gradle
+++ b/build.gradle
@@ -31,7 +31,7 @@ allprojects {
ext.nexus.url = nexus.target.equals('SNAPSHOT') ? ossSnapshotsRepositoryUrl : ossReleasesRepositoryUrl
group "org.sheinbergon"
- version "0.1.4"
+ version "0.2.0"
sourceCompatibility = 11
@@ -68,20 +68,6 @@ subprojects {
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.6.2'
}
- compileTestKotlin {
- dependsOn('detekt')
- kotlinOptions {
- jvmTarget = 11
- }
- }
-
- compileKotlin {
- dependsOn('detekt')
- kotlinOptions {
- jvmTarget = 11
- }
- }
-
jacoco {
toolVersion = "0.8.5"
}
@@ -121,6 +107,20 @@ subprojects {
version = "$version-${nexus.target}"
+ compileTestKotlin {
+ dependsOn('detekt')
+ kotlinOptions {
+ jvmTarget = 11
+ }
+ }
+
+ compileKotlin {
+ dependsOn('detekt')
+ kotlinOptions {
+ jvmTarget = 11
+ }
+ }
+
signing {
required { nexus.target == 'RELEASE' }
useInMemoryPgpKeys(signing.gpgPrivateKey, signing.gpgPassphrase)
diff --git a/codecov.yml b/codecov.yml
index 8300583..082fa07 100644
--- a/codecov.yml
+++ b/codecov.yml
@@ -1,4 +1,6 @@
coverage:
precision: 2
round: up
- range: "75..95"
\ No newline at end of file
+ range: "75..95"
+ignore:
+ - "agent/**/*"
\ No newline at end of file
diff --git a/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/FixedAffinityPinnedThreadFactoryTest.kt b/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/FixedAffinityPinnedThreadFactoryTest.kt
index 81993fc..7cfe7c3 100644
--- a/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/FixedAffinityPinnedThreadFactoryTest.kt
+++ b/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/FixedAffinityPinnedThreadFactoryTest.kt
@@ -9,39 +9,39 @@ import java.util.concurrent.RecursiveAction
class FixedAffinityPinnedThreadFactoryTest {
- @Test
- fun `Initialize the factory`() {
- val factory = FixedAffinityPinnedThreadFactory(testAffinityDescriptor)
- testPinnedThreadInception(factory)
- testPinnedForkJoinWorkerThreadInception(factory)
- }
+ @Test
+ fun `Initialize the factory`() {
+ val factory = FixedAffinityPinnedThreadFactory(testAffinityDescriptor)
+ testPinnedThreadInception(factory)
+ testPinnedForkJoinWorkerThreadInception(factory)
+ }
- private fun testPinnedThreadInception(factory: PinnedThreadFactory) {
- val latch = CountDownLatch(`1`)
- val pinned = factory.newThread(task(latch))
- pinned?.start()
- latch.await()
- }
+ private fun testPinnedThreadInception(factory: PinnedThreadFactory) {
+ val latch = CountDownLatch(`1`)
+ val pinned = factory.newThread(task(latch))
+ pinned?.start()
+ latch.await()
+ }
- private fun testPinnedForkJoinWorkerThreadInception(factory: PinnedThreadFactory) {
- val latch = CountDownLatch(`1`)
- PinnedForkJoinPool(`1`, factory).use {
- val action = action(latch)
- it.submit(action)
- latch.await()
- Thread.sleep(5L)
- action.isDone.shouldBeTrue()
- }
+ private fun testPinnedForkJoinWorkerThreadInception(factory: PinnedThreadFactory) {
+ val latch = CountDownLatch(`1`)
+ PinnedForkJoinPool(`1`, factory).use {
+ val action = action(latch)
+ it.submit(action)
+ latch.await()
+ Thread.sleep(5L)
+ action.isDone.shouldBeTrue()
}
+ }
- private fun action(latch: CountDownLatch) = object : RecursiveAction() {
- override fun compute() = task(latch).run()
- }
+ private fun action(latch: CountDownLatch) = object : RecursiveAction() {
+ override fun compute() = task(latch).run()
+ }
- private fun task(latch: CountDownLatch) = Runnable {
- val self = Thread.currentThread() as Pinned
- self.affinity().mask() shouldBeEqualTo binaryTestMask
- self.affinity().toString() shouldBeEqualTo textTestMask
- latch.countDown()
- }
+ private fun task(latch: CountDownLatch) = Runnable {
+ val self = Thread.currentThread() as Pinned
+ self.affinity().mask() shouldBeEqualTo binaryTestMask
+ self.affinity().toString() shouldBeEqualTo textTestMask
+ latch.countDown()
+ }
}
diff --git a/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/GovernedAffinityPinnedThreadFactoryTest.kt b/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/GovernedAffinityPinnedThreadFactoryTest.kt
index 0c09843..d0b9055 100644
--- a/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/GovernedAffinityPinnedThreadFactoryTest.kt
+++ b/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/GovernedAffinityPinnedThreadFactoryTest.kt
@@ -9,116 +9,116 @@ import java.util.concurrent.RecursiveAction
class GovernedAffinityPinnedThreadFactoryTest {
- private lateinit var latch: ResettableOneOffLatch
+ private lateinit var latch: ResettableOneOffLatch
- private fun unlatchAndSleepTask() = Runnable {
- latch.fire()
- runCatching { Thread.sleep(1000L) }
- }
+ private fun unlatchAndSleepTask() = Runnable {
+ latch.fire()
+ runCatching { Thread.sleep(1000L) }
+ }
- private inner class UnlatchAndSleepAction : RecursiveAction() {
+ private inner class UnlatchAndSleepAction : RecursiveAction() {
- lateinit var pinned: Pinned
- private set
+ lateinit var pinned: Pinned
+ private set
- override fun compute() {
- pinned = Thread.currentThread() as PinnedThread.ForkJoinWorker
- unlatchAndSleepTask().run()
- }
+ override fun compute() {
+ pinned = Thread.currentThread() as PinnedThread.ForkJoinWorker
+ unlatchAndSleepTask().run()
}
+ }
- @BeforeEach
- fun setup() {
- latch = ResettableOneOffLatch(true)
- }
+ @BeforeEach
+ fun setup() {
+ latch = ResettableOneOffLatch(true)
+ }
- @Test
- fun `Initialize the factory without a mask and alter the affinity of a created pinned thread`() {
- val factory = GovernedAffinityPinnedThreadFactory()
- val pinned = factory.newThread(unlatchAndSleepTask())
- pinned!!.start()
- latch.await(true)
- val original = pinned.affinity()
- original.mask() shouldBeEqualTo default.mask()
- factory.alter(negatedTestAffinityDescriptor, true)
- val altered = pinned.affinity()
- altered.mask() shouldBeEqualTo negatedBinaryTestMask
- }
+ @Test
+ fun `Initialize the factory without a mask and alter the affinity of a created pinned thread`() {
+ val factory = GovernedAffinityPinnedThreadFactory()
+ val pinned = factory.newThread(unlatchAndSleepTask())
+ pinned!!.start()
+ latch.await(true)
+ val original = pinned.affinity()
+ original.mask() shouldBeEqualTo default.mask()
+ factory.alter(negatedTestAffinityDescriptor, true)
+ val altered = pinned.affinity()
+ altered.mask() shouldBeEqualTo negatedBinaryTestMask
+ }
- @Test
- fun `Initialize the factory using a binary mask and alter the affinity of newly created pinned threads`() {
- val factory = GovernedAffinityPinnedThreadFactory(testAffinityDescriptor)
- val pinned1 = factory.newThread(unlatchAndSleepTask())
- pinned1!!.start()
- latch.await(true)
- val original = pinned1.affinity()
- original.mask() shouldBeEqualTo binaryTestMask
- factory.alter(negatedTestAffinityDescriptor, false)
- val unaltered = pinned1.affinity()
- unaltered.mask() shouldBeEqualTo binaryTestMask
- val pinned2 = factory.newThread(unlatchAndSleepTask())
- pinned2!!.start()
- latch.await(false)
- val altered = pinned2.affinity()
- altered.mask() shouldBeEqualTo negatedBinaryTestMask
- }
+ @Test
+ fun `Initialize the factory using a binary mask and alter the affinity of newly created pinned threads`() {
+ val factory = GovernedAffinityPinnedThreadFactory(testAffinityDescriptor)
+ val pinned1 = factory.newThread(unlatchAndSleepTask())
+ pinned1!!.start()
+ latch.await(true)
+ val original = pinned1.affinity()
+ original.mask() shouldBeEqualTo binaryTestMask
+ factory.alter(negatedTestAffinityDescriptor, false)
+ val unaltered = pinned1.affinity()
+ unaltered.mask() shouldBeEqualTo binaryTestMask
+ val pinned2 = factory.newThread(unlatchAndSleepTask())
+ pinned2!!.start()
+ latch.await(false)
+ val altered = pinned2.affinity()
+ altered.mask() shouldBeEqualTo negatedBinaryTestMask
+ }
- @Test
- fun `Verify governed pinned threads factory behavior`() {
- val factory = GovernedAffinityPinnedThreadFactory(testAffinityDescriptor)
- factory.governed() shouldBeEqualTo `0`
- val pinned1 = factory.newThread(unlatchAndSleepTask())
- pinned1!!.start()
- factory.governed() shouldBeEqualTo `1`
- latch.await(true)
- val original = pinned1.affinity()
- original.mask() shouldBeEqualTo binaryTestMask
- factory.alter(negatedTestAffinityDescriptor, true)
- val altered = pinned1.affinity()
- altered.mask() shouldBeEqualTo negatedBinaryTestMask
- val pinned2 = factory.newThread(unlatchAndSleepTask())
- pinned2!!.start()
- factory.governed() shouldBeEqualTo `2`
- latch.await(false)
- Thread.sleep(2000L)
- factory.governed() shouldBeEqualTo `0`
- }
+ @Test
+ fun `Verify governed pinned threads factory behavior`() {
+ val factory = GovernedAffinityPinnedThreadFactory(testAffinityDescriptor)
+ factory.governed() shouldBeEqualTo `0`
+ val pinned1 = factory.newThread(unlatchAndSleepTask())
+ pinned1!!.start()
+ factory.governed() shouldBeEqualTo `1`
+ latch.await(true)
+ val original = pinned1.affinity()
+ original.mask() shouldBeEqualTo binaryTestMask
+ factory.alter(negatedTestAffinityDescriptor, true)
+ val altered = pinned1.affinity()
+ altered.mask() shouldBeEqualTo negatedBinaryTestMask
+ val pinned2 = factory.newThread(unlatchAndSleepTask())
+ pinned2!!.start()
+ factory.governed() shouldBeEqualTo `2`
+ latch.await(false)
+ Thread.sleep(2000L)
+ factory.governed() shouldBeEqualTo `0`
+ }
- @Test
- fun `Initialize the factory without a mask and alter the affinity of created pinned fork-join threads`() {
- val factory = GovernedAffinityPinnedThreadFactory()
- PinnedForkJoinPool(`1`, factory).use { pool ->
- val action = UnlatchAndSleepAction()
- pool.execute(action)
- latch.await(true)
- val pinned = action.pinned
- val original = pinned.affinity()
- original.mask() shouldBeEqualTo default.mask()
- factory.alter(negatedTestAffinityDescriptor, true)
- val altered = pinned.affinity()
- altered.mask() shouldBeEqualTo negatedBinaryTestMask
- }
+ @Test
+ fun `Initialize the factory without a mask and alter the affinity of created pinned fork-join threads`() {
+ val factory = GovernedAffinityPinnedThreadFactory()
+ PinnedForkJoinPool(`1`, factory).use { pool ->
+ val action = UnlatchAndSleepAction()
+ pool.execute(action)
+ latch.await(true)
+ val pinned = action.pinned
+ val original = pinned.affinity()
+ original.mask() shouldBeEqualTo default.mask()
+ factory.alter(negatedTestAffinityDescriptor, true)
+ val altered = pinned.affinity()
+ altered.mask() shouldBeEqualTo negatedBinaryTestMask
}
+ }
- @Test
- fun `Initialize the factory using a mask and alter the affinity of newly created pinned fork-join threads`() {
- val factory = GovernedAffinityPinnedThreadFactory(testAffinityDescriptor)
- PinnedForkJoinPool(`2`, factory).use { pool ->
- val action1 = UnlatchAndSleepAction()
- pool.execute(action1)
- latch.await(true)
- val pinned1 = action1.pinned
- val original = pinned1.affinity()
- original.mask() shouldBeEqualTo binaryTestMask
- factory.alter(negatedTestAffinityDescriptor, false)
- val unaltered = pinned1.affinity()
- unaltered.mask() shouldBeEqualTo binaryTestMask
- val action2 = UnlatchAndSleepAction()
- pool.execute(action2)
- latch.await(false)
- val pinned2 = action2.pinned
- val altered = pinned2.affinity()
- altered.mask() shouldBeEqualTo negatedBinaryTestMask
- }
+ @Test
+ fun `Initialize the factory using a mask and alter the affinity of newly created pinned fork-join threads`() {
+ val factory = GovernedAffinityPinnedThreadFactory(testAffinityDescriptor)
+ PinnedForkJoinPool(`2`, factory).use { pool ->
+ val action1 = UnlatchAndSleepAction()
+ pool.execute(action1)
+ latch.await(true)
+ val pinned1 = action1.pinned
+ val original = pinned1.affinity()
+ original.mask() shouldBeEqualTo binaryTestMask
+ factory.alter(negatedTestAffinityDescriptor, false)
+ val unaltered = pinned1.affinity()
+ unaltered.mask() shouldBeEqualTo binaryTestMask
+ val action2 = UnlatchAndSleepAction()
+ pool.execute(action2)
+ latch.await(false)
+ val pinned2 = action2.pinned
+ val altered = pinned2.affinity()
+ altered.mask() shouldBeEqualTo negatedBinaryTestMask
}
+ }
}
diff --git a/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/PinnedForkJoinPoolTest.kt b/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/PinnedForkJoinPoolTest.kt
index 48b58da..2019df7 100644
--- a/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/PinnedForkJoinPoolTest.kt
+++ b/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/PinnedForkJoinPoolTest.kt
@@ -9,23 +9,23 @@ import java.util.concurrent.CountDownLatch
class PinnedForkJoinPoolTest {
- @Test
- fun `Fixed affinity PinnedForkJoinPool behavior`() {
- val pool = PinnedForkJoinPool(availableCores, TestMaskPinnedThreadFactory)
- pool.use {
- testPinnedThreadExecutor(availableCores, pool)
- }
+ @Test
+ fun `Fixed affinity PinnedForkJoinPool behavior`() {
+ val pool = PinnedForkJoinPool(availableCores, TestMaskPinnedThreadFactory)
+ pool.use {
+ testPinnedThreadExecutor(availableCores, pool)
}
+ }
- private fun testPinnedThreadExecutor(concurrency: Int, pool: PinnedForkJoinPool) = pool.use {
- val visited = Sets.newConcurrentHashSet()
- val latch = CountDownLatch(concurrency)
- pool.parallelism shouldBeEqualTo concurrency
- val actions = (`0` until concurrency).map { recursiveAction(latch, visited) }
- val tasks = actions.map { pool.submit(it) }
- latch.await()
- Thread.sleep(5L)
- tasks.forEach { it.isDone shouldBe true }
- visited.size shouldBeEqualTo concurrency
- }
+ private fun testPinnedThreadExecutor(concurrency: Int, pool: PinnedForkJoinPool) = pool.use {
+ val visited = Sets.newConcurrentHashSet()
+ val latch = CountDownLatch(concurrency)
+ pool.parallelism shouldBeEqualTo concurrency
+ val actions = (`0` until concurrency).map { recursiveAction(latch, visited) }
+ val tasks = actions.map { pool.submit(it) }
+ latch.await()
+ Thread.sleep(5L)
+ tasks.forEach { it.isDone shouldBe true }
+ visited.size shouldBeEqualTo concurrency
+ }
}
diff --git a/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/PinnedThreadPoolExecutorTest.kt b/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/PinnedThreadPoolExecutorTest.kt
index 1dd5125..06f2bf6 100644
--- a/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/PinnedThreadPoolExecutorTest.kt
+++ b/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/PinnedThreadPoolExecutorTest.kt
@@ -9,27 +9,27 @@ import java.util.concurrent.CountDownLatch
class PinnedThreadPoolExecutorTest {
- @Test
- fun `Single pinnned thread executor`() {
- val executor = PinnedThreadPoolExecutor.newSinglePinnedThreadExecutor(TestMaskPinnedThreadFactory)
- testPinnedThreadExecutor(`1`, executor as PinnedThreadPoolExecutor)
- }
+ @Test
+ fun `Single pinnned thread executor`() {
+ val executor = PinnedThreadPoolExecutor.newSinglePinnedThreadExecutor(TestMaskPinnedThreadFactory)
+ testPinnedThreadExecutor(`1`, executor as PinnedThreadPoolExecutor)
+ }
- @Test
- fun `Fixed pinned thread pool executor`() {
- val executor = PinnedThreadPoolExecutor.newFixedPinnedThreadPool(availableCores, TestMaskPinnedThreadFactory)
- testPinnedThreadExecutor(availableCores, executor as PinnedThreadPoolExecutor)
- }
+ @Test
+ fun `Fixed pinned thread pool executor`() {
+ val executor = PinnedThreadPoolExecutor.newFixedPinnedThreadPool(availableCores, TestMaskPinnedThreadFactory)
+ testPinnedThreadExecutor(availableCores, executor as PinnedThreadPoolExecutor)
+ }
- private fun testPinnedThreadExecutor(concurrency: Int, pool: PinnedThreadPoolExecutor) = pool.use {
- val visited = Sets.newConcurrentHashSet()
- val latch = CountDownLatch(concurrency)
- pool.corePoolSize shouldBeEqualTo concurrency
- val tasks = (`0` until concurrency).map { callableTask(latch, visited) }
- val futures = pool.invokeAll(tasks)
- latch.await()
- Thread.sleep(5L)
- futures.forEach { it.isDone shouldBe true }
- visited.size shouldBeEqualTo concurrency
- }
+ private fun testPinnedThreadExecutor(concurrency: Int, pool: PinnedThreadPoolExecutor) = pool.use {
+ val visited = Sets.newConcurrentHashSet()
+ val latch = CountDownLatch(concurrency)
+ pool.corePoolSize shouldBeEqualTo concurrency
+ val tasks = (`0` until concurrency).map { callableTask(latch, visited) }
+ val futures = pool.invokeAll(tasks)
+ latch.await()
+ Thread.sleep(5L)
+ futures.forEach { it.isDone shouldBe true }
+ visited.size shouldBeEqualTo concurrency
+ }
}
diff --git a/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/ScheduledPinnedThreadPoolExecutorTest.kt b/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/ScheduledPinnedThreadPoolExecutorTest.kt
index 03d4eac..972e8fd 100644
--- a/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/ScheduledPinnedThreadPoolExecutorTest.kt
+++ b/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/ScheduledPinnedThreadPoolExecutorTest.kt
@@ -9,33 +9,33 @@ import java.util.concurrent.TimeUnit
class ScheduledPinnedThreadPoolExecutorTest {
- @Test
- fun `Single pinned thread scheduled executor`() {
- ScheduledPinnedThreadPoolExecutor
- .newSinglePinnedThreadScheduledExecutor(TestMaskPinnedThreadFactory)
- .let { testPinnedThreadExecutor(`1`, it as ScheduledPinnedThreadPoolExecutor) }
- }
+ @Test
+ fun `Single pinned thread scheduled executor`() {
+ ScheduledPinnedThreadPoolExecutor
+ .newSinglePinnedThreadScheduledExecutor(TestMaskPinnedThreadFactory)
+ .let { testPinnedThreadExecutor(`1`, it as ScheduledPinnedThreadPoolExecutor) }
+ }
- @Test
- fun `Pooled pinned thread scheduled executor`() {
- ScheduledPinnedThreadPoolExecutor
- .newScheduledPinnedThreadPool(availableCores, TestMaskPinnedThreadFactory)
- .let { testPinnedThreadExecutor(availableCores, it as ScheduledPinnedThreadPoolExecutor) }
- }
+ @Test
+ fun `Pooled pinned thread scheduled executor`() {
+ ScheduledPinnedThreadPoolExecutor
+ .newScheduledPinnedThreadPool(availableCores, TestMaskPinnedThreadFactory)
+ .let { testPinnedThreadExecutor(availableCores, it as ScheduledPinnedThreadPoolExecutor) }
+ }
- private fun testPinnedThreadExecutor(
- concurrency: Int,
- scheduler: ScheduledPinnedThreadPoolExecutor
- ) = scheduler.use {
- val visited = Sets.newConcurrentHashSet()
- val latch = CountDownLatch(concurrency)
- scheduler.corePoolSize shouldBeEqualTo concurrency
- val futures = (`0` until concurrency)
- .map { runnableTask(latch, visited) }
- .map { scheduler.schedule(it, SCHEDULING_DELAY, TimeUnit.MILLISECONDS) }
- latch.await()
- Thread.sleep(5L)
- visited.size shouldBeEqualTo concurrency
- futures.forEach { it.isDone shouldBeEqualTo true }
- }
+ private fun testPinnedThreadExecutor(
+ concurrency: Int,
+ scheduler: ScheduledPinnedThreadPoolExecutor
+ ) = scheduler.use {
+ val visited = Sets.newConcurrentHashSet()
+ val latch = CountDownLatch(concurrency)
+ scheduler.corePoolSize shouldBeEqualTo concurrency
+ val futures = (`0` until concurrency)
+ .map { runnableTask(latch, visited) }
+ .map { scheduler.schedule(it, SCHEDULING_DELAY, TimeUnit.MILLISECONDS) }
+ latch.await()
+ Thread.sleep(5L)
+ visited.size shouldBeEqualTo concurrency
+ futures.forEach { it.isDone shouldBeEqualTo true }
+ }
}
diff --git a/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/Support.kt b/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/Support.kt
index 42c9446..e70a0b8 100644
--- a/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/Support.kt
+++ b/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/Support.kt
@@ -8,9 +8,9 @@ import java.util.concurrent.*
internal const val SCHEDULING_DELAY = 500L
internal object TestMaskPinnedThreadFactory : PinnedThreadFactory {
- override fun newThread(r: Runnable) = PinnedThread(r, testAffinityDescriptor)
+ override fun newThread(r: Runnable) = PinnedThread(r, testAffinityDescriptor)
- override fun newThread(pool: ForkJoinPool) = PinnedThread.ForkJoinWorker(pool, testAffinityDescriptor)
+ override fun newThread(pool: ForkJoinPool) = PinnedThread.ForkJoinWorker(pool, testAffinityDescriptor)
}
@Suppress("UNCHECKED_CAST")
@@ -18,14 +18,14 @@ internal fun callableTask(latch: CountDownLatch, visited: MutableSet): C
Executors.callable { runnableTask(latch, visited).run() } as Callable
internal fun recursiveAction(latch: CountDownLatch, visited: MutableSet) = object : RecursiveAction() {
- override fun compute() = runnableTask(latch, visited).run()
+ override fun compute() = runnableTask(latch, visited).run()
}
internal fun runnableTask(latch: CountDownLatch, visited: MutableSet) = Runnable {
- val self = Thread.currentThread() as Pinned
- visited.add(self) shouldBe true
- self.affinity().mask() shouldBeEqualTo binaryTestMask
- self.affinity().toString() shouldBeEqualTo textTestMask
- Thread.sleep(SCHEDULING_DELAY)
- latch.countDown()
+ val self = Thread.currentThread() as Pinned
+ visited.add(self) shouldBe true
+ self.affinity().mask() shouldBeEqualTo binaryTestMask
+ self.affinity().toString() shouldBeEqualTo textTestMask
+ Thread.sleep(SCHEDULING_DELAY)
+ latch.countDown()
}
diff --git a/core/src/main/java/org/sheinbergon/needle/util/NeedleAffinity.java b/core/src/main/java/org/sheinbergon/needle/util/NeedleAffinity.java
new file mode 100644
index 0000000..f0bc769
--- /dev/null
+++ b/core/src/main/java/org/sheinbergon/needle/util/NeedleAffinity.java
@@ -0,0 +1,12 @@
+package org.sheinbergon.needle.util;
+
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Target({ElementType.TYPE})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface NeedleAffinity {
+}
diff --git a/core/src/main/java/org/sheinbergon/needle/util/NeedleException.java b/core/src/main/java/org/sheinbergon/needle/util/NeedleException.java
index f79e547..f4d1fb1 100644
--- a/core/src/main/java/org/sheinbergon/needle/util/NeedleException.java
+++ b/core/src/main/java/org/sheinbergon/needle/util/NeedleException.java
@@ -4,11 +4,20 @@
public class NeedleException extends RuntimeException {
/**
- * Instantiate an {@code AffinityDescriptorException} using the specified error message.
+ * Instantiate an {@link NeedleException} using the specified error message.
*
* @param message the error message.
*/
public NeedleException(final @Nonnull String message) {
super(message);
}
+
+ /**
+ * Instantiate an {@link NeedleException} using the given exception.
+ *
+ * @param x the exception
+ */
+ public NeedleException(final @Nonnull Exception x) {
+ super(x);
+ }
}
diff --git a/core/src/test/kotlin/org/sheinbergon/needle/AffinityDescriptorTest.kt b/core/src/test/kotlin/org/sheinbergon/needle/AffinityDescriptorTest.kt
index 30a2452..ee35b3f 100644
--- a/core/src/test/kotlin/org/sheinbergon/needle/AffinityDescriptorTest.kt
+++ b/core/src/test/kotlin/org/sheinbergon/needle/AffinityDescriptorTest.kt
@@ -12,62 +12,62 @@ import java.util.*
class AffinityDescriptorTest {
- @Test
- fun `Illegal affinity-descriptor specification - invalid range`() {
- { AffinityDescriptor.from("10-2") } shouldThrow AffinityDescriptorException::class
- }
+ @Test
+ fun `Illegal affinity-descriptor specification - invalid range`() {
+ { AffinityDescriptor.from("10-2") } shouldThrow AffinityDescriptorException::class
+ }
- @Test
- fun `Illegal affinity-descriptor specification - invalid range, mixed specifications`() {
- { AffinityDescriptor.from("5,10-2") } shouldThrow AffinityDescriptorException::class
- }
+ @Test
+ fun `Illegal affinity-descriptor specification - invalid range, mixed specifications`() {
+ { AffinityDescriptor.from("5,10-2") } shouldThrow AffinityDescriptorException::class
+ }
- @Test
- fun `Illegal affinity-descriptor specification - negative value`() {
- { AffinityDescriptor.from(NumberUtils.LONG_MINUS_ONE) } shouldThrow AffinityDescriptorException::class
- }
+ @Test
+ fun `Illegal affinity-descriptor specification - negative value`() {
+ { AffinityDescriptor.from(NumberUtils.LONG_MINUS_ONE) } shouldThrow AffinityDescriptorException::class
+ }
- @Test
- fun `Illegal affinity-descriptor specification - out of bounds mask`() {
- val mask = AffinityDescriptor.MASK_UPPER_BOUND + `1L`
- { AffinityDescriptor.from(mask) } shouldThrow AffinityDescriptorException::class
- }
+ @Test
+ fun `Illegal affinity-descriptor specification - out of bounds mask`() {
+ val mask = AffinityDescriptor.MASK_UPPER_BOUND + `1L`
+ { AffinityDescriptor.from(mask) } shouldThrow AffinityDescriptorException::class
+ }
- @Test
- fun `Illegal affinity-descriptor specification - rubbish`() {
- { AffinityDescriptor.from("needle") } shouldThrow AffinityDescriptorException::class
- }
+ @Test
+ fun `Illegal affinity-descriptor specification - rubbish`() {
+ { AffinityDescriptor.from("needle") } shouldThrow AffinityDescriptorException::class
+ }
- @Test
- fun `Unsupported core set traits`() {
- AffinityDescriptor.UNSUPPORTED.mask() shouldBeEqualTo NumberUtils.LONG_MINUS_ONE
- AffinityDescriptor.UNSUPPORTED.toString() shouldBe null
- }
+ @Test
+ fun `Unsupported core set traits`() {
+ AffinityDescriptor.UNSUPPORTED.mask() shouldBeEqualTo NumberUtils.LONG_MINUS_ONE
+ AffinityDescriptor.UNSUPPORTED.toString() shouldBe null
+ }
- @Test
- fun `Empty mask core set instantiation`() {
- AffinityDescriptor.from(StringUtils.EMPTY) shouldBeEqualTo AffinityDescriptor.process()
- AffinityDescriptor.from(NumberUtils.LONG_ZERO) shouldBeEqualTo AffinityDescriptor.process()
- }
+ @Test
+ fun `Empty mask core set instantiation`() {
+ AffinityDescriptor.from(StringUtils.EMPTY) shouldBeEqualTo AffinityDescriptor.process()
+ AffinityDescriptor.from(NumberUtils.LONG_ZERO) shouldBeEqualTo AffinityDescriptor.process()
+ }
- @Test
- fun `Core set equality`() {
- val ad1 = AffinityDescriptor.from(textTestMask)
- val ad2 = AffinityDescriptor.from(negatedBinaryTestMask)
- val ad3 = AffinityDescriptor.from(textTestMask)
- ad1 shouldNotBeEqualTo null
- ad1 shouldNotBeEqualTo ad2
- ad1 shouldNotBeEqualTo this
- ad1 shouldBeEqualTo ad3
- ad1 shouldBeEqualTo ad1
- }
+ @Test
+ fun `Core set equality`() {
+ val ad1 = AffinityDescriptor.from(textTestMask)
+ val ad2 = AffinityDescriptor.from(negatedBinaryTestMask)
+ val ad3 = AffinityDescriptor.from(textTestMask)
+ ad1 shouldNotBeEqualTo null
+ ad1 shouldNotBeEqualTo ad2
+ ad1 shouldNotBeEqualTo this
+ ad1 shouldBeEqualTo ad3
+ ad1 shouldBeEqualTo ad1
+ }
- @Test
- fun `Core set hashCode`() {
- val ad1 = AffinityDescriptor.from(textTestMask)
- val ad2 = AffinityDescriptor.from(negatedBinaryTestMask)
- ad1.hashCode() shouldBeEqualTo Objects.hashCode(binaryTestMask)
- ad2.hashCode() shouldBeEqualTo Objects.hashCode(negatedBinaryTestMask)
- ad1.hashCode() shouldNotBeEqualTo ad2.hashCode()
- }
+ @Test
+ fun `Core set hashCode`() {
+ val ad1 = AffinityDescriptor.from(textTestMask)
+ val ad2 = AffinityDescriptor.from(negatedBinaryTestMask)
+ ad1.hashCode() shouldBeEqualTo Objects.hashCode(binaryTestMask)
+ ad2.hashCode() shouldBeEqualTo Objects.hashCode(negatedBinaryTestMask)
+ ad1.hashCode() shouldNotBeEqualTo ad2.hashCode()
+ }
}
diff --git a/core/src/test/kotlin/org/sheinbergon/needle/AffinityResolverTest.kt b/core/src/test/kotlin/org/sheinbergon/needle/AffinityResolverTest.kt
index 6aeaf5a..32620e8 100644
--- a/core/src/test/kotlin/org/sheinbergon/needle/AffinityResolverTest.kt
+++ b/core/src/test/kotlin/org/sheinbergon/needle/AffinityResolverTest.kt
@@ -5,8 +5,8 @@ import org.junit.jupiter.api.Test
class AffinityResolverTest {
- @Test
- fun `Process affinity`() {
- AffinityResolver.NoOp.INSTANCE.process() shouldBeEqualTo AffinityDescriptor.UNSUPPORTED
- }
+ @Test
+ fun `Process affinity`() {
+ AffinityResolver.NoOp.INSTANCE.process() shouldBeEqualTo AffinityDescriptor.UNSUPPORTED
+ }
}
diff --git a/core/src/test/kotlin/org/sheinbergon/needle/NeedleTest.kt b/core/src/test/kotlin/org/sheinbergon/needle/NeedleTest.kt
index 727edcf..c42b800 100644
--- a/core/src/test/kotlin/org/sheinbergon/needle/NeedleTest.kt
+++ b/core/src/test/kotlin/org/sheinbergon/needle/NeedleTest.kt
@@ -18,52 +18,52 @@ import kotlin.concurrent.thread
@RunWith(JUnitPlatform::class)
class NeedleTest {
- private lateinit var latch: CountDownLatch
- private lateinit var binaryResult: AtomicLong
- private lateinit var textResult: AtomicRef
+ private lateinit var latch: CountDownLatch
+ private lateinit var binaryResult: AtomicLong
+ private lateinit var textResult: AtomicRef
- @BeforeEach
- fun setup() {
- textResult = atomic(StringUtils.EMPTY)
- binaryResult = atomic(0x1111111111111111)
- latch = CountDownLatch(1)
- }
+ @BeforeEach
+ fun setup() {
+ textResult = atomic(StringUtils.EMPTY)
+ binaryResult = atomic(0x1111111111111111)
+ latch = CountDownLatch(1)
+ }
- @Test
- fun `Unsupported platform behavior - Thread access`() {
- unsupportedPlatform {
- Needle.affinity() shouldBe AffinityDescriptor.UNSUPPORTED
- Needle.affinity(firstCoreAffinityDescriptor)
- Needle.affinity() shouldBe AffinityDescriptor.UNSUPPORTED
- Needle.self() shouldBeEqualTo NumberUtils.LONG_MINUS_ONE
- }
+ @Test
+ fun `Unsupported platform behavior - Thread access`() {
+ unsupportedPlatform {
+ Needle.affinity() shouldBe AffinityDescriptor.UNSUPPORTED
+ Needle.affinity(firstCoreAffinityDescriptor)
+ Needle.affinity() shouldBe AffinityDescriptor.UNSUPPORTED
+ Needle.self() shouldBeEqualTo NumberUtils.LONG_MINUS_ONE
}
+ }
- @Test
- fun `Set affinity for a JVM Thread using a binary mask`() {
- val textMask = textTestMask
- val binaryMask: Long = binaryTestMask
- val affinity = testAffinityDescriptor
- val thread = thread(start = false, block = setAffinityRunnable(affinity))
- launchAndVerify(thread, binaryMask, textMask)
- }
+ @Test
+ fun `Set affinity for a JVM Thread using a binary mask`() {
+ val textMask = textTestMask
+ val binaryMask: Long = binaryTestMask
+ val affinity = testAffinityDescriptor
+ val thread = thread(start = false, block = setAffinityRunnable(affinity))
+ launchAndVerify(thread, binaryMask, textMask)
+ }
- private fun launchAndVerify(thread: Thread, binaryMask: Long, textMask: String) {
- try {
- thread.start()
- latch.await()
- binaryResult.value `should be equal to` binaryMask
- textResult.value `should be equal to` textMask
- } finally {
- thread.interrupt()
- }
+ private fun launchAndVerify(thread: Thread, binaryMask: Long, textMask: String) {
+ try {
+ thread.start()
+ latch.await()
+ binaryResult.value `should be equal to` binaryMask
+ textResult.value `should be equal to` textMask
+ } finally {
+ thread.interrupt()
}
+ }
- private fun setAffinityRunnable(affinity: AffinityDescriptor): () -> Unit = {
- Needle.affinity(affinity)
- val descriptor = Needle.affinity()
- binaryResult.value = descriptor.mask()
- textResult.value = descriptor.toString()
- latch.countDown()
- }
+ private fun setAffinityRunnable(affinity: AffinityDescriptor): () -> Unit = {
+ Needle.affinity(affinity)
+ val descriptor = Needle.affinity()
+ binaryResult.value = descriptor.mask()
+ textResult.value = descriptor.toString()
+ latch.countDown()
+ }
}
diff --git a/core/src/test/kotlin/org/sheinbergon/needle/PinnedThreadTest.kt b/core/src/test/kotlin/org/sheinbergon/needle/PinnedThreadTest.kt
index 8ba5a04..d2e13cb 100644
--- a/core/src/test/kotlin/org/sheinbergon/needle/PinnedThreadTest.kt
+++ b/core/src/test/kotlin/org/sheinbergon/needle/PinnedThreadTest.kt
@@ -14,211 +14,211 @@ import java.util.concurrent.RecursiveAction
class PinnedThreadTest {
- private lateinit var latch: CountDownLatch
-
- @BeforeEach
- fun setup() {
- latch = CountDownLatch(1)
+ private lateinit var latch: CountDownLatch
+
+ @BeforeEach
+ fun setup() {
+ latch = CountDownLatch(1)
+ }
+
+ @Test
+ fun `Access a PinnedThread affinity information without starting it`() {
+ val runnable = unlatchAndSleepRunnable()
+ val pinned = PinnedThread(runnable)
+ Assertions.assertThrows(NeedleException::class.java) { pinned.affinity() }
+ }
+
+ @Test
+ fun `Start a PinnedThread without an affinity descriptor`() {
+ val runnable = unlatchAndSleepRunnable()
+ val pinned = PinnedThread(runnable)
+ try {
+ pinned.start()
+ latch.await()
+ pinned.nativeId().shouldNotBeNull()
+ pinned.affinity() shouldBeEqualTo default
+ } finally {
+ pinned.interrupt()
}
-
- @Test
- fun `Access a PinnedThread affinity information without starting it`() {
- val runnable = unlatchAndSleepRunnable()
- val pinned = PinnedThread(runnable)
- Assertions.assertThrows(NeedleException::class.java) { pinned.affinity() }
+ }
+
+ @Test
+ fun `Start a named PinnedThread without an affinity descriptor`() {
+ val runnable = unlatchAndSleepRunnable()
+ val pinned = PinnedThread(runnable, NEEDLE)
+ try {
+ pinned.start()
+ pinned.name shouldBeEqualTo NEEDLE
+ latch.await()
+ pinned.nativeId().shouldNotBeNull()
+ pinned.affinity() shouldBeEqualTo default
+ } finally {
+ pinned.interrupt()
}
-
- @Test
- fun `Start a PinnedThread without an affinity descriptor`() {
- val runnable = unlatchAndSleepRunnable()
- val pinned = PinnedThread(runnable)
- try {
- pinned.start()
- latch.await()
- pinned.nativeId().shouldNotBeNull()
- pinned.affinity() shouldBeEqualTo default
- } finally {
- pinned.interrupt()
- }
+ }
+
+ @Test
+ fun `Start a PinnedThread with an affinity descriptor set`() {
+ val desiredMask = textTestMask
+ val desiredAffinityDescriptor = testAffinityDescriptor
+ val runnable = unlatchAndSleepRunnable()
+ val pinned = PinnedThread(runnable, desiredAffinityDescriptor)
+ try {
+ pinned.start()
+ latch.await()
+ pinned.nativeId().shouldNotBeNull()
+ pinned.affinity().toString() shouldBeEqualTo desiredMask
+ } finally {
+ pinned.interrupt()
}
-
- @Test
- fun `Start a named PinnedThread without an affinity descriptor`() {
- val runnable = unlatchAndSleepRunnable()
- val pinned = PinnedThread(runnable, NEEDLE)
- try {
- pinned.start()
- pinned.name shouldBeEqualTo NEEDLE
- latch.await()
- pinned.nativeId().shouldNotBeNull()
- pinned.affinity() shouldBeEqualTo default
- } finally {
- pinned.interrupt()
- }
+ }
+
+ @Test
+ fun `Start a named PinnedThread with an affinity descriptor set`() {
+ val desiredMask = textTestMask
+ val desiredAffinityDescriptor = testAffinityDescriptor
+ val runnable = unlatchAndSleepRunnable()
+ val pinned = PinnedThread(runnable, NEEDLE, desiredAffinityDescriptor)
+ try {
+ pinned.start()
+ pinned.name shouldBeEqualTo NEEDLE
+ latch.await()
+ pinned.nativeId().shouldNotBeNull()
+ pinned.affinity().toString() shouldBeEqualTo desiredMask
+ } finally {
+ pinned.interrupt()
}
-
- @Test
- fun `Start a PinnedThread with an affinity descriptor set`() {
- val desiredMask = textTestMask
- val desiredAffinityDescriptor = testAffinityDescriptor
- val runnable = unlatchAndSleepRunnable()
- val pinned = PinnedThread(runnable, desiredAffinityDescriptor)
- try {
- pinned.start()
- latch.await()
- pinned.nativeId().shouldNotBeNull()
- pinned.affinity().toString() shouldBeEqualTo desiredMask
- } finally {
- pinned.interrupt()
- }
+ }
+
+ @Test
+ fun `Change the affinity of a PinnedThread during runtime`() {
+ val desiredTextMask = textTestMask
+ val desiredBinaryMask = binaryTestMask
+ val desiredAffinityDescriptor = testAffinityDescriptor
+ val runnable = unlatchAndSleepRunnable()
+ val pinned = PinnedThread(runnable)
+ try {
+ pinned.start()
+ latch.await()
+ pinned.nativeId().shouldNotBeNull()
+ pinned.affinity() shouldBeEqualTo default
+ pinned.affinity(desiredAffinityDescriptor)
+ pinned.affinity() shouldNotBeEqualTo default
+ pinned.affinity().mask() shouldBeEqualTo desiredBinaryMask
+ pinned.affinity().toString() shouldBeEqualTo desiredTextMask
+ } finally {
+ pinned.interrupt()
}
-
- @Test
- fun `Start a named PinnedThread with an affinity descriptor set`() {
- val desiredMask = textTestMask
- val desiredAffinityDescriptor = testAffinityDescriptor
- val runnable = unlatchAndSleepRunnable()
- val pinned = PinnedThread(runnable, NEEDLE, desiredAffinityDescriptor)
- try {
- pinned.start()
- pinned.name shouldBeEqualTo NEEDLE
- latch.await()
- pinned.nativeId().shouldNotBeNull()
- pinned.affinity().toString() shouldBeEqualTo desiredMask
- } finally {
- pinned.interrupt()
- }
+ }
+
+ @Test
+ fun `Utilizing a Pinned ForkJoinWorker thread using explicitly defined affinity`() {
+ val desiredTextMask = textTestMask
+ val desiredBinaryMask = binaryTestMask
+ val desiredAffinityDescriptor = testAffinityDescriptor
+ val factory = SingleThreadedForkJoinWorkerThreadFactory(desiredAffinityDescriptor)
+ val pool = ForkJoinPool(`1`, factory, null, false)
+ val action = unlatchAndSleepAction()
+ try {
+ pool.invoke(action)
+ latch.await()
+ val pinned = factory[pool]!!
+ pinned.nativeId().shouldNotBeNull()
+ pinned.affinity().mask() shouldBeEqualTo desiredBinaryMask
+ pinned.affinity().toString() shouldBeEqualTo desiredTextMask
+ } finally {
+ pool.shutdownNow()
}
-
- @Test
- fun `Change the affinity of a PinnedThread during runtime`() {
- val desiredTextMask = textTestMask
- val desiredBinaryMask = binaryTestMask
- val desiredAffinityDescriptor = testAffinityDescriptor
- val runnable = unlatchAndSleepRunnable()
- val pinned = PinnedThread(runnable)
- try {
- pinned.start()
- latch.await()
- pinned.nativeId().shouldNotBeNull()
- pinned.affinity() shouldBeEqualTo default
- pinned.affinity(desiredAffinityDescriptor)
- pinned.affinity() shouldNotBeEqualTo default
- pinned.affinity().mask() shouldBeEqualTo desiredBinaryMask
- pinned.affinity().toString() shouldBeEqualTo desiredTextMask
- } finally {
- pinned.interrupt()
- }
+ }
+
+ @Test
+ fun `Utilizing a Pinned ForkJoinWorker thread without an explicitly defined affinity`() {
+ val desiredTextMask = default.toString()
+ val desiredBinaryMask = default.mask()
+ val factory = SingleThreadedForkJoinWorkerThreadFactory()
+ val pool = ForkJoinPool(`1`, factory, null, false)
+ val action = unlatchAndSleepAction()
+ try {
+ pool.invoke(action)
+ latch.await()
+ val pinned = factory[pool]!!
+ pinned.nativeId().shouldNotBeNull()
+ pinned.affinity().mask() shouldBeEqualTo desiredBinaryMask
+ pinned.affinity().toString() shouldBeEqualTo desiredTextMask
+ } finally {
+ pool.shutdownNow()
}
-
- @Test
- fun `Utilizing a Pinned ForkJoinWorker thread using explicitly defined affinity`() {
- val desiredTextMask = textTestMask
- val desiredBinaryMask = binaryTestMask
- val desiredAffinityDescriptor = testAffinityDescriptor
- val factory = SingleThreadedForkJoinWorkerThreadFactory(desiredAffinityDescriptor)
- val pool = ForkJoinPool(`1`, factory, null, false)
- val action = unlatchAndSleepAction()
- try {
- pool.invoke(action)
- latch.await()
- val pinned = factory[pool]!!
- pinned.nativeId().shouldNotBeNull()
- pinned.affinity().mask() shouldBeEqualTo desiredBinaryMask
- pinned.affinity().toString() shouldBeEqualTo desiredTextMask
- } finally {
- pool.shutdownNow()
- }
+ }
+
+ @Test
+ fun `Extend a PinnedThread`() {
+ val desiredTextMask = textTestMask
+ val desiredBinaryMask = binaryTestMask
+ val desiredAffinityDescriptor = testAffinityDescriptor
+ val runnable = unlatchAndSleepRunnable(true)
+ val pinned = ExtendedPinnedThread(desiredAffinityDescriptor, runnable)
+ try {
+ pinned.start()
+ latch.await()
+ pinned.nativeId().shouldNotBeNull()
+ pinned.affinity().mask() shouldBeEqualTo desiredBinaryMask
+ pinned.affinity().toString() shouldBeEqualTo desiredTextMask
+ } finally {
+ pinned.interrupt()
}
-
- @Test
- fun `Utilizing a Pinned ForkJoinWorker thread without an explicitly defined affinity`() {
- val desiredTextMask = default.toString()
- val desiredBinaryMask = default.mask()
- val factory = SingleThreadedForkJoinWorkerThreadFactory()
- val pool = ForkJoinPool(`1`, factory, null, false)
- val action = unlatchAndSleepAction()
- try {
- pool.invoke(action)
- latch.await()
- val pinned = factory[pool]!!
- pinned.nativeId().shouldNotBeNull()
- pinned.affinity().mask() shouldBeEqualTo desiredBinaryMask
- pinned.affinity().toString() shouldBeEqualTo desiredTextMask
- } finally {
- pool.shutdownNow()
- }
- }
-
- @Test
- fun `Extend a PinnedThread`() {
- val desiredTextMask = textTestMask
- val desiredBinaryMask = binaryTestMask
- val desiredAffinityDescriptor = testAffinityDescriptor
- val runnable = unlatchAndSleepRunnable(true)
- val pinned = ExtendedPinnedThread(desiredAffinityDescriptor, runnable)
- try {
- pinned.start()
- latch.await()
- pinned.nativeId().shouldNotBeNull()
- pinned.affinity().mask() shouldBeEqualTo desiredBinaryMask
- pinned.affinity().toString() shouldBeEqualTo desiredTextMask
- } finally {
- pinned.interrupt()
- }
- }
-
- @Test
- fun `Unsupported platform behavior - PinnedThread access`() {
- unsupportedPlatform {
- val desiredAffinityDescriptor = testAffinityDescriptor
- val runnable = unlatchAndSleepRunnable()
- val pinned = PinnedThread(runnable)
- try {
- pinned.start()
- latch.await()
- pinned.nativeId() shouldBeEqualTo NumberUtils.LONG_MINUS_ONE
- pinned.affinity() shouldBeEqualTo AffinityDescriptor.UNSUPPORTED
- pinned.affinity(desiredAffinityDescriptor)
- pinned.affinity() shouldBeEqualTo AffinityDescriptor.UNSUPPORTED
- } finally {
- pinned.interrupt()
- }
- }
+ }
+
+ @Test
+ fun `Unsupported platform behavior - PinnedThread access`() {
+ unsupportedPlatform {
+ val desiredAffinityDescriptor = testAffinityDescriptor
+ val runnable = unlatchAndSleepRunnable()
+ val pinned = PinnedThread(runnable)
+ try {
+ pinned.start()
+ latch.await()
+ pinned.nativeId() shouldBeEqualTo NumberUtils.LONG_MINUS_ONE
+ pinned.affinity() shouldBeEqualTo AffinityDescriptor.UNSUPPORTED
+ pinned.affinity(desiredAffinityDescriptor)
+ pinned.affinity() shouldBeEqualTo AffinityDescriptor.UNSUPPORTED
+ } finally {
+ pinned.interrupt()
+ }
}
-
- private fun unlatchAndSleepRunnable(setup: Boolean = false) = Runnable {
- if (setup) (Thread.currentThread() as? PinnedThread)?.initialize()
- latch.countDown()
- runCatching { Thread.sleep(1000L) }
+ }
+
+ private fun unlatchAndSleepRunnable(setup: Boolean = false) = Runnable {
+ if (setup) (Thread.currentThread() as? PinnedThread)?.initialize()
+ latch.countDown()
+ runCatching { Thread.sleep(1000L) }
+ }
+
+ private fun unlatchAndSleepAction() = object : RecursiveAction() {
+ override fun compute() {
+ latch.countDown()
+ runCatching { Thread.sleep(1000L) }
}
+ }
- private fun unlatchAndSleepAction() = object : RecursiveAction() {
- override fun compute() {
- latch.countDown()
- runCatching { Thread.sleep(1000L) }
- }
- }
+ private class ExtendedPinnedThread(
+ affinity: AffinityDescriptor,
+ private val runnable: Runnable
+ ) : PinnedThread(affinity) {
+ override fun run() = runnable.run()
+ }
- private class ExtendedPinnedThread(
- affinity: AffinityDescriptor,
- private val runnable: Runnable
- ) : PinnedThread(affinity) {
- override fun run() = runnable.run()
- }
+ private class SingleThreadedForkJoinWorkerThreadFactory(private val affinity: AffinityDescriptor? = null)
+ : ForkJoinPool.ForkJoinWorkerThreadFactory {
- private class SingleThreadedForkJoinWorkerThreadFactory(private val affinity: AffinityDescriptor? = null)
- : ForkJoinPool.ForkJoinWorkerThreadFactory {
+ private val threads = mutableMapOf()
- private val threads = mutableMapOf()
+ operator fun get(pool: ForkJoinPool) = threads[pool]
- operator fun get(pool: ForkJoinPool) = threads[pool]
-
- override fun newThread(pool: ForkJoinPool) = threads
- .computeIfAbsent(pool) {
- affinity
- ?.let { PinnedThread.ForkJoinWorker(pool, it) }
- ?: PinnedThread.ForkJoinWorker(pool)
- }
- }
+ override fun newThread(pool: ForkJoinPool) = threads
+ .computeIfAbsent(pool) {
+ affinity
+ ?.let { PinnedThread.ForkJoinWorker(pool, it) }
+ ?: PinnedThread.ForkJoinWorker(pool)
+ }
+ }
}
diff --git a/core/src/test/kotlin/org/sheinbergon/needle/Support.kt b/core/src/test/kotlin/org/sheinbergon/needle/Support.kt
index c6965bd..dc20ea5 100644
--- a/core/src/test/kotlin/org/sheinbergon/needle/Support.kt
+++ b/core/src/test/kotlin/org/sheinbergon/needle/Support.kt
@@ -21,8 +21,8 @@ val maskUpperBound = AffinityDescriptor.MASK_UPPER_BOUND
val default = Needle.affinity()
val binaryTestMask by lazy {
- if (availableCores > `1`) maskUpperBound - `2L`
- else `1L`
+ if (availableCores > `1`) maskUpperBound - `2L`
+ else `1L`
}
val firstCoreAffinityDescriptor = AffinityDescriptor.from(`1L`)
@@ -30,72 +30,72 @@ val firstCoreAffinityDescriptor = AffinityDescriptor.from(`1L`)
val testAffinityDescriptor = AffinityDescriptor.from(binaryTestMask)
val negatedBinaryTestMask by lazy {
- binaryTestMask.xor(maskUpperBound - `1L`)
+ binaryTestMask.xor(maskUpperBound - `1L`)
}
val negatedTestAffinityDescriptor = AffinityDescriptor.from(negatedBinaryTestMask)
val textTestMask by lazy {
- binaryMaskRanges(binaryTestMask).textMask()
+ binaryMaskRanges(binaryTestMask).textMask()
}
val negatedTextTestMask by lazy {
- binaryMaskRanges(negatedBinaryTestMask).textMask()
+ binaryMaskRanges(negatedBinaryTestMask).textMask()
}
private fun List.textMask() = this.map {
- with(it) {
- if (first == last) "$start"
- else "$first${AffinityDescriptor.RANGE_DELIMITER}$last"
- }
+ with(it) {
+ if (first == last) "$start"
+ else "$first${AffinityDescriptor.RANGE_DELIMITER}$last"
+ }
}.joinToString(separator = AffinityDescriptor.SPECIFICATION_DELIMITER)
private fun binaryMaskRanges(mask: Long): List {
- var start = `-1`
- var binaryMask = mask
- return mutableListOf().apply {
- for (index in `0` until availableCores) {
- if (binaryMask.and(`1L`) == `1L`) {
- if (start == `-1`) {
- start = index
- }
- if (index + `1` == availableCores) {
- add(start..index)
- }
- } else if (start != `-1`) {
- add(start until index)
- start = `-1`
- }
- binaryMask = binaryMask.shr(`1`)
+ var start = `-1`
+ var binaryMask = mask
+ return mutableListOf().apply {
+ for (index in `0` until availableCores) {
+ if (binaryMask.and(`1L`) == `1L`) {
+ if (start == `-1`) {
+ start = index
+ }
+ if (index + `1` == availableCores) {
+ add(start..index)
}
+ } else if (start != `-1`) {
+ add(start until index)
+ start = `-1`
+ }
+ binaryMask = binaryMask.shr(`1`)
}
+ }
}
fun unsupportedPlatform(action: () -> Unit) {
- setPlatform(AffinityResolver.NoOp.INSTANCE)
- try {
- action()
- } finally {
- resetPlatform()
- }
+ setPlatform(AffinityResolver.NoOp.INSTANCE)
+ try {
+ action()
+ } finally {
+ resetPlatform()
+ }
}
private fun resetPlatform() {
- val resolver = if (Platform.isWindows()) {
- Win32AffinityResolver.INSTANCE
- } else if (Platform.isLinux()) {
- LinuxAffinityResolver.INSTANCE
- } else {
- AffinityResolver.NoOp.INSTANCE
- }
- setPlatform(resolver)
+ val resolver = if (Platform.isWindows()) {
+ Win32AffinityResolver.INSTANCE
+ } else if (Platform.isLinux()) {
+ LinuxAffinityResolver.INSTANCE
+ } else {
+ AffinityResolver.NoOp.INSTANCE
+ }
+ setPlatform(resolver)
}
private fun setPlatform(resolver: AffinityResolver<*>) {
- val affinityResolver = Needle::class.java.getDeclaredField("affinityResolver")
- val modifiers = Field::class.java.getDeclaredField("modifiers")
- affinityResolver.trySetAccessible()
- modifiers.trySetAccessible()
- modifiers.setInt(affinityResolver, affinityResolver.getModifiers() and Modifier.FINAL.inv())
- affinityResolver.set(null, resolver)
+ val affinityResolver = Needle::class.java.getDeclaredField("affinityResolver")
+ val modifiers = Field::class.java.getDeclaredField("modifiers")
+ affinityResolver.trySetAccessible()
+ modifiers.trySetAccessible()
+ modifiers.setInt(affinityResolver, affinityResolver.getModifiers() and Modifier.FINAL.inv())
+ affinityResolver.set(null, resolver)
}
diff --git a/detekt.yml b/detekt.yml
index 4de9399..770da73 100644
--- a/detekt.yml
+++ b/detekt.yml
@@ -241,10 +241,10 @@ formatting:
autoCorrect: true
layout: 'idea'
Indentation:
- active: false
+ active: true
autoCorrect: true
indentSize: 2
- continuationIndentSize: 2
+ continuationIndentSize: 4
MaximumLineLength:
active: true
maxLineLength: 150
@@ -479,7 +479,7 @@ potential-bugs:
UnreachableCode:
active: true
UnsafeCallOnNullableType:
- active: true
+ active: false
UnsafeCast:
active: false
UselessPostfixExpression:
diff --git a/knitter/build.gradle b/knitter/build.gradle
index e1dd152..8799f76 100644
--- a/knitter/build.gradle
+++ b/knitter/build.gradle
@@ -3,6 +3,8 @@ plugins {
}
dependencies {
+ implementation "org.jetbrains.kotlin:kotlin-stdlib"
+
api project(':needle-core')
implementation project(':needle-concurrent')
diff --git a/knitter/src/main/kotlin/org/sheinbergon/needle/knitter/PinnedThreads.kt b/knitter/src/main/kotlin/org/sheinbergon/needle/knitter/PinnedThreads.kt
index 4e95cd7..ab38d77 100644
--- a/knitter/src/main/kotlin/org/sheinbergon/needle/knitter/PinnedThreads.kt
+++ b/knitter/src/main/kotlin/org/sheinbergon/needle/knitter/PinnedThreads.kt
@@ -15,18 +15,18 @@ fun pinnedThread(
affinity: AffinityDescriptor = AffinityDescriptor.from(`0L`),
block: () -> Unit
): PinnedThread {
- val pinnedThread = PinnedThread(block::invoke, affinity)
- if (isDaemon) {
- pinnedThread.isDaemon = true
- }
- if (name != null) {
- pinnedThread.name = name
- }
- if (contextClassLoader != null) {
- pinnedThread.contextClassLoader = contextClassLoader
- }
- if (start) {
- pinnedThread.start()
- }
- return pinnedThread
+ val pinnedThread = PinnedThread(block::invoke, affinity)
+ if (isDaemon) {
+ pinnedThread.isDaemon = true
+ }
+ if (name != null) {
+ pinnedThread.name = name
+ }
+ if (contextClassLoader != null) {
+ pinnedThread.contextClassLoader = contextClassLoader
+ }
+ if (start) {
+ pinnedThread.start()
+ }
+ return pinnedThread
}
diff --git a/knitter/src/main/kotlin/org/sheinbergon/needle/knitter/coroutines/GovernedAffinityDispatcher.kt b/knitter/src/main/kotlin/org/sheinbergon/needle/knitter/coroutines/GovernedAffinityDispatcher.kt
index c55730e..e852d39 100644
--- a/knitter/src/main/kotlin/org/sheinbergon/needle/knitter/coroutines/GovernedAffinityDispatcher.kt
+++ b/knitter/src/main/kotlin/org/sheinbergon/needle/knitter/coroutines/GovernedAffinityDispatcher.kt
@@ -5,5 +5,5 @@ import org.sheinbergon.needle.AffinityDescriptor
abstract class GovernedAffinityDispatcher : CoroutineDispatcher() {
- abstract fun alter(affinity: AffinityDescriptor)
+ abstract fun alter(affinity: AffinityDescriptor)
}
diff --git a/knitter/src/main/kotlin/org/sheinbergon/needle/knitter/coroutines/PinnedDispatchers.kt b/knitter/src/main/kotlin/org/sheinbergon/needle/knitter/coroutines/PinnedDispatchers.kt
index 6f81845..434e62c 100644
--- a/knitter/src/main/kotlin/org/sheinbergon/needle/knitter/coroutines/PinnedDispatchers.kt
+++ b/knitter/src/main/kotlin/org/sheinbergon/needle/knitter/coroutines/PinnedDispatchers.kt
@@ -1,6 +1,7 @@
package org.sheinbergon.needle.knitter.coroutines
import kotlinx.coroutines.CoroutineDispatcher
+import kotlinx.coroutines.Runnable
import kotlinx.coroutines.asCoroutineDispatcher
import org.sheinbergon.needle.AffinityDescriptor
import org.sheinbergon.needle.concurrent.FixedAffinityPinnedThreadFactory
@@ -8,6 +9,7 @@ import org.sheinbergon.needle.concurrent.GovernedAffinityPinnedThreadFactory
import org.sheinbergon.needle.concurrent.PinnedThreadPoolExecutor
import kotlin.coroutines.CoroutineContext
+
private const val `1` = 1
private class GovernedAffinityDelegatingDispatcher(
@@ -15,18 +17,19 @@ private class GovernedAffinityDelegatingDispatcher(
affinity: AffinityDescriptor
) : GovernedAffinityDispatcher() {
- val factory: GovernedAffinityPinnedThreadFactory = GovernedAffinityPinnedThreadFactory(affinity)
+ val factory: GovernedAffinityPinnedThreadFactory
- private val delegate: CoroutineDispatcher
+ val delegate: CoroutineDispatcher
- init {
- val executor = PinnedThreadPoolExecutor.newFixedPinnedThreadPool(parallelism, factory)
- delegate = executor.asCoroutineDispatcher()
- }
+ init {
+ factory = GovernedAffinityPinnedThreadFactory(affinity)
+ val executor = PinnedThreadPoolExecutor.newFixedPinnedThreadPool(parallelism, factory)
+ delegate = executor.asCoroutineDispatcher()
+ }
- override fun alter(affinity: AffinityDescriptor) = factory.alter(affinity, true)
+ override fun alter(affinity: AffinityDescriptor) = factory.alter(affinity, true)
- override fun dispatch(context: CoroutineContext, block: Runnable) = delegate.dispatch(context, block)
+ override fun dispatch(context: CoroutineContext, block: Runnable) = delegate.dispatch(context, block)
}
fun governedAffinitySingleThread(affinity: AffinityDescriptor): GovernedAffinityDispatcher =
@@ -39,7 +42,7 @@ fun fixedAffinitySingleThread(affinity: AffinityDescriptor) =
fixedAffinityThreadPool(`1`, affinity)
fun fixedAffinityThreadPool(parallelism: Int, affinity: AffinityDescriptor): CoroutineDispatcher {
- val factory = FixedAffinityPinnedThreadFactory(affinity)
- val executor = PinnedThreadPoolExecutor.newFixedPinnedThreadPool(parallelism, factory)
- return executor.asCoroutineDispatcher()
+ val factory = FixedAffinityPinnedThreadFactory(affinity)
+ val executor = PinnedThreadPoolExecutor.newFixedPinnedThreadPool(parallelism, factory)
+ return executor.asCoroutineDispatcher()
}
diff --git a/knitter/src/test/kotlin/org/sheinbergon/needle/knitter/PinnedThreadsTest.kt b/knitter/src/test/kotlin/org/sheinbergon/needle/knitter/PinnedThreadsTest.kt
index 4504969..84dd8a6 100644
--- a/knitter/src/test/kotlin/org/sheinbergon/needle/knitter/PinnedThreadsTest.kt
+++ b/knitter/src/test/kotlin/org/sheinbergon/needle/knitter/PinnedThreadsTest.kt
@@ -8,37 +8,37 @@ import java.util.concurrent.CountDownLatch
class PinnedThreadsTest {
- @Test
- fun `PinnedThread creation - Kotlin interface - Parameterized`() {
- val latch = CountDownLatch(`1`)
- val pinned = pinnedThread(
- start = false,
- name = NEEDLE,
- contextClassLoader = ClassLoader.getPlatformClassLoader(),
- isDaemon = true,
- affinity = testAffinityDescriptor) {
- latch.countDown()
- runCatching { Thread.sleep(1250) }
- }
- pinned.isAlive shouldBe false
- pinned.start()
- latch.await()
- pinned.contextClassLoader shouldBe ClassLoader.getPlatformClassLoader()
- pinned.isDaemon shouldBe true
- pinned.name shouldBeEqualTo NEEDLE
- pinned.affinity().mask() shouldBeEqualTo binaryTestMask
+ @Test
+ fun `PinnedThread creation - Kotlin interface - Parameterized`() {
+ val latch = CountDownLatch(`1`)
+ val pinned = pinnedThread(
+ start = false,
+ name = NEEDLE,
+ contextClassLoader = ClassLoader.getPlatformClassLoader(),
+ isDaemon = true,
+ affinity = testAffinityDescriptor) {
+ latch.countDown()
+ runCatching { Thread.sleep(1250) }
}
+ pinned.isAlive shouldBe false
+ pinned.start()
+ latch.await()
+ pinned.contextClassLoader shouldBe ClassLoader.getPlatformClassLoader()
+ pinned.isDaemon shouldBe true
+ pinned.name shouldBeEqualTo NEEDLE
+ pinned.affinity().mask() shouldBeEqualTo binaryTestMask
+ }
- @Test
- fun `PinnedThread creation - Kotlin interface - Default`() {
- val latch = CountDownLatch(`1`)
- val pinned = pinnedThread {
- latch.countDown()
- runCatching { Thread.sleep(1250) }
- }
- pinned.isAlive shouldBe true
- latch.await()
- pinned.isDaemon shouldBe false
- pinned.affinity().mask() shouldBeEqualTo default.mask()
+ @Test
+ fun `PinnedThread creation - Kotlin interface - Default`() {
+ val latch = CountDownLatch(`1`)
+ val pinned = pinnedThread {
+ latch.countDown()
+ runCatching { Thread.sleep(1250) }
}
+ pinned.isAlive shouldBe true
+ latch.await()
+ pinned.isDaemon shouldBe false
+ pinned.affinity().mask() shouldBeEqualTo default.mask()
+ }
}
diff --git a/knitter/src/test/kotlin/org/sheinbergon/needle/knitter/coroutines/PinnedDispatchersTest.kt b/knitter/src/test/kotlin/org/sheinbergon/needle/knitter/coroutines/PinnedDispatchersTest.kt
index 591a950..7e4a9b7 100644
--- a/knitter/src/test/kotlin/org/sheinbergon/needle/knitter/coroutines/PinnedDispatchersTest.kt
+++ b/knitter/src/test/kotlin/org/sheinbergon/needle/knitter/coroutines/PinnedDispatchersTest.kt
@@ -8,72 +8,72 @@ import org.sheinbergon.needle.*
class PinnedDispatchersTest {
- @Test
- fun `Fixed affinity single threaded dispatcher`() {
- val dispatcher = fixedAffinitySingleThread(testAffinityDescriptor)
- val deferred = deferredAffinitySingleAsync(dispatcher)
- runBlocking { blockingAssertSingle(deferred, binaryTestMask, textTestMask) }
- }
+ @Test
+ fun `Fixed affinity single threaded dispatcher`() {
+ val dispatcher = fixedAffinitySingleThread(testAffinityDescriptor)
+ val deferred = deferredAffinitySingleAsync(dispatcher)
+ runBlocking { blockingAssertSingle(deferred, binaryTestMask, textTestMask) }
+ }
- @Test
- fun `Fixed affinity thread-pool dispatcher`() {
- val dispatcher = fixedAffinityThreadPool(availableCores, negatedTestAffinityDescriptor)
- val deferred = deferredAffinityPoolAsync(availableCores, dispatcher)
- runBlocking { blockingAssertPool(availableCores, deferred, negatedBinaryTestMask, negatedTextTestMask) }
- }
+ @Test
+ fun `Fixed affinity thread-pool dispatcher`() {
+ val dispatcher = fixedAffinityThreadPool(availableCores, negatedTestAffinityDescriptor)
+ val deferred = deferredAffinityPoolAsync(availableCores, dispatcher)
+ runBlocking { blockingAssertPool(availableCores, deferred, negatedBinaryTestMask, negatedTextTestMask) }
+ }
- @Test
- fun `Governed affinity single threaded dispatcher`() {
- val dispatcher = governedAffinitySingleThread(testAffinityDescriptor)
- val deferred1 = deferredAffinitySingleAsync(dispatcher)
- runBlocking { blockingAssertSingle(deferred1, binaryTestMask, textTestMask) }
- dispatcher.alter(negatedTestAffinityDescriptor)
- val deferred2 = deferredAffinitySingleAsync(dispatcher)
- runBlocking { blockingAssertSingle(deferred2, negatedBinaryTestMask, negatedTextTestMask) }
- }
+ @Test
+ fun `Governed affinity single threaded dispatcher`() {
+ val dispatcher = governedAffinitySingleThread(testAffinityDescriptor)
+ val deferred1 = deferredAffinitySingleAsync(dispatcher)
+ runBlocking { blockingAssertSingle(deferred1, binaryTestMask, textTestMask) }
+ dispatcher.alter(negatedTestAffinityDescriptor)
+ val deferred2 = deferredAffinitySingleAsync(dispatcher)
+ runBlocking { blockingAssertSingle(deferred2, negatedBinaryTestMask, negatedTextTestMask) }
+ }
- @Test
- fun `Governed affinity thread-pool dispatcher`() {
- val dispatcher = governedAffinityThreadPool(availableCores, negatedTestAffinityDescriptor)
- val deferred1 = deferredAffinityPoolAsync(availableCores, dispatcher)
- runBlocking { blockingAssertPool(availableCores, deferred1, negatedBinaryTestMask, negatedTextTestMask) }
- dispatcher.alter(testAffinityDescriptor)
- val deferred2 = deferredAffinityPoolAsync(availableCores, dispatcher)
- runBlocking { blockingAssertPool(availableCores, deferred2, binaryTestMask, textTestMask) }
- }
+ @Test
+ fun `Governed affinity thread-pool dispatcher`() {
+ val dispatcher = governedAffinityThreadPool(availableCores, negatedTestAffinityDescriptor)
+ val deferred1 = deferredAffinityPoolAsync(availableCores, dispatcher)
+ runBlocking { blockingAssertPool(availableCores, deferred1, negatedBinaryTestMask, negatedTextTestMask) }
+ dispatcher.alter(testAffinityDescriptor)
+ val deferred2 = deferredAffinityPoolAsync(availableCores, dispatcher)
+ runBlocking { blockingAssertPool(availableCores, deferred2, binaryTestMask, textTestMask) }
+ }
- private fun deferredAffinitySingleAsync(dispatcher: CoroutineDispatcher) =
- GlobalScope.async(dispatcher) { Needle.affinity() }
+ private fun deferredAffinitySingleAsync(dispatcher: CoroutineDispatcher) =
+ GlobalScope.async(dispatcher) { Needle.affinity() }
- private fun deferredAffinityPoolAsync(cores: Int, dispatcher: CoroutineDispatcher) = (`1`..cores)
- .map {
- GlobalScope.async(dispatcher) {
- Thread.currentThread() to Needle.affinity()
- }
+ private fun deferredAffinityPoolAsync(cores: Int, dispatcher: CoroutineDispatcher) = (`1`..cores)
+ .map {
+ GlobalScope.async(dispatcher) {
+ Thread.currentThread() to Needle.affinity()
}
+ }
- private suspend fun blockingAssertSingle(
- deferred: Deferred,
- binaryMask: Long,
- textMask: String
- ) {
- val affinity = deferred.await()
- affinity.mask() shouldBeEqualTo binaryMask
- affinity.toString() shouldBeEqualTo textMask
- }
+ private suspend fun blockingAssertSingle(
+ deferred: Deferred,
+ binaryMask: Long,
+ textMask: String
+ ) {
+ val affinity = deferred.await()
+ affinity.mask() shouldBeEqualTo binaryMask
+ affinity.toString() shouldBeEqualTo textMask
+ }
- private suspend fun blockingAssertPool(
- cores: Int,
- deferred: List>>,
- binaryMask: Long,
- textMask: String
- ) {
- val results = deferred.awaitAll()
- val threads = results.mapTo(mutableSetOf(), Pair::first)
- threads.size shouldBeLessOrEqualTo cores
- results.forEach { (_, affinity) ->
- affinity.mask() shouldBeEqualTo binaryMask
- affinity.toString() shouldBeEqualTo textMask
- }
+ private suspend fun blockingAssertPool(
+ cores: Int,
+ deferred: List>>,
+ binaryMask: Long,
+ textMask: String
+ ) {
+ val results = deferred.awaitAll()
+ val threads = results.mapTo(mutableSetOf(), Pair::first)
+ threads.size shouldBeLessOrEqualTo cores
+ results.forEach { (_, affinity) ->
+ affinity.mask() shouldBeEqualTo binaryMask
+ affinity.toString() shouldBeEqualTo textMask
}
+ }
}
diff --git a/settings.gradle b/settings.gradle
index 9b41a12..79bd991 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -3,4 +3,6 @@ findProject(':core')?.name = 'needle-core'
include 'knitter'
findProject(':knitter')?.name = 'needle-knitter'
include 'concurrent'
-findProject(':concurrent')?.name = 'needle-concurrent'
\ No newline at end of file
+findProject(':concurrent')?.name = 'needle-concurrent'
+include 'agent'
+findProject(':agent')?.name = 'needle-agent'
\ No newline at end of file