From bab30b2551beea8033d9f533a300fc88868d8783 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 3 Jun 2021 21:41:37 +1000 Subject: [PATCH 01/21] Rename EWYK The AdaptiveExecutionStrategy Rename EWYK The AdaptiveExecutionStrategy, which better represents the nature of the strategy. Signed-off-by: Greg Wilkins --- .../eclipse/jetty/http2/HTTP2Connection.java | 10 +- .../org/eclipse/jetty/io/ManagedSelector.java | 4 +- .../util/thread/strategy/EatWhatYouKill.java | 481 ------------------ .../jetty/util/thread/EatWhatYouKillTest.java | 144 ------ .../strategy/ExecuteProduceConsumeTest.java | 54 +- .../strategy/ExecutionStrategyTest.java | 2 +- .../thread/strategy/jmh/EWYKBenchmark.java | 194 ------- 7 files changed, 32 insertions(+), 857 deletions(-) delete mode 100644 jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java delete mode 100644 jetty-util/src/test/java/org/eclipse/jetty/util/thread/EatWhatYouKillTest.java delete mode 100644 tests/jetty-jmh/src/main/java/org/eclipse/jetty/util/thread/strategy/jmh/EWYKBenchmark.java diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java index 5b9a7b691fb5..02dabdb702a9 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java @@ -33,8 +33,7 @@ import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.thread.AutoLock; import org.eclipse.jetty.util.thread.ExecutionStrategy; -import org.eclipse.jetty.util.thread.TryExecutor; -import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill; +import org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,9 +41,6 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher. { protected static final Logger LOG = LoggerFactory.getLogger(HTTP2Connection.class); - // TODO remove this once we are sure EWYK is OK for http2 - private static final boolean PEC_MODE = Boolean.getBoolean("org.eclipse.jetty.http2.PEC_MODE"); - private final AutoLock lock = new AutoLock(); private final Queue tasks = new ArrayDeque<>(); private final HTTP2Producer producer = new HTTP2Producer(); @@ -64,9 +60,7 @@ public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoin this.parser = parser; this.session = session; this.bufferSize = bufferSize; - if (PEC_MODE) - executor = new TryExecutor.NoTryExecutor(executor); - this.strategy = new EatWhatYouKill(producer, executor); + this.strategy = new AdaptiveExecutionStrategy(producer, executor); LifeCycle.start(strategy); parser.init(ParserListener::new); } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index 631335a353c3..9ec7fad40c84 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -49,7 +49,7 @@ import org.eclipse.jetty.util.thread.AutoLock; import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.Scheduler; -import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill; +import org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,7 +95,7 @@ public ManagedSelector(SelectorManager selectorManager, int id) _id = id; SelectorProducer producer = new SelectorProducer(); Executor executor = selectorManager.getExecutor(); - _strategy = new EatWhatYouKill(producer, executor); + _strategy = new AdaptiveExecutionStrategy(producer, executor); addBean(_strategy, true); } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java deleted file mode 100644 index 7fd11bd9f5e3..000000000000 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java +++ /dev/null @@ -1,481 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License v. 2.0 which is available at -// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// ======================================================================== -// - -package org.eclipse.jetty.util.thread.strategy; - -import java.io.Closeable; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; -import java.util.concurrent.Executor; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.atomic.LongAdder; - -import org.eclipse.jetty.util.annotation.ManagedAttribute; -import org.eclipse.jetty.util.annotation.ManagedObject; -import org.eclipse.jetty.util.annotation.ManagedOperation; -import org.eclipse.jetty.util.component.ContainerLifeCycle; -import org.eclipse.jetty.util.thread.AutoLock; -import org.eclipse.jetty.util.thread.ExecutionStrategy; -import org.eclipse.jetty.util.thread.Invocable; -import org.eclipse.jetty.util.thread.TryExecutor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - *

A strategy where the thread that produces will run the resulting task if it - * is possible to do so without thread starvation.

- * - *

This strategy preemptively dispatches a thread as a pending producer, so that - * when a thread produces a task it can immediately run the task and let the pending - * producer thread take over production. When operating in this way, the sub-strategy - * is called Execute Produce Consume (EPC).

- *

However, if the task produced uses the {@link Invocable} API to indicate that - * it will not block, then the strategy will run it directly, regardless of the - * presence of a pending producer thread and then resume production after the - * task has completed. When operating in this pattern, the sub-strategy is called - * ProduceConsume (PC).

- *

If there is no pending producer thread available and if the task has not - * indicated it is non-blocking, then this strategy will dispatch the execution of - * the task and immediately continue production. When operating in this pattern, the - * sub-strategy is called ProduceExecuteConsume (PEC).

- */ -@ManagedObject("eat what you kill execution strategy") -public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrategy, Runnable -{ - private static final Logger LOG = LoggerFactory.getLogger(EatWhatYouKill.class); - - private enum State - { - IDLE, PRODUCING, REPRODUCING - } - - /* The modes this strategy can work in */ - private enum Mode - { - PRODUCE_CONSUME, - PRODUCE_INVOKE_CONSUME, // This is PRODUCE_CONSUME an EITHER task with NON_BLOCKING invocation - PRODUCE_EXECUTE_CONSUME, - EXECUTE_PRODUCE_CONSUME // Eat What You Kill! - } - - private final AutoLock _lock = new AutoLock(); - private final LongAdder _pcMode = new LongAdder(); - private final LongAdder _picMode = new LongAdder(); - private final LongAdder _pecMode = new LongAdder(); - private final LongAdder _epcMode = new LongAdder(); - private final Producer _producer; - private final Executor _executor; - private final TryExecutor _tryExecutor; - private State _state = State.IDLE; - private boolean _pending; - - public EatWhatYouKill(Producer producer, Executor executor) - { - _producer = producer; - _executor = executor; - _tryExecutor = TryExecutor.asTryExecutor(executor); - addBean(_producer); - addBean(_tryExecutor); - if (LOG.isDebugEnabled()) - LOG.debug("{} created", this); - } - - @Override - public void dispatch() - { - boolean execute = false; - try (AutoLock l = _lock.lock()) - { - switch (_state) - { - case IDLE: - if (!_pending) - { - _pending = true; - execute = true; - } - break; - - case PRODUCING: - _state = State.REPRODUCING; - break; - - default: - break; - } - } - if (LOG.isDebugEnabled()) - LOG.debug("{} dispatch {}", this, execute); - if (execute) - _executor.execute(this); - } - - @Override - public void run() - { - tryProduce(true); - } - - @Override - public void produce() - { - tryProduce(false); - } - - private void tryProduce(boolean wasPending) - { - if (LOG.isDebugEnabled()) - LOG.debug("{} tryProduce {}", this, wasPending); - - try (AutoLock l = _lock.lock()) - { - if (wasPending) - _pending = false; - - switch (_state) - { - case IDLE: - // Enter PRODUCING - _state = State.PRODUCING; - break; - - case PRODUCING: - // Keep other Thread producing - _state = State.REPRODUCING; - return; - - default: - return; - } - } - - boolean nonBlocking = Invocable.isNonBlockingInvocation(); - - while (isRunning()) - { - try - { - if (doProduce(nonBlocking)) - continue; - return; - } - catch (Throwable th) - { - LOG.warn("Unable to produce", th); - } - } - } - - private boolean doProduce(boolean nonBlocking) - { - Runnable task = produceTask(); - - if (task == null) - { - try (AutoLock l = _lock.lock()) - { - // Could another task just have been queued with a produce call? - switch (_state) - { - case PRODUCING: - _state = State.IDLE; - return false; - - case REPRODUCING: - _state = State.PRODUCING; - return true; - - default: - throw new IllegalStateException(toStringLocked()); - } - } - } - - Mode mode; - if (nonBlocking) - { - // The calling thread cannot block, so we only have a choice between PC and PEC modes, - // based on the invocation type of the task - switch (Invocable.getInvocationType(task)) - { - case NON_BLOCKING: - mode = Mode.PRODUCE_CONSUME; - break; - - case EITHER: - mode = Mode.PRODUCE_INVOKE_CONSUME; - break; - - default: - mode = Mode.PRODUCE_EXECUTE_CONSUME; - break; - } - } - else - { - // The calling thread can block, so we can choose between PC, PEC and EPC modes, - // based on the invocation type of the task and if a reserved thread is available - switch (Invocable.getInvocationType(task)) - { - case NON_BLOCKING: - mode = Mode.PRODUCE_CONSUME; - break; - - case BLOCKING: - // The task is blocking, so PC is not an option. Thus we choose - // between EPC and PEC based on the availability of a reserved thread. - try (AutoLock l = _lock.lock()) - { - if (_pending) - { - _state = State.IDLE; - mode = Mode.EXECUTE_PRODUCE_CONSUME; - } - else if (_tryExecutor.tryExecute(this)) - { - _pending = true; - _state = State.IDLE; - mode = Mode.EXECUTE_PRODUCE_CONSUME; - } - else - { - mode = Mode.PRODUCE_EXECUTE_CONSUME; - } - } - break; - - case EITHER: - // The task may be non blocking, so PC is an option. Thus we choose - // between EPC and PC based on the availability of a reserved thread. - try (AutoLock l = _lock.lock()) - { - if (_pending) - { - _state = State.IDLE; - mode = Mode.EXECUTE_PRODUCE_CONSUME; - } - else if (_tryExecutor.tryExecute(this)) - { - _pending = true; - _state = State.IDLE; - mode = Mode.EXECUTE_PRODUCE_CONSUME; - } - else - { - // PC mode, but we must consume with non-blocking invocation - // as we may be the last thread and we cannot block - mode = Mode.PRODUCE_INVOKE_CONSUME; - } - } - break; - - default: - throw new IllegalStateException(toString()); - } - } - - if (LOG.isDebugEnabled()) - LOG.debug("{} m={} t={}/{}", this, mode, task, Invocable.getInvocationType(task)); - - // Consume or execute task - switch (mode) - { - case PRODUCE_CONSUME: - _pcMode.increment(); - runTask(task); - return true; - - case PRODUCE_INVOKE_CONSUME: - _picMode.increment(); - invokeTask(task); - return true; - - case PRODUCE_EXECUTE_CONSUME: - _pecMode.increment(); - execute(task); - return true; - - case EXECUTE_PRODUCE_CONSUME: - _epcMode.increment(); - runTask(task); - - // Try to produce again? - try (AutoLock l = _lock.lock()) - { - if (_state == State.IDLE) - { - // We beat the pending producer, so we will become the producer instead - _state = State.PRODUCING; - return true; - } - } - return false; - - default: - throw new IllegalStateException(toString()); - } - } - - private void runTask(Runnable task) - { - try - { - task.run(); - } - catch (Throwable x) - { - LOG.warn("Task run failed", x); - } - } - - private void invokeTask(Runnable task) - { - try - { - Invocable.invokeNonBlocking(task); - } - catch (Throwable x) - { - LOG.warn("Task invoke failed", x); - } - } - - private Runnable produceTask() - { - try - { - return _producer.produce(); - } - catch (Throwable e) - { - LOG.warn("Task produce failed", e); - return null; - } - } - - private void execute(Runnable task) - { - try - { - _executor.execute(task); - } - catch (RejectedExecutionException e) - { - if (isRunning()) - LOG.warn("Execute failed", e); - else - LOG.trace("IGNORED", e); - - if (task instanceof Closeable) - { - try - { - ((Closeable)task).close(); - } - catch (Throwable e2) - { - LOG.trace("IGNORED", e2); - } - } - } - } - - @ManagedAttribute(value = "number of tasks consumed with PC mode", readonly = true) - public long getPCTasksConsumed() - { - return _pcMode.longValue(); - } - - @ManagedAttribute(value = "number of tasks executed with PIC mode", readonly = true) - public long getPICTasksExecuted() - { - return _picMode.longValue(); - } - - @ManagedAttribute(value = "number of tasks executed with PEC mode", readonly = true) - public long getPECTasksExecuted() - { - return _pecMode.longValue(); - } - - @ManagedAttribute(value = "number of tasks consumed with EPC mode", readonly = true) - public long getEPCTasksConsumed() - { - return _epcMode.longValue(); - } - - @ManagedAttribute(value = "whether this execution strategy is idle", readonly = true) - public boolean isIdle() - { - try (AutoLock l = _lock.lock()) - { - return _state == State.IDLE; - } - } - - @ManagedOperation(value = "resets the task counts", impact = "ACTION") - public void reset() - { - _pcMode.reset(); - _epcMode.reset(); - _pecMode.reset(); - _picMode.reset(); - } - - @Override - public String toString() - { - try (AutoLock l = _lock.lock()) - { - return toStringLocked(); - } - } - - public String toStringLocked() - { - StringBuilder builder = new StringBuilder(); - getString(builder); - getState(builder); - return builder.toString(); - } - - private void getString(StringBuilder builder) - { - builder.append(getClass().getSimpleName()); - builder.append('@'); - builder.append(Integer.toHexString(hashCode())); - builder.append('/'); - builder.append(_producer); - builder.append('/'); - } - - private void getState(StringBuilder builder) - { - builder.append(_state); - builder.append("/p="); - builder.append(_pending); - builder.append('/'); - builder.append(_tryExecutor); - builder.append("[pc="); - builder.append(getPCTasksConsumed()); - builder.append(",pic="); - builder.append(getPICTasksExecuted()); - builder.append(",pec="); - builder.append(getPECTasksExecuted()); - builder.append(",epc="); - builder.append(getEPCTasksConsumed()); - builder.append("]"); - builder.append("@"); - builder.append(DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); - } -} diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/EatWhatYouKillTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/EatWhatYouKillTest.java deleted file mode 100644 index 9d4f12094b5a..000000000000 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/EatWhatYouKillTest.java +++ /dev/null @@ -1,144 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License v. 2.0 which is available at -// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// ======================================================================== -// - -package org.eclipse.jetty.util.thread; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import org.eclipse.jetty.logging.StacklessLogging; -import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertNull; - -public class EatWhatYouKillTest -{ - private EatWhatYouKill ewyk; - - private void startEWYK(ExecutionStrategy.Producer producer) throws Exception - { - QueuedThreadPool executor = new QueuedThreadPool(); - ewyk = new EatWhatYouKill(producer, executor); - ewyk.start(); - ReservedThreadExecutor tryExecutor = executor.getBean(ReservedThreadExecutor.class); - // Prime the executor so that there is a reserved thread. - executor.tryExecute(() -> - { - }); - while (tryExecutor.getAvailable() == 0) - { - Thread.sleep(10); - } - } - - @AfterEach - public void dispose() throws Exception - { - if (ewyk != null) - ewyk.stop(); - } - - @Test - public void testExceptionThrownByTask() throws Exception - { - try (StacklessLogging ignored = new StacklessLogging(EatWhatYouKill.class)) - { - AtomicReference detector = new AtomicReference<>(); - CountDownLatch latch = new CountDownLatch(2); - BlockingQueue tasks = new LinkedBlockingQueue<>(); - startEWYK(() -> - { - boolean proceed = detector.compareAndSet(null, new Throwable()); - if (proceed) - { - try - { - latch.countDown(); - return tasks.poll(1, TimeUnit.SECONDS); - } - catch (InterruptedException x) - { - x.printStackTrace(); - return null; - } - finally - { - detector.set(null); - } - } - else - { - return null; - } - }); - - // Start production in another thread. - ewyk.dispatch(); - - tasks.offer(new Task(() -> - { - try - { - // While thread1 runs this task, simulate - // that thread2 starts producing. - ewyk.dispatch(); - // Wait for thread2 to block in produce(). - latch.await(); - // Throw to verify that exceptions are handled correctly. - throw new RuntimeException(); - } - catch (InterruptedException x) - { - x.printStackTrace(); - } - }, Invocable.InvocationType.BLOCKING)); - - // Wait until EWYK is idle. - while (!ewyk.isIdle()) - { - Thread.sleep(10); - } - - assertNull(detector.get()); - } - } - - private static class Task implements Runnable, Invocable - { - private final Runnable task; - private final InvocationType invocationType; - - private Task(Runnable task, InvocationType invocationType) - { - this.task = task; - this.invocationType = invocationType; - } - - @Override - public void run() - { - task.run(); - } - - @Override - public InvocationType getInvocationType() - { - return invocationType; - } - } -} diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsumeTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsumeTest.java index 304e1eb59c43..d832e66445ca 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsumeTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsumeTest.java @@ -36,7 +36,7 @@ public class ExecuteProduceConsumeTest private final BlockingQueue _produce = new LinkedBlockingQueue<>(); private final Queue _executions = new LinkedBlockingQueue<>(); - private ExecuteProduceConsume _ewyk; + private ExecuteProduceConsume _exStrategy; private volatile Thread _producer; @BeforeEach @@ -67,7 +67,7 @@ public void before() Executor executor = _executions::add; - _ewyk = new ExecuteProduceConsume(producer, executor); + _exStrategy = new ExecuteProduceConsume(producer, executor); } @AfterEach @@ -82,7 +82,7 @@ public void after() public void testIdle() { _produce.add(NULLTASK); - _ewyk.produce(); + _exStrategy.produce(); } @Test @@ -91,9 +91,9 @@ public void testProduceOneNonBlockingTask() Task t0 = new Task(); _produce.add(t0); _produce.add(NULLTASK); - _ewyk.produce(); + _exStrategy.produce(); assertThat(t0.hasRun(), Matchers.equalTo(true)); - assertEquals(_ewyk, _executions.poll()); + assertEquals(_exStrategy, _executions.poll()); } @Test @@ -106,13 +106,13 @@ public void testProduceManyNonBlockingTask() _produce.add(tasks[i]); } _produce.add(NULLTASK); - _ewyk.produce(); + _exStrategy.produce(); for (Task task : tasks) { assertThat(task.hasRun(), Matchers.equalTo(true)); } - assertEquals(_ewyk, _executions.poll()); + assertEquals(_exStrategy, _executions.poll()); } @Test @@ -126,7 +126,7 @@ public void run() { _produce.add(t0); _produce.add(NULLTASK); - _ewyk.produce(); + _exStrategy.produce(); } }; thread.start(); @@ -136,10 +136,10 @@ public void run() assertEquals(thread, t0.getThread()); // Should have dispatched only one helper - assertEquals(_ewyk, _executions.poll()); + assertEquals(_exStrategy, _executions.poll()); // which is make us idle - _ewyk.run(); - assertThat(_ewyk.isIdle(), Matchers.equalTo(true)); + _exStrategy.run(); + assertThat(_exStrategy.isIdle(), Matchers.equalTo(true)); // unblock task t0.unblock(); @@ -158,7 +158,7 @@ public void run() { _produce.add(t0); _produce.add(NULLTASK); - _ewyk.produce(); + _exStrategy.produce(); } }; thread.start(); @@ -167,16 +167,16 @@ public void run() t0.awaitRun(); // Should have dispatched only one helper - assertEquals(_ewyk, _executions.poll()); + assertEquals(_exStrategy, _executions.poll()); // unblock task t0.unblock(); // will run to completion because are become idle thread.join(); - assertThat(_ewyk.isIdle(), Matchers.equalTo(true)); + assertThat(_exStrategy.isIdle(), Matchers.equalTo(true)); // because we are idle, dispatched thread is noop - _ewyk.run(); + _exStrategy.run(); } @Test @@ -189,7 +189,7 @@ public void testBlockedInProduce() throws Exception public void run() { _produce.add(t0); - _ewyk.produce(); + _exStrategy.produce(); } }; thread0.start(); @@ -199,10 +199,10 @@ public void run() assertEquals(thread0, t0.getThread()); // Should have dispatched another helper - assertEquals(_ewyk, _executions.poll()); + assertEquals(_exStrategy, _executions.poll()); // dispatched thread will block in produce - Thread thread1 = new Thread(_ewyk); + Thread thread1 = new Thread(_exStrategy); thread1.start(); // Spin @@ -215,10 +215,10 @@ public void run() assertEquals(thread1, _producer); // because we are producing, any other dispatched threads are noops - _ewyk.run(); + _exStrategy.run(); // ditto with execute - _ewyk.produce(); + _exStrategy.produce(); // Now if unblock the production by the dispatched thread final Task t1 = new Task(true); @@ -229,7 +229,7 @@ public void run() assertEquals(thread1, t1.getThread()); // and another thread will have been requested - assertEquals(_ewyk, _executions.poll()); + assertEquals(_exStrategy, _executions.poll()); // If we unblock t1, it will overtake t0 and try to produce again! t1.unblock(); @@ -246,7 +246,7 @@ public void run() thread0.join(); // If the requested extra thread turns up, it is also noop because we are producing - _ewyk.run(); + _exStrategy.run(); // Give the idle job _produce.add(NULLTASK); @@ -266,7 +266,7 @@ public void testExecuteWhileIdling() throws Exception public void run() { _produce.add(t0); - _ewyk.produce(); + _exStrategy.produce(); } }; thread0.start(); @@ -276,13 +276,13 @@ public void run() assertEquals(thread0, t0.getThread()); // Should have dispatched another helper - assertEquals(_ewyk, _executions.poll()); + assertEquals(_exStrategy, _executions.poll()); // We will go idle when we next produce _produce.add(NULLTASK); // execute will return immediately because it did not yet see the idle. - _ewyk.produce(); + _exStrategy.produce(); // When we unblock t0, thread1 will see the idle, t0.unblock(); @@ -298,8 +298,8 @@ public void run() // When the dispatched thread turns up, it will see the second idle _produce.add(NULLTASK); - _ewyk.run(); - assertThat(_ewyk.isIdle(), Matchers.equalTo(true)); + _exStrategy.run(); + assertThat(_exStrategy.isIdle(), Matchers.equalTo(true)); // So that when t1 completes it does not produce again. t1.unblock(); diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecutionStrategyTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecutionStrategyTest.java index 2c2800840c9f..52d7e14d64be 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecutionStrategyTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecutionStrategyTest.java @@ -44,7 +44,7 @@ public static Stream strategies() return Stream.of( ProduceExecuteConsume.class, ExecuteProduceConsume.class, - EatWhatYouKill.class + AdaptiveExecutionStrategy.class ).map(Arguments::of); } diff --git a/tests/jetty-jmh/src/main/java/org/eclipse/jetty/util/thread/strategy/jmh/EWYKBenchmark.java b/tests/jetty-jmh/src/main/java/org/eclipse/jetty/util/thread/strategy/jmh/EWYKBenchmark.java deleted file mode 100644 index 8c9bc153d5f8..000000000000 --- a/tests/jetty-jmh/src/main/java/org/eclipse/jetty/util/thread/strategy/jmh/EWYKBenchmark.java +++ /dev/null @@ -1,194 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License v. 2.0 which is available at -// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// ======================================================================== -// - -package org.eclipse.jetty.util.thread.strategy.jmh; - -import java.io.File; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; - -import org.eclipse.jetty.util.IO; -import org.eclipse.jetty.util.component.LifeCycle; -import org.eclipse.jetty.util.thread.ExecutionStrategy; -import org.eclipse.jetty.util.thread.Invocable; -import org.eclipse.jetty.util.thread.ReservedThreadExecutor; -import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill; -import org.eclipse.jetty.util.thread.strategy.ProduceConsume; -import org.eclipse.jetty.util.thread.strategy.ProduceExecuteConsume; -import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.BenchmarkMode; -import org.openjdk.jmh.annotations.Level; -import org.openjdk.jmh.annotations.Mode; -import org.openjdk.jmh.annotations.Param; -import org.openjdk.jmh.annotations.Scope; -import org.openjdk.jmh.annotations.Setup; -import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.annotations.TearDown; -import org.openjdk.jmh.runner.Runner; -import org.openjdk.jmh.runner.RunnerException; -import org.openjdk.jmh.runner.options.Options; -import org.openjdk.jmh.runner.options.OptionsBuilder; -import org.openjdk.jmh.runner.options.TimeValue; - -@State(Scope.Benchmark) -public class EWYKBenchmark -{ - static TestServer server; - static ReservedThreadExecutor reserved; - static Path directory; - - @Param({"PC", "PEC", "EWYK"}) - public static String strategyName; - - @Param({"true", "false"}) - public static boolean sleeping; - - @Param({"true", "false"}) - public static boolean nonBlocking; - - @Setup(Level.Trial) - public static void setupServer() throws Exception - { - // Make a test directory - directory = Files.createTempDirectory("ewyk"); - - // Make some test files - for (int i = 0; i < 75; i++) - { - File.createTempFile("ewyk_benchmark", i + ".txt", directory.toFile()); - } - - server = new TestServer(directory.toFile()); - server.start(); - reserved = new ReservedThreadExecutor(server, 20); - reserved.start(); - } - - @TearDown(Level.Trial) - public static void stopServer() throws Exception - { - try - { - IO.delete(directory.toFile()); - } - catch (Exception e) - { - System.out.println("cannot delete directory:" + directory); - } - reserved.stop(); - server.stop(); - } - - @State(Scope.Thread) - public static class ThreadState implements Runnable - { - final TestConnection connection = new TestConnection(server, sleeping); - final ExecutionStrategy strategy; - - { - switch (strategyName) - { - case "PC": - strategy = new ProduceConsume(connection, server); - break; - - case "PEC": - strategy = new ProduceExecuteConsume(connection, server); - break; - - case "EWYK": - strategy = new EatWhatYouKill(connection, server); - break; - - default: - throw new IllegalStateException(); - } - - LifeCycle.start(strategy); - } - - @Override - public void run() - { - strategy.produce(); - } - } - - @Benchmark - @BenchmarkMode({Mode.Throughput}) - public long testStrategy(ThreadState state) throws Exception - { - int r; - switch (server.getRandom(8)) - { - case 0: - r = 4; - break; - case 1: - case 2: - r = 2; - break; - default: - r = 1; - break; - } - - List> results = new ArrayList<>(r); - for (int i = 0; i < r; i++) - { - CompletableFuture result = new CompletableFuture(); - results.add(result); - state.connection.submit(result); - } - - if (nonBlocking) - Invocable.invokeNonBlocking(state); - else - state.run(); - - long hash = 0; - for (CompletableFuture result : results) - { - hash ^= result.get().hashCode(); - } - - return hash; - } - - public static void main(String[] args) throws RunnerException - { - Options opt = new OptionsBuilder() - .include(EWYKBenchmark.class.getSimpleName()) - .warmupIterations(2) - .measurementIterations(3) - .forks(1) - .threads(400) - // .syncIterations(true) // Don't start all threads at same time - .warmupTime(new TimeValue(10000, TimeUnit.MILLISECONDS)) - .measurementTime(new TimeValue(10000, TimeUnit.MILLISECONDS)) - // .addProfiler(CompilerProfiler.class) - // .addProfiler(LinuxPerfProfiler.class) - // .addProfiler(LinuxPerfNormProfiler.class) - // .addProfiler(LinuxPerfAsmProfiler.class) - // .resultFormat(ResultFormatType.CSV) - .build(); - - new Runner(opt).run(); - } -} - - From f479c4de821336aa0e3d171287a7286513a25bbc Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 3 Jun 2021 21:47:26 +1000 Subject: [PATCH 02/21] Rename EWYK The AdaptiveExecutionStrategy Rename EWYK The AdaptiveExecutionStrategy, which better represents the nature of the strategy. Signed-off-by: Greg Wilkins --- .../strategy/AdaptiveExecutionStrategy.java | 498 ++++++++++++++++++ .../thread/AdaptiveExecutionStrategyTest.java | 144 +++++ .../AdaptiveExecutionStrategyBenchmark.java | 194 +++++++ 3 files changed, 836 insertions(+) create mode 100644 jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java create mode 100644 jetty-util/src/test/java/org/eclipse/jetty/util/thread/AdaptiveExecutionStrategyTest.java create mode 100644 tests/jetty-jmh/src/main/java/org/eclipse/jetty/util/thread/strategy/jmh/AdaptiveExecutionStrategyBenchmark.java diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java new file mode 100644 index 000000000000..b0987c32c0a4 --- /dev/null +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java @@ -0,0 +1,498 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.util.thread.strategy; + +import java.io.Closeable; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.LongAdder; + +import org.eclipse.jetty.util.annotation.ManagedAttribute; +import org.eclipse.jetty.util.annotation.ManagedObject; +import org.eclipse.jetty.util.annotation.ManagedOperation; +import org.eclipse.jetty.util.component.ContainerLifeCycle; +import org.eclipse.jetty.util.thread.AutoLock; +import org.eclipse.jetty.util.thread.ExecutionStrategy; +import org.eclipse.jetty.util.thread.Invocable; +import org.eclipse.jetty.util.thread.TryExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + *

An adaptive execution strategy that uses the {@link Invocable} status + * of both the task and the current execution to select an optimal strategy + * that prioritizes executing the task immediately in the current thread + * if can be done so without thread starvation issues.

+ * + *

If a produced task has used the {@link Invocable} API to indicate that + * it is {@link Invocable.InvocationType#NON_BLOCKING}, + * then the task will always be run directly and production resumed afterwards. + * When operating in this pattern, the sub-strategy is called ProduceConsume (PC).

+ * + *

If the producing thread has indicated that it is itself + * {@link Invocable.InvocationType#NON_BLOCKING}, then produced tasks that + * are {@link Invocable.InvocationType#EITHER}, are invoked directly as + * {@link Invocable.InvocationType#NON_BLOCKING} and then production resumed. + * This sub-strategy is called ProduceInvokeConsume (PIC).

+ * + *

For all other tasks produced by a {@link Invocable.InvocationType#NON_BLOCKING} + * thread, the task is dispatched for execution by a thread pool and the current + * producing thread immediately continues production. + * This sub-strategy is called ProduceExecuteConsume (PEC).

+ * + *

If the producing thread may block, then for produced task that also may block, + * the strategy may attempts to dispatch another producing thread via a {@link TryExecutor}. + * If either there is already a producer thread pending or the call + * to {@link TryExecutor#tryExecute(Runnable)} succeeds, then the task + * is executed directly and the pending producer will take over production. + * This sub-strategy is called Execute Produce Consume (EPC), but was previously + * named EatWhatYouKill (EWYK) after a hunting proverb, in the sense that one + * should kill(produce) only to eat(consume).

+ * + *

If there is no pending producer thread available and the produce task + * is {@link Invocable.InvocationType#EITHER}, then the PIC sub-strategy is + * used. Otherwise the PEC sub-strategy is used.

+ */ +@ManagedObject("Adaptive execution strategy") +public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements ExecutionStrategy, Runnable +{ + private static final Logger LOG = LoggerFactory.getLogger(AdaptiveExecutionStrategy.class); + + private enum State + { + IDLE, PRODUCING, REPRODUCING + } + + /* The modes this strategy can work in */ + private enum Mode + { + PRODUCE_CONSUME, + PRODUCE_INVOKE_CONSUME, // This is PRODUCE_CONSUME an EITHER task with NON_BLOCKING invocation + PRODUCE_EXECUTE_CONSUME, + EXECUTE_PRODUCE_CONSUME // Eat What You Kill! + } + + private final AutoLock _lock = new AutoLock(); + private final LongAdder _pcMode = new LongAdder(); + private final LongAdder _picMode = new LongAdder(); + private final LongAdder _pecMode = new LongAdder(); + private final LongAdder _epcMode = new LongAdder(); + private final Producer _producer; + private final Executor _executor; + private final TryExecutor _tryExecutor; + private State _state = State.IDLE; + private boolean _pending; + + public AdaptiveExecutionStrategy(Producer producer, Executor executor) + { + _producer = producer; + _executor = executor; + _tryExecutor = TryExecutor.asTryExecutor(executor); + addBean(_producer); + addBean(_tryExecutor); + if (LOG.isDebugEnabled()) + LOG.debug("{} created", this); + } + + @Override + public void dispatch() + { + boolean execute = false; + try (AutoLock l = _lock.lock()) + { + switch (_state) + { + case IDLE: + if (!_pending) + { + _pending = true; + execute = true; + } + break; + + case PRODUCING: + _state = State.REPRODUCING; + break; + + default: + break; + } + } + if (LOG.isDebugEnabled()) + LOG.debug("{} dispatch {}", this, execute); + if (execute) + _executor.execute(this); + } + + @Override + public void run() + { + tryProduce(true); + } + + @Override + public void produce() + { + tryProduce(false); + } + + private void tryProduce(boolean wasPending) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} tryProduce {}", this, wasPending); + + try (AutoLock l = _lock.lock()) + { + if (wasPending) + _pending = false; + + switch (_state) + { + case IDLE: + // Enter PRODUCING + _state = State.PRODUCING; + break; + + case PRODUCING: + // Keep other Thread producing + _state = State.REPRODUCING; + return; + + default: + return; + } + } + + boolean nonBlocking = Invocable.isNonBlockingInvocation(); + + while (isRunning()) + { + try + { + if (doProduce(nonBlocking)) + continue; + return; + } + catch (Throwable th) + { + LOG.warn("Unable to produce", th); + } + } + } + + private boolean doProduce(boolean nonBlocking) + { + Runnable task = produceTask(); + + if (task == null) + { + try (AutoLock l = _lock.lock()) + { + // Could another task just have been queued with a produce call? + switch (_state) + { + case PRODUCING: + _state = State.IDLE; + return false; + + case REPRODUCING: + _state = State.PRODUCING; + return true; + + default: + throw new IllegalStateException(toStringLocked()); + } + } + } + + Mode mode; + if (nonBlocking) + { + // The calling thread cannot block, so we only have a choice between PC and PEC modes, + // based on the invocation type of the task + switch (Invocable.getInvocationType(task)) + { + case NON_BLOCKING: + mode = Mode.PRODUCE_CONSUME; + break; + + case EITHER: + mode = Mode.PRODUCE_INVOKE_CONSUME; + break; + + default: + mode = Mode.PRODUCE_EXECUTE_CONSUME; + break; + } + } + else + { + // The calling thread can block, so we can choose between PC, PEC and EPC modes, + // based on the invocation type of the task and if a reserved thread is available + switch (Invocable.getInvocationType(task)) + { + case NON_BLOCKING: + mode = Mode.PRODUCE_CONSUME; + break; + + case BLOCKING: + // The task is blocking, so PC is not an option. Thus we choose + // between EPC and PEC based on the availability of a reserved thread. + try (AutoLock l = _lock.lock()) + { + if (_pending) + { + _state = State.IDLE; + mode = Mode.EXECUTE_PRODUCE_CONSUME; + } + else if (_tryExecutor.tryExecute(this)) + { + _pending = true; + _state = State.IDLE; + mode = Mode.EXECUTE_PRODUCE_CONSUME; + } + else + { + mode = Mode.PRODUCE_EXECUTE_CONSUME; + } + } + break; + + case EITHER: + // The task may be non blocking, so PC is an option. Thus we choose + // between EPC and PC based on the availability of a reserved thread. + try (AutoLock l = _lock.lock()) + { + if (_pending) + { + _state = State.IDLE; + mode = Mode.EXECUTE_PRODUCE_CONSUME; + } + else if (_tryExecutor.tryExecute(this)) + { + _pending = true; + _state = State.IDLE; + mode = Mode.EXECUTE_PRODUCE_CONSUME; + } + else + { + // PC mode, but we must consume with non-blocking invocation + // as we may be the last thread and we cannot block + mode = Mode.PRODUCE_INVOKE_CONSUME; + } + } + break; + + default: + throw new IllegalStateException(toString()); + } + } + + if (LOG.isDebugEnabled()) + LOG.debug("{} m={} t={}/{}", this, mode, task, Invocable.getInvocationType(task)); + + // Consume or execute task + switch (mode) + { + case PRODUCE_CONSUME: + _pcMode.increment(); + runTask(task); + return true; + + case PRODUCE_INVOKE_CONSUME: + _picMode.increment(); + invokeTask(task); + return true; + + case PRODUCE_EXECUTE_CONSUME: + _pecMode.increment(); + execute(task); + return true; + + case EXECUTE_PRODUCE_CONSUME: + _epcMode.increment(); + runTask(task); + + // Try to produce again? + try (AutoLock l = _lock.lock()) + { + if (_state == State.IDLE) + { + // We beat the pending producer, so we will become the producer instead + _state = State.PRODUCING; + return true; + } + } + return false; + + default: + throw new IllegalStateException(toString()); + } + } + + private void runTask(Runnable task) + { + try + { + task.run(); + } + catch (Throwable x) + { + LOG.warn("Task run failed", x); + } + } + + private void invokeTask(Runnable task) + { + try + { + Invocable.invokeNonBlocking(task); + } + catch (Throwable x) + { + LOG.warn("Task invoke failed", x); + } + } + + private Runnable produceTask() + { + try + { + return _producer.produce(); + } + catch (Throwable e) + { + LOG.warn("Task produce failed", e); + return null; + } + } + + private void execute(Runnable task) + { + try + { + _executor.execute(task); + } + catch (RejectedExecutionException e) + { + if (isRunning()) + LOG.warn("Execute failed", e); + else + LOG.trace("IGNORED", e); + + if (task instanceof Closeable) + { + try + { + ((Closeable)task).close(); + } + catch (Throwable e2) + { + LOG.trace("IGNORED", e2); + } + } + } + } + + @ManagedAttribute(value = "number of tasks consumed with PC mode", readonly = true) + public long getPCTasksConsumed() + { + return _pcMode.longValue(); + } + + @ManagedAttribute(value = "number of tasks executed with PIC mode", readonly = true) + public long getPICTasksExecuted() + { + return _picMode.longValue(); + } + + @ManagedAttribute(value = "number of tasks executed with PEC mode", readonly = true) + public long getPECTasksExecuted() + { + return _pecMode.longValue(); + } + + @ManagedAttribute(value = "number of tasks consumed with EPC mode", readonly = true) + public long getEPCTasksConsumed() + { + return _epcMode.longValue(); + } + + @ManagedAttribute(value = "whether this execution strategy is idle", readonly = true) + public boolean isIdle() + { + try (AutoLock l = _lock.lock()) + { + return _state == State.IDLE; + } + } + + @ManagedOperation(value = "resets the task counts", impact = "ACTION") + public void reset() + { + _pcMode.reset(); + _epcMode.reset(); + _pecMode.reset(); + _picMode.reset(); + } + + @Override + public String toString() + { + try (AutoLock l = _lock.lock()) + { + return toStringLocked(); + } + } + + public String toStringLocked() + { + StringBuilder builder = new StringBuilder(); + getString(builder); + getState(builder); + return builder.toString(); + } + + private void getString(StringBuilder builder) + { + builder.append(getClass().getSimpleName()); + builder.append('@'); + builder.append(Integer.toHexString(hashCode())); + builder.append('/'); + builder.append(_producer); + builder.append('/'); + } + + private void getState(StringBuilder builder) + { + builder.append(_state); + builder.append("/p="); + builder.append(_pending); + builder.append('/'); + builder.append(_tryExecutor); + builder.append("[pc="); + builder.append(getPCTasksConsumed()); + builder.append(",pic="); + builder.append(getPICTasksExecuted()); + builder.append(",pec="); + builder.append(getPECTasksExecuted()); + builder.append(",epc="); + builder.append(getEPCTasksConsumed()); + builder.append("]"); + builder.append("@"); + builder.append(DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); + } +} diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/AdaptiveExecutionStrategyTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/AdaptiveExecutionStrategyTest.java new file mode 100644 index 000000000000..6e8055f67e88 --- /dev/null +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/AdaptiveExecutionStrategyTest.java @@ -0,0 +1,144 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.util.thread; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.eclipse.jetty.logging.StacklessLogging; +import org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertNull; + +public class AdaptiveExecutionStrategyTest +{ + private AdaptiveExecutionStrategy aes; + + private void startAES(ExecutionStrategy.Producer producer) throws Exception + { + QueuedThreadPool executor = new QueuedThreadPool(); + aes = new AdaptiveExecutionStrategy(producer, executor); + aes.start(); + ReservedThreadExecutor tryExecutor = executor.getBean(ReservedThreadExecutor.class); + // Prime the executor so that there is a reserved thread. + executor.tryExecute(() -> + { + }); + while (tryExecutor.getAvailable() == 0) + { + Thread.sleep(10); + } + } + + @AfterEach + public void dispose() throws Exception + { + if (aes != null) + aes.stop(); + } + + @Test + public void testExceptionThrownByTask() throws Exception + { + try (StacklessLogging ignored = new StacklessLogging(AdaptiveExecutionStrategy.class)) + { + AtomicReference detector = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(2); + BlockingQueue tasks = new LinkedBlockingQueue<>(); + startAES(() -> + { + boolean proceed = detector.compareAndSet(null, new Throwable()); + if (proceed) + { + try + { + latch.countDown(); + return tasks.poll(1, TimeUnit.SECONDS); + } + catch (InterruptedException x) + { + x.printStackTrace(); + return null; + } + finally + { + detector.set(null); + } + } + else + { + return null; + } + }); + + // Start production in another thread. + aes.dispatch(); + + tasks.offer(new Task(() -> + { + try + { + // While thread1 runs this task, simulate + // that thread2 starts producing. + aes.dispatch(); + // Wait for thread2 to block in produce(). + latch.await(); + // Throw to verify that exceptions are handled correctly. + throw new RuntimeException(); + } + catch (InterruptedException x) + { + x.printStackTrace(); + } + }, Invocable.InvocationType.BLOCKING)); + + // Wait until AES is idle. + while (!aes.isIdle()) + { + Thread.sleep(10); + } + + assertNull(detector.get()); + } + } + + private static class Task implements Runnable, Invocable + { + private final Runnable task; + private final InvocationType invocationType; + + private Task(Runnable task, InvocationType invocationType) + { + this.task = task; + this.invocationType = invocationType; + } + + @Override + public void run() + { + task.run(); + } + + @Override + public InvocationType getInvocationType() + { + return invocationType; + } + } +} diff --git a/tests/jetty-jmh/src/main/java/org/eclipse/jetty/util/thread/strategy/jmh/AdaptiveExecutionStrategyBenchmark.java b/tests/jetty-jmh/src/main/java/org/eclipse/jetty/util/thread/strategy/jmh/AdaptiveExecutionStrategyBenchmark.java new file mode 100644 index 000000000000..724f7ac5f957 --- /dev/null +++ b/tests/jetty-jmh/src/main/java/org/eclipse/jetty/util/thread/strategy/jmh/AdaptiveExecutionStrategyBenchmark.java @@ -0,0 +1,194 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.util.thread.strategy.jmh; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.util.thread.ExecutionStrategy; +import org.eclipse.jetty.util.thread.Invocable; +import org.eclipse.jetty.util.thread.ReservedThreadExecutor; +import org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy; +import org.eclipse.jetty.util.thread.strategy.ProduceConsume; +import org.eclipse.jetty.util.thread.strategy.ProduceExecuteConsume; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.runner.options.TimeValue; + +@State(Scope.Benchmark) +public class AdaptiveExecutionStrategyBenchmark +{ + static TestServer server; + static ReservedThreadExecutor reserved; + static Path directory; + + @Param({"PC", "PEC", "AES"}) + public static String strategyName; + + @Param({"true", "false"}) + public static boolean sleeping; + + @Param({"true", "false"}) + public static boolean nonBlocking; + + @Setup(Level.Trial) + public static void setupServer() throws Exception + { + // Make a test directory + directory = Files.createTempDirectory("AES"); + + // Make some test files + for (int i = 0; i < 75; i++) + { + File.createTempFile("AES_benchmark", i + ".txt", directory.toFile()); + } + + server = new TestServer(directory.toFile()); + server.start(); + reserved = new ReservedThreadExecutor(server, 20); + reserved.start(); + } + + @TearDown(Level.Trial) + public static void stopServer() throws Exception + { + try + { + IO.delete(directory.toFile()); + } + catch (Exception e) + { + System.out.println("cannot delete directory:" + directory); + } + reserved.stop(); + server.stop(); + } + + @State(Scope.Thread) + public static class ThreadState implements Runnable + { + final TestConnection connection = new TestConnection(server, sleeping); + final ExecutionStrategy strategy; + + { + switch (strategyName) + { + case "PC": + strategy = new ProduceConsume(connection, server); + break; + + case "PEC": + strategy = new ProduceExecuteConsume(connection, server); + break; + + case "AES": + strategy = new AdaptiveExecutionStrategy(connection, server); + break; + + default: + throw new IllegalStateException(); + } + + LifeCycle.start(strategy); + } + + @Override + public void run() + { + strategy.produce(); + } + } + + @Benchmark + @BenchmarkMode({Mode.Throughput}) + public long testStrategy(ThreadState state) throws Exception + { + int r; + switch (server.getRandom(8)) + { + case 0: + r = 4; + break; + case 1: + case 2: + r = 2; + break; + default: + r = 1; + break; + } + + List> results = new ArrayList<>(r); + for (int i = 0; i < r; i++) + { + CompletableFuture result = new CompletableFuture(); + results.add(result); + state.connection.submit(result); + } + + if (nonBlocking) + Invocable.invokeNonBlocking(state); + else + state.run(); + + long hash = 0; + for (CompletableFuture result : results) + { + hash ^= result.get().hashCode(); + } + + return hash; + } + + public static void main(String[] args) throws RunnerException + { + Options opt = new OptionsBuilder() + .include(AdaptiveExecutionStrategyBenchmark.class.getSimpleName()) + .warmupIterations(2) + .measurementIterations(3) + .forks(1) + .threads(400) + // .syncIterations(true) // Don't start all threads at same time + .warmupTime(new TimeValue(10000, TimeUnit.MILLISECONDS)) + .measurementTime(new TimeValue(10000, TimeUnit.MILLISECONDS)) + // .addProfiler(CompilerProfiler.class) + // .addProfiler(LinuxPerfProfiler.class) + // .addProfiler(LinuxPerfNormProfiler.class) + // .addProfiler(LinuxPerfAsmProfiler.class) + // .resultFormat(ResultFormatType.CSV) + .build(); + + new Runner(opt).run(); + } +} + + From 1d7b6143836a5c944e08b8a6bb8ca71499c6f78a Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Fri, 4 Jun 2021 09:18:21 +1000 Subject: [PATCH 03/21] Rename EWYK The AdaptiveExecutionStrategy Updated the documentation from review, but in so doing realised that the sub-strategy selection could be more simply represented in code, so that was also updated. Signed-off-by: Greg Wilkins --- .../strategy/AdaptiveExecutionStrategy.java | 152 ++++++------------ 1 file changed, 47 insertions(+), 105 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java index b0987c32c0a4..bccbf872055c 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java @@ -33,38 +33,42 @@ /** *

An adaptive execution strategy that uses the {@link Invocable} status - * of both the task and the current execution to select an optimal strategy - * that prioritizes executing the task immediately in the current thread - * if can be done so without thread starvation issues.

+ * of both the task and the current thread to select an optimal strategy + * that prioritizes executing the task immediately in the current + * producing thread if it can be done so without thread starvation issues.

* - *

If a produced task has used the {@link Invocable} API to indicate that - * it is {@link Invocable.InvocationType#NON_BLOCKING}, - * then the task will always be run directly and production resumed afterwards. - * When operating in this pattern, the sub-strategy is called ProduceConsume (PC).

+ * This strategy selects between the following sub-strategies: + *
+ *
ProduceConsume(PC)
+ *
The producing thread consumes the task by executing it directly and then + * continues to produce.
+ *
ProduceInvokeConsume(PIC)
+ *
The producing thread consumes the task by invoking it with {@link Invocable#invokeNonBlocking(Runnable)} + * and then continues to produce.
+ *
ProduceExecuteConsume(PEC)
+ *
The producing thread dispatches the task to a thread pool to be executed and then immediately resumes + * producing.
+ *
ExecuteProduceConsume(EPC)
+ *
The producing thread consumes the task by executing it directly (as in PC mode) but then races with + * a pending producer thread to take over production. + *
+ *
+ * The sub-strategy is selected as follows: + *
+ *
PC
If the produced task has used the {@link Invocable} API to indicate that + * it is {@link Invocable.InvocationType#NON_BLOCKING}.
+ *
EPC
If the producing thread is not {@link Invocable.InvocationType#NON_BLOCKING} + * and a pending producer thread is available, either because there is already a pending producer + * or one is successfully started with {@link TryExecutor#tryExecute(Runnable)}.
+ *
PIC
If the produced task has used the {@link Invocable} API to indicate that + * it is {@link Invocable.InvocationType#EITHER}.
+ *
PEC
Otherwise.
+ *
* - *

If the producing thread has indicated that it is itself - * {@link Invocable.InvocationType#NON_BLOCKING}, then produced tasks that - * are {@link Invocable.InvocationType#EITHER}, are invoked directly as - * {@link Invocable.InvocationType#NON_BLOCKING} and then production resumed. - * This sub-strategy is called ProduceInvokeConsume (PIC).

+ *

This strategy was previously named EatWhatYouKill (EWYK) because its preference to for + * a producer to directly consume a task was similar to the hunting proverb, in the sense that one + * eat(consume) what they kill(produce).

* - *

For all other tasks produced by a {@link Invocable.InvocationType#NON_BLOCKING} - * thread, the task is dispatched for execution by a thread pool and the current - * producing thread immediately continues production. - * This sub-strategy is called ProduceExecuteConsume (PEC).

- * - *

If the producing thread may block, then for produced task that also may block, - * the strategy may attempts to dispatch another producing thread via a {@link TryExecutor}. - * If either there is already a producer thread pending or the call - * to {@link TryExecutor#tryExecute(Runnable)} succeeds, then the task - * is executed directly and the pending producer will take over production. - * This sub-strategy is called Execute Produce Consume (EPC), but was previously - * named EatWhatYouKill (EWYK) after a hunting proverb, in the sense that one - * should kill(produce) only to eat(consume).

- * - *

If there is no pending producer thread available and the produce task - * is {@link Invocable.InvocationType#EITHER}, then the PIC sub-strategy is - * used. Otherwise the PEC sub-strategy is used.

*/ @ManagedObject("Adaptive execution strategy") public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements ExecutionStrategy, Runnable @@ -219,86 +223,24 @@ private boolean doProduce(boolean nonBlocking) } Mode mode; - if (nonBlocking) + Invocable.InvocationType taskType = Invocable.getInvocationType(task); + if (taskType == Invocable.InvocationType.NON_BLOCKING) { - // The calling thread cannot block, so we only have a choice between PC and PEC modes, - // based on the invocation type of the task - switch (Invocable.getInvocationType(task)) - { - case NON_BLOCKING: - mode = Mode.PRODUCE_CONSUME; - break; - - case EITHER: - mode = Mode.PRODUCE_INVOKE_CONSUME; - break; - - default: - mode = Mode.PRODUCE_EXECUTE_CONSUME; - break; - } + mode = Mode.PRODUCE_CONSUME; + } + else if (!nonBlocking && (_pending || _tryExecutor.tryExecute(this))) + { + _pending = true; + _state = State.IDLE; + mode = Mode.EXECUTE_PRODUCE_CONSUME; + } + else if (taskType == Invocable.InvocationType.EITHER) + { + mode = Mode.PRODUCE_INVOKE_CONSUME; } else { - // The calling thread can block, so we can choose between PC, PEC and EPC modes, - // based on the invocation type of the task and if a reserved thread is available - switch (Invocable.getInvocationType(task)) - { - case NON_BLOCKING: - mode = Mode.PRODUCE_CONSUME; - break; - - case BLOCKING: - // The task is blocking, so PC is not an option. Thus we choose - // between EPC and PEC based on the availability of a reserved thread. - try (AutoLock l = _lock.lock()) - { - if (_pending) - { - _state = State.IDLE; - mode = Mode.EXECUTE_PRODUCE_CONSUME; - } - else if (_tryExecutor.tryExecute(this)) - { - _pending = true; - _state = State.IDLE; - mode = Mode.EXECUTE_PRODUCE_CONSUME; - } - else - { - mode = Mode.PRODUCE_EXECUTE_CONSUME; - } - } - break; - - case EITHER: - // The task may be non blocking, so PC is an option. Thus we choose - // between EPC and PC based on the availability of a reserved thread. - try (AutoLock l = _lock.lock()) - { - if (_pending) - { - _state = State.IDLE; - mode = Mode.EXECUTE_PRODUCE_CONSUME; - } - else if (_tryExecutor.tryExecute(this)) - { - _pending = true; - _state = State.IDLE; - mode = Mode.EXECUTE_PRODUCE_CONSUME; - } - else - { - // PC mode, but we must consume with non-blocking invocation - // as we may be the last thread and we cannot block - mode = Mode.PRODUCE_INVOKE_CONSUME; - } - } - break; - - default: - throw new IllegalStateException(toString()); - } + mode = Mode.PRODUCE_EXECUTE_CONSUME; } if (LOG.isDebugEnabled()) From 28bc9335559a2c396deeb4e761031b81fee67ca9 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Fri, 4 Jun 2021 09:20:51 +1000 Subject: [PATCH 04/21] Rename EWYK The AdaptiveExecutionStrategy Updated the documentation from review, but in so doing realised that the sub-strategy selection could be more simply represented in code, so that was also updated. Signed-off-by: Greg Wilkins --- .../jetty/util/thread/strategy/AdaptiveExecutionStrategy.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java index bccbf872055c..46e7f2e99a72 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java @@ -55,8 +55,8 @@ * * The sub-strategy is selected as follows: *
- *
PC
If the produced task has used the {@link Invocable} API to indicate that - * it is {@link Invocable.InvocationType#NON_BLOCKING}.
+ *
PC
If the produced task has been invoked with {@link Invocable#invokeNonBlocking(Runnable) + * to indicate that it is {@link Invocable.InvocationType#NON_BLOCKING}.
*
EPC
If the producing thread is not {@link Invocable.InvocationType#NON_BLOCKING} * and a pending producer thread is available, either because there is already a pending producer * or one is successfully started with {@link TryExecutor#tryExecute(Runnable)}.
From cc0efd2e64409a10964e49ee73ef22273bcc64dc Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Fri, 4 Jun 2021 09:24:51 +1000 Subject: [PATCH 05/21] Rename EWYK The AdaptiveExecutionStrategy Updated the documentation from review, but in so doing realised that the sub-strategy selection could be more simply represented in code, so that was also updated. Signed-off-by: Greg Wilkins --- .../jetty/util/thread/strategy/AdaptiveExecutionStrategy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java index 46e7f2e99a72..863ff10d9f48 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java @@ -67,7 +67,7 @@ * *

This strategy was previously named EatWhatYouKill (EWYK) because its preference to for * a producer to directly consume a task was similar to the hunting proverb, in the sense that one - * eat(consume) what they kill(produce).

+ * should eat(consume) what they kill(produce).

* */ @ManagedObject("Adaptive execution strategy") From 194e63874ad9b176d9bb25ac4a1dd6c47bc7af24 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Fri, 4 Jun 2021 09:53:12 +1000 Subject: [PATCH 06/21] Rename EWYK The AdaptiveExecutionStrategy Added notes about chaining strategies and thread starvation Signed-off-by: Greg Wilkins --- .../strategy/AdaptiveExecutionStrategy.java | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java index 863ff10d9f48..5b8a894b621f 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java @@ -60,11 +60,23 @@ *
EPC
If the producing thread is not {@link Invocable.InvocationType#NON_BLOCKING} * and a pending producer thread is available, either because there is already a pending producer * or one is successfully started with {@link TryExecutor#tryExecute(Runnable)}.
- *
PIC
If the produced task has used the {@link Invocable} API to indicate that - * it is {@link Invocable.InvocationType#EITHER}.
+ *
PIC
If the produced task has used the {@link Invocable#getInvocationType()} API to + * indicate that it is {@link Invocable.InvocationType#EITHER}.
*
PEC
Otherwise.
*
- * + *

Because of the preference for {@code PC} mode, on a multicore machine with many + * many {@link Invocable.InvocationType#NON_BLOCKING} tasks, multiple instances of the strategy may be + * required to keep all CPUs on the system busy.

+ *

Since the producing thread may be invoked with {@link Invocable#invokeNonBlocking(Runnable) + * this allows {@link AdaptiveExecutionStrategy}s to be efficiently and safely chain: so that a task + * produced by one execution strategy may become itself become a producer in a second execution strategy + * (e.g. an IO selector may use an execution strategy to handle multiple connections and each + * connection may use a execution strategy to handle multiplexed channels/streams within the connection). + * If a task containing another execution strategy identifies as {@link Invocable.InvocationType#EITHER} + * then the first strategy may invoke it as {@link Invocable.InvocationType#NON_BLOCKING} when it has + * no pending producer threads available. This avoids thread starvation as the production on the second + * strategy can always be executed, but without t he risk that it may block the last available producer + * for the first strategy.

*

This strategy was previously named EatWhatYouKill (EWYK) because its preference to for * a producer to directly consume a task was similar to the hunting proverb, in the sense that one * should eat(consume) what they kill(produce).

From 21e70b8c3223d3c066054ab307faa1ae2bc16650 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Fri, 4 Jun 2021 09:59:59 +1000 Subject: [PATCH 07/21] Rename EWYK The AdaptiveExecutionStrategy Fixed formatting and made a little clearer Signed-off-by: Greg Wilkins --- .../strategy/AdaptiveExecutionStrategy.java | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java index 5b8a894b621f..9f8c022b74ab 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java @@ -55,7 +55,7 @@ * * The sub-strategy is selected as follows: *
- *
PC
If the produced task has been invoked with {@link Invocable#invokeNonBlocking(Runnable) + *
PC
If the produced task has been invoked with {@link Invocable#invokeNonBlocking(Runnable)} * to indicate that it is {@link Invocable.InvocationType#NON_BLOCKING}.
*
EPC
If the producing thread is not {@link Invocable.InvocationType#NON_BLOCKING} * and a pending producer thread is available, either because there is already a pending producer @@ -64,19 +64,22 @@ * indicate that it is {@link Invocable.InvocationType#EITHER}.
*
PEC
Otherwise.
*
+ * *

Because of the preference for {@code PC} mode, on a multicore machine with many * many {@link Invocable.InvocationType#NON_BLOCKING} tasks, multiple instances of the strategy may be * required to keep all CPUs on the system busy.

- *

Since the producing thread may be invoked with {@link Invocable#invokeNonBlocking(Runnable) - * this allows {@link AdaptiveExecutionStrategy}s to be efficiently and safely chain: so that a task - * produced by one execution strategy may become itself become a producer in a second execution strategy + * + *

Since the producing thread may be invoked with {@link Invocable#invokeNonBlocking(Runnable)} + * this allows {@link AdaptiveExecutionStrategy}s to be efficiently and safely chained: so that a task + * produced by one execution strategy may become itself be a producer in a second execution strategy * (e.g. an IO selector may use an execution strategy to handle multiple connections and each * connection may use a execution strategy to handle multiplexed channels/streams within the connection). - * If a task containing another execution strategy identifies as {@link Invocable.InvocationType#EITHER} - * then the first strategy may invoke it as {@link Invocable.InvocationType#NON_BLOCKING} when it has - * no pending producer threads available. This avoids thread starvation as the production on the second - * strategy can always be executed, but without t he risk that it may block the last available producer - * for the first strategy.

+ * A task containing another {@link AdaptiveExecutionStrategy} should identify as + * {@link Invocable.InvocationType#EITHER} so when there are no pending producers threads available to + * the first strategy, then it may invoke the second as {@link Invocable.InvocationType#NON_BLOCKING}. + * This avoids starvation as the production on the second strategy can always be executed, + * but without the risk that it may block the last available producer for the first strategy.

+ * *

This strategy was previously named EatWhatYouKill (EWYK) because its preference to for * a producer to directly consume a task was similar to the hunting proverb, in the sense that one * should eat(consume) what they kill(produce).

From 325719cfd0f2da962dbccbbfa8ee4d5728b7ad14 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Fri, 4 Jun 2021 10:11:03 +1000 Subject: [PATCH 08/21] Rename EWYK The AdaptiveExecutionStrategy Added deprecated version of EWYK Signed-off-by: Greg Wilkins --- .../util/thread/strategy/EatWhatYouKill.java | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java new file mode 100644 index 000000000000..8a5613c5d426 --- /dev/null +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java @@ -0,0 +1,27 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== + +package org.eclipse.jetty.util.thread.strategy; + +import java.util.concurrent.Executor; + +/** + * @deprecated This class has been renamed to {@link AdaptiveExecutionStrategy} + */ +@Deprecated +public class EatWhatYouKill extends AdaptiveExecutionStrategy +{ + public EatWhatYouKill(Producer producer, Executor executor) + { + super(producer, executor); + } +} From e2bbcf002b8be0b97139fac66fc75d4943c89558 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Fri, 4 Jun 2021 13:40:58 +1000 Subject: [PATCH 09/21] Update from review --- .../strategy/AdaptiveExecutionStrategy.java | 28 +++++++++++++------ 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java index 9f8c022b74ab..64df1a663cff 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java @@ -243,19 +243,29 @@ private boolean doProduce(boolean nonBlocking) { mode = Mode.PRODUCE_CONSUME; } - else if (!nonBlocking && (_pending || _tryExecutor.tryExecute(this))) + else if (!nonBlocking) { - _pending = true; - _state = State.IDLE; - mode = Mode.EXECUTE_PRODUCE_CONSUME; - } - else if (taskType == Invocable.InvocationType.EITHER) - { - mode = Mode.PRODUCE_INVOKE_CONSUME; + try (AutoLock l = _lock.lock()) + { + if (_pending || _tryExecutor.tryExecute(this)) + { + _pending = true; + _state = State.IDLE; + mode = Mode.EXECUTE_PRODUCE_CONSUME; + } + else + { + mode = taskType == Invocable.InvocationType.EITHER + ? Mode.PRODUCE_INVOKE_CONSUME + : Mode.PRODUCE_EXECUTE_CONSUME; + } + } } else { - mode = Mode.PRODUCE_EXECUTE_CONSUME; + mode = taskType == Invocable.InvocationType.EITHER + ? Mode.PRODUCE_INVOKE_CONSUME + : Mode.PRODUCE_EXECUTE_CONSUME; } if (LOG.isDebugEnabled()) From 68a7391fa7127e5dcab1b936b4dd3aba1d8e2b00 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Fri, 4 Jun 2021 14:10:47 +1000 Subject: [PATCH 10/21] Fixed format --- .../org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java | 1 + 1 file changed, 1 insertion(+) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java index 8a5613c5d426..a1d3a5f9aa6d 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java @@ -9,6 +9,7 @@ // // SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 // ======================================================================== +// package org.eclipse.jetty.util.thread.strategy; From 9374220a60e5e82279a4572628d55a3049ac684d Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Tue, 8 Jun 2021 12:31:26 +1000 Subject: [PATCH 11/21] Rename EWYK The AdaptiveExecutionStrategy Updates from review Signed-off-by: Greg Wilkins --- .../eclipse/jetty/util/thread/AutoLock.java | 32 +++++++++ .../strategy/AdaptiveExecutionStrategy.java | 69 ++++++++++++------- .../util/thread/strategy/EatWhatYouKill.java | 2 +- 3 files changed, 76 insertions(+), 27 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/AutoLock.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/AutoLock.java index 51a568f0e2fa..25d1693c9440 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/AutoLock.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/AutoLock.java @@ -34,6 +34,38 @@ public class AutoLock implements AutoCloseable, Serializable private final ReentrantLock _lock = new ReentrantLock(); + public static final AutoLock NO_LOCK = new AutoLock() + { + @Override + public AutoLock lock() + { + return this; + } + + @Override + public boolean isHeldByCurrentThread() + { + return false; + } + + @Override + public Condition newCondition() + { + throw new UnsupportedOperationException(); + } + + @Override + boolean isLocked() + { + return false; + } + + @Override + public void close() + { + } + }; + /** *

Acquires the lock.

* diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java index 64df1a663cff..20eb36e22cba 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java @@ -37,7 +37,7 @@ * that prioritizes executing the task immediately in the current * producing thread if it can be done so without thread starvation issues.

* - * This strategy selects between the following sub-strategies: + *

This strategy selects between the following sub-strategies:

*
*
ProduceConsume(PC)
*
The producing thread consumes the task by executing it directly and then @@ -53,16 +53,20 @@ * a pending producer thread to take over production. *
*
- * The sub-strategy is selected as follows: + *

The sub-strategy is selected as follows:

*
- *
PC
If the produced task has been invoked with {@link Invocable#invokeNonBlocking(Runnable)} + *
PC
+ *
If the produced task has been invoked with {@link Invocable#invokeNonBlocking(Runnable)} * to indicate that it is {@link Invocable.InvocationType#NON_BLOCKING}.
- *
EPC
If the producing thread is not {@link Invocable.InvocationType#NON_BLOCKING} + *
EPC
+ *
If the producing thread is not {@link Invocable.InvocationType#NON_BLOCKING} * and a pending producer thread is available, either because there is already a pending producer * or one is successfully started with {@link TryExecutor#tryExecute(Runnable)}.
- *
PIC
If the produced task has used the {@link Invocable#getInvocationType()} API to + *
PIC
+ *
If the produced task has used the {@link Invocable#getInvocationType()} API to * indicate that it is {@link Invocable.InvocationType#EITHER}.
- *
PEC
Otherwise.
+ *
PEC
+ *
Otherwise.
*
* *

Because of the preference for {@code PC} mode, on a multicore machine with many @@ -70,19 +74,19 @@ * required to keep all CPUs on the system busy.

* *

Since the producing thread may be invoked with {@link Invocable#invokeNonBlocking(Runnable)} - * this allows {@link AdaptiveExecutionStrategy}s to be efficiently and safely chained: so that a task + * this allows {@link AdaptiveExecutionStrategy}s to be efficiently and safely chained: a task * produced by one execution strategy may become itself be a producer in a second execution strategy * (e.g. an IO selector may use an execution strategy to handle multiple connections and each - * connection may use a execution strategy to handle multiplexed channels/streams within the connection). - * A task containing another {@link AdaptiveExecutionStrategy} should identify as + * connection may use a execution strategy to handle multiplexed channels/streams within the connection).

+ *

A task containing another {@link AdaptiveExecutionStrategy} should identify as * {@link Invocable.InvocationType#EITHER} so when there are no pending producers threads available to * the first strategy, then it may invoke the second as {@link Invocable.InvocationType#NON_BLOCKING}. * This avoids starvation as the production on the second strategy can always be executed, * but without the risk that it may block the last available producer for the first strategy.

* - *

This strategy was previously named EatWhatYouKill (EWYK) because its preference to for - * a producer to directly consume a task was similar to the hunting proverb, in the sense that one - * should eat(consume) what they kill(produce).

+ *

This strategy was previously named EatWhatYouKill (EWYK) because its preference for a + * producer to directly consume the tasks that it produces is similar to a hunting proverb + * that says that a hunter should eat (i.e. consume) what they kill (i.e. produced).

* */ @ManagedObject("Adaptive execution strategy") @@ -241,37 +245,48 @@ private boolean doProduce(boolean nonBlocking) Invocable.InvocationType taskType = Invocable.getInvocationType(task); if (taskType == Invocable.InvocationType.NON_BLOCKING) { + // The produced task will never block, so we directly consume the task with PC + // and then resume production. mode = Mode.PRODUCE_CONSUME; } - else if (!nonBlocking) + else { - try (AutoLock l = _lock.lock()) + // The produced task might block. Is this producing thread allowed to block? + boolean mayBlock = !nonBlocking; + + // only if this thread may block do we need to take the lock so we can + // atomically check/mutate the state and pending status. + try (AutoLock l = mayBlock ? _lock.lock() : AutoLock.NO_LOCK) { - if (_pending || _tryExecutor.tryExecute(this)) + if (mayBlock && (_pending || _tryExecutor.tryExecute(this))) { + // This producing thread may block and there is a pending producer available + // so we use EPC: the producer will directly consume the task and then + // race with the pending producer to take over production. _pending = true; _state = State.IDLE; mode = Mode.EXECUTE_PRODUCE_CONSUME; } + else if (taskType == Invocable.InvocationType.EITHER) + { + // Either this thread is not allowed to not block or there were no pending + // producer threads available. But we are able to invoke the task non blocking + // so we use PIC to directly consume the task and then resume production. + mode = Mode.PRODUCE_INVOKE_CONSUME; + } else { - mode = taskType == Invocable.InvocationType.EITHER - ? Mode.PRODUCE_INVOKE_CONSUME - : Mode.PRODUCE_EXECUTE_CONSUME; + // Otherwise we use PEC: this thread continues to produce and the task is + // consumed by the executor + mode = Mode.PRODUCE_EXECUTE_CONSUME; } } } - else - { - mode = taskType == Invocable.InvocationType.EITHER - ? Mode.PRODUCE_INVOKE_CONSUME - : Mode.PRODUCE_EXECUTE_CONSUME; - } if (LOG.isDebugEnabled()) LOG.debug("{} m={} t={}/{}", this, mode, task, Invocable.getInvocationType(task)); - // Consume or execute task + // Consume and/or execute task according to the selected mode. switch (mode) { case PRODUCE_CONSUME: @@ -293,12 +308,14 @@ else if (!nonBlocking) _epcMode.increment(); runTask(task); - // Try to produce again? + // Race the pending producer to produce again try (AutoLock l = _lock.lock()) { if (_state == State.IDLE) { // We beat the pending producer, so we will become the producer instead + // The pending produce will become a noop if it arrives whilst we are producing, + // or it may take over if we subsequently do another EPC consumption _state = State.PRODUCING; return true; } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java index a1d3a5f9aa6d..350df4101e64 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java @@ -18,7 +18,7 @@ /** * @deprecated This class has been renamed to {@link AdaptiveExecutionStrategy} */ -@Deprecated +@Deprecated(forRemoval = true) public class EatWhatYouKill extends AdaptiveExecutionStrategy { public EatWhatYouKill(Producer producer, Executor executor) From 0593f13d648160720397f75328108f0cbf2f6310 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 10 Jun 2021 07:41:01 +1000 Subject: [PATCH 12/21] Rename EWYK The AdaptiveExecutionStrategy Updates from review Signed-off-by: Greg Wilkins --- .../eclipse/jetty/util/thread/AutoLock.java | 32 ---------- .../strategy/AdaptiveExecutionStrategy.java | 60 ++++++++++++------- 2 files changed, 38 insertions(+), 54 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/AutoLock.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/AutoLock.java index 25d1693c9440..51a568f0e2fa 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/AutoLock.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/AutoLock.java @@ -34,38 +34,6 @@ public class AutoLock implements AutoCloseable, Serializable private final ReentrantLock _lock = new ReentrantLock(); - public static final AutoLock NO_LOCK = new AutoLock() - { - @Override - public AutoLock lock() - { - return this; - } - - @Override - public boolean isHeldByCurrentThread() - { - return false; - } - - @Override - public Condition newCondition() - { - throw new UnsupportedOperationException(); - } - - @Override - boolean isLocked() - { - return false; - } - - @Override - public void close() - { - } - }; - /** *

Acquires the lock.

* diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java index 20eb36e22cba..2716eb189b76 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java @@ -254,33 +254,49 @@ private boolean doProduce(boolean nonBlocking) // The produced task might block. Is this producing thread allowed to block? boolean mayBlock = !nonBlocking; - // only if this thread may block do we need to take the lock so we can + // if this thread may block do we need to take the lock so we can // atomically check/mutate the state and pending status. - try (AutoLock l = mayBlock ? _lock.lock() : AutoLock.NO_LOCK) + if (mayBlock) { - if (mayBlock && (_pending || _tryExecutor.tryExecute(this))) - { - // This producing thread may block and there is a pending producer available - // so we use EPC: the producer will directly consume the task and then - // race with the pending producer to take over production. - _pending = true; - _state = State.IDLE; - mode = Mode.EXECUTE_PRODUCE_CONSUME; - } - else if (taskType == Invocable.InvocationType.EITHER) - { - // Either this thread is not allowed to not block or there were no pending - // producer threads available. But we are able to invoke the task non blocking - // so we use PIC to directly consume the task and then resume production. - mode = Mode.PRODUCE_INVOKE_CONSUME; - } - else + try (AutoLock l = _lock.lock()) { - // Otherwise we use PEC: this thread continues to produce and the task is - // consumed by the executor - mode = Mode.PRODUCE_EXECUTE_CONSUME; + if (_pending || _tryExecutor.tryExecute(this)) + { + // This producing thread may block and there is a pending producer available + // so we use EPC: the producer will directly consume the task and then + // race with the pending producer to take over production. + _pending = true; + _state = State.IDLE; + mode = Mode.EXECUTE_PRODUCE_CONSUME; + } + else if (taskType == Invocable.InvocationType.EITHER) + { + // Either this thread is not allowed to not block or there were no pending + // producer threads available. But we are able to invoke the task non blocking + // so we use PIC to directly consume the task and then resume production. + mode = Mode.PRODUCE_INVOKE_CONSUME; + } + else + { + // Otherwise we use PEC: this thread continues to produce and the task is + // consumed by the executor + mode = Mode.PRODUCE_EXECUTE_CONSUME; + } } } + else if (taskType == Invocable.InvocationType.EITHER) + { + // Either this thread is not allowed to not block or there were no pending + // producer threads available. But we are able to invoke the task non blocking + // so we use PIC to directly consume the task and then resume production. + mode = Mode.PRODUCE_INVOKE_CONSUME; + } + else + { + // Otherwise we use PEC: this thread continues to produce and the task is + // consumed by the executor + mode = Mode.PRODUCE_EXECUTE_CONSUME; + } } if (LOG.isDebugEnabled()) From 59dab005d25dd2ccade1622c93e67d62cdc3aa1b Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 10 Jun 2021 07:57:10 +1000 Subject: [PATCH 13/21] Rename EWYK The AdaptiveExecutionStrategy version without code duplication Signed-off-by: Greg Wilkins --- .../strategy/AdaptiveExecutionStrategy.java | 84 +++++++++---------- 1 file changed, 38 insertions(+), 46 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java index 2716eb189b76..8ebf4d3ae6cf 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java @@ -242,61 +242,53 @@ private boolean doProduce(boolean nonBlocking) } Mode mode; + // Switch on the invocation type of the produced task Invocable.InvocationType taskType = Invocable.getInvocationType(task); - if (taskType == Invocable.InvocationType.NON_BLOCKING) + switch (taskType) { - // The produced task will never block, so we directly consume the task with PC - // and then resume production. - mode = Mode.PRODUCE_CONSUME; - } - else - { - // The produced task might block. Is this producing thread allowed to block? - boolean mayBlock = !nonBlocking; - - // if this thread may block do we need to take the lock so we can - // atomically check/mutate the state and pending status. - if (mayBlock) - { - try (AutoLock l = _lock.lock()) + case NON_BLOCKING: + // The produced task will never block, so we directly consume the task with PC + // and then resume production. + mode = Mode.PRODUCE_CONSUME; + break; + + case BLOCKING: + case EITHER: + default: + // if this thread may block then we need to take the lock so we can + // atomically check/mutate the state and pending status. + if (!nonBlocking) { - if (_pending || _tryExecutor.tryExecute(this)) - { - // This producing thread may block and there is a pending producer available - // so we use EPC: the producer will directly consume the task and then - // race with the pending producer to take over production. - _pending = true; - _state = State.IDLE; - mode = Mode.EXECUTE_PRODUCE_CONSUME; - } - else if (taskType == Invocable.InvocationType.EITHER) - { - // Either this thread is not allowed to not block or there were no pending - // producer threads available. But we are able to invoke the task non blocking - // so we use PIC to directly consume the task and then resume production. - mode = Mode.PRODUCE_INVOKE_CONSUME; - } - else + try (AutoLock l = _lock.lock()) { - // Otherwise we use PEC: this thread continues to produce and the task is - // consumed by the executor - mode = Mode.PRODUCE_EXECUTE_CONSUME; + if (_pending || _tryExecutor.tryExecute(this)) + { + // This producing thread may block and there is a pending producer available + // so we use EPC: the producer will directly consume the task and then + // race with the pending producer to take over production. + _pending = true; + _state = State.IDLE; + mode = Mode.EXECUTE_PRODUCE_CONSUME; + break; + } } } - } - else if (taskType == Invocable.InvocationType.EITHER) - { - // Either this thread is not allowed to not block or there were no pending - // producer threads available. But we are able to invoke the task non blocking - // so we use PIC to directly consume the task and then resume production. - mode = Mode.PRODUCE_INVOKE_CONSUME; - } - else - { + + // To arrive here, either this thread is not allowed to not block + // or there were no pending producers available. + + // if the task can be run non blocking + if (taskType == Invocable.InvocationType.EITHER) + { + // Use PIC to directly consume the task and then resume production. + mode = Mode.PRODUCE_INVOKE_CONSUME; + break; + } + // Otherwise we use PEC: this thread continues to produce and the task is // consumed by the executor mode = Mode.PRODUCE_EXECUTE_CONSUME; - } + break; } if (LOG.isDebugEnabled()) From feca6eb70580cf4655f8a9bfaeb2c485f7f140a4 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 10 Jun 2021 11:13:42 +1000 Subject: [PATCH 14/21] Rename EWYK The AdaptiveExecutionStrategy Even more comments Some duplicated code to avoid double test on taskType Signed-off-by: Greg Wilkins --- .../strategy/AdaptiveExecutionStrategy.java | 88 +++++++++++++------ 1 file changed, 63 insertions(+), 25 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java index 8ebf4d3ae6cf..71cba95bf736 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java @@ -172,40 +172,54 @@ public void produce() tryProduce(false); } + /** + * Try to become the producing thread. + * @param wasPending True if this thread was started as a pending producer + */ private void tryProduce(boolean wasPending) { if (LOG.isDebugEnabled()) LOG.debug("{} tryProduce {}", this, wasPending); + // Take the lock to atomically check if this thread can produce try (AutoLock l = _lock.lock()) { + // If this thread was the pending producer, there is no longer one pending if (wasPending) _pending = false; switch (_state) { case IDLE: - // Enter PRODUCING + // The strategy was IDLE, so this thread can become the producer _state = State.PRODUCING; break; case PRODUCING: - // Keep other Thread producing + // The strategy is already producing, so another thread must be the producer + // However, it may be just about to stop being the producer so we set the + // REPRODUCING state to force it to call #doProduce at least once more. _state = State.REPRODUCING; return; - default: + case REPRODUCING: + // Another thread is already producing and will already try another #doProduce return; + + default: + throw new IllegalStateException(); } } + // Determine this threads invocation type once outside of the production loop boolean nonBlocking = Invocable.isNonBlockingInvocation(); while (isRunning()) { try { - if (doProduce(nonBlocking)) + // Produce a task and then continue producing only if the strategy used returns true + if (!doProduce(nonBlocking)) continue; return; } @@ -216,22 +230,30 @@ private void tryProduce(boolean wasPending) } } + /** + * @param nonBlocking True if this thread has been invoked in non-blocking mode + * @return true if this thread should keep producing + */ private boolean doProduce(boolean nonBlocking) { Runnable task = produceTask(); + // If we did not produce a task if (task == null) { + // the we need take the lock to atomically determine if we should keep producing try (AutoLock l = _lock.lock()) { - // Could another task just have been queued with a produce call? switch (_state) { case PRODUCING: + // This thread was the only producer, so it is now Idle _state = State.IDLE; return false; case REPRODUCING: + // Another thread may have queued a task and tried to produce + // so this thread should try to produce again. _state = State.PRODUCING; return true; @@ -241,31 +263,31 @@ private boolean doProduce(boolean nonBlocking) } } + // Determine the execution mode by a function of the tasks invocation type, this threads invocation type + // and the availability of a pending producer thread: Mode mode; - // Switch on the invocation type of the produced task Invocable.InvocationType taskType = Invocable.getInvocationType(task); switch (taskType) { case NON_BLOCKING: - // The produced task will never block, so we directly consume the task with PC + // The produced task will not block, so use PC: consume task directly // and then resume production. mode = Mode.PRODUCE_CONSUME; break; - case BLOCKING: case EITHER: - default: - // if this thread may block then we need to take the lock so we can - // atomically check/mutate the state and pending status. + // If this producing thread may block then it can directly consume + // the task in blocking mode if a pending producer is available. if (!nonBlocking) { + // We need to take the lock so we can atomically check if a pending producer is available. try (AutoLock l = _lock.lock()) { + // If a pending producer is available or one can be started if (_pending || _tryExecutor.tryExecute(this)) { - // This producing thread may block and there is a pending producer available - // so we use EPC: the producer will directly consume the task and then - // race with the pending producer to take over production. + // use EPC: directly consume the task and then race + // with the pending producer to resume production. _pending = true; _state = State.IDLE; mode = Mode.EXECUTE_PRODUCE_CONSUME; @@ -274,27 +296,42 @@ private boolean doProduce(boolean nonBlocking) } } - // To arrive here, either this thread is not allowed to not block - // or there were no pending producers available. + // otherwise use PIC: directly consume the task in non-blocking mode and then resume production. + mode = Mode.PRODUCE_INVOKE_CONSUME; + break; - // if the task can be run non blocking - if (taskType == Invocable.InvocationType.EITHER) + case BLOCKING: + // If this producing thread may block then it can directly consume + // the blocking task if a pending producer is available. + if (!nonBlocking) { - // Use PIC to directly consume the task and then resume production. - mode = Mode.PRODUCE_INVOKE_CONSUME; - break; + // We need to take the lock so we can atomically check if a pending producer is available. + try (AutoLock l = _lock.lock()) + { + // If a pending producer is available or one can be started + if (_pending || _tryExecutor.tryExecute(this)) + { + // use EPC: directly consume the task and then race + // with the pending producer to resume production. + _pending = true; + _state = State.IDLE; + mode = Mode.EXECUTE_PRODUCE_CONSUME; + break; + } + } } - // Otherwise we use PEC: this thread continues to produce and the task is - // consumed by the executor + // Otherwise use PEC: the task is consumed by the executor and this thread continues to produce mode = Mode.PRODUCE_EXECUTE_CONSUME; break; + + default: + throw new IllegalStateException(); } + // Consume and/or execute task according to the selected mode. if (LOG.isDebugEnabled()) LOG.debug("{} m={} t={}/{}", this, mode, task, Invocable.getInvocationType(task)); - - // Consume and/or execute task according to the selected mode. switch (mode) { case PRODUCE_CONSUME: @@ -328,6 +365,7 @@ private boolean doProduce(boolean nonBlocking) return true; } } + // The pending producer is now producing, so this thread no longer produces return false; default: From 395173ebb4f0c9f1365e1de6b0af35b80d3beda5 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 10 Jun 2021 12:42:12 +1000 Subject: [PATCH 15/21] Rename EWYK The AdaptiveExecutionStrategy hide the Runnable interface from the strategy signature Signed-off-by: Greg Wilkins --- .../strategy/AdaptiveExecutionStrategy.java | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java index 71cba95bf736..f14900cc9456 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java @@ -90,7 +90,7 @@ * */ @ManagedObject("Adaptive execution strategy") -public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements ExecutionStrategy, Runnable +public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements ExecutionStrategy { private static final Logger LOG = LoggerFactory.getLogger(AdaptiveExecutionStrategy.class); @@ -116,6 +116,7 @@ private enum Mode private final Producer _producer; private final Executor _executor; private final TryExecutor _tryExecutor; + private final Runnable _runPendingProducer = () -> tryProduce(true); private State _state = State.IDLE; private boolean _pending; @@ -157,13 +158,7 @@ public void dispatch() if (LOG.isDebugEnabled()) LOG.debug("{} dispatch {}", this, execute); if (execute) - _executor.execute(this); - } - - @Override - public void run() - { - tryProduce(true); + _executor.execute(_runPendingProducer); } @Override @@ -284,7 +279,7 @@ private boolean doProduce(boolean nonBlocking) try (AutoLock l = _lock.lock()) { // If a pending producer is available or one can be started - if (_pending || _tryExecutor.tryExecute(this)) + if (_pending || _tryExecutor.tryExecute(_runPendingProducer)) { // use EPC: directly consume the task and then race // with the pending producer to resume production. @@ -309,7 +304,7 @@ private boolean doProduce(boolean nonBlocking) try (AutoLock l = _lock.lock()) { // If a pending producer is available or one can be started - if (_pending || _tryExecutor.tryExecute(this)) + if (_pending || _tryExecutor.tryExecute(_runPendingProducer)) { // use EPC: directly consume the task and then race // with the pending producer to resume production. From c0a358d3befbcde04881e631d250f040bf8a224a Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 10 Jun 2021 12:43:26 +1000 Subject: [PATCH 16/21] Rename EWYK The AdaptiveExecutionStrategy fix accidental change Signed-off-by: Greg Wilkins --- .../jetty/util/thread/strategy/AdaptiveExecutionStrategy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java index f14900cc9456..534af002adae 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java @@ -214,7 +214,7 @@ private void tryProduce(boolean wasPending) try { // Produce a task and then continue producing only if the strategy used returns true - if (!doProduce(nonBlocking)) + if (doProduce(nonBlocking)) continue; return; } From c2ecdc4bf533ad7ccb4b8b41d318870cc6e29f97 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 10 Jun 2021 13:34:04 +1000 Subject: [PATCH 17/21] Rename EWYK The AdaptiveExecutionStrategy refactor production and consumption methods to be smaller a better named/defined. Signed-off-by: Greg Wilkins --- .../strategy/AdaptiveExecutionStrategy.java | 153 +++++++++++------- 1 file changed, 97 insertions(+), 56 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java index 534af002adae..5d2aeb3fa618 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java @@ -78,6 +78,7 @@ * produced by one execution strategy may become itself be a producer in a second execution strategy * (e.g. an IO selector may use an execution strategy to handle multiple connections and each * connection may use a execution strategy to handle multiplexed channels/streams within the connection).

+ * *

A task containing another {@link AdaptiveExecutionStrategy} should identify as * {@link Invocable.InvocationType#EITHER} so when there are no pending producers threads available to * the first strategy, then it may invoke the second as {@link Invocable.InvocationType#NON_BLOCKING}. @@ -94,18 +95,35 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe { private static final Logger LOG = LoggerFactory.getLogger(AdaptiveExecutionStrategy.class); + /** + * The state of this strategy + */ private enum State { - IDLE, PRODUCING, REPRODUCING + IDLE, // No tasks or producers + PRODUCING, // There is an active producing thread + REPRODUCING // There is an active producing thread and demand for more production } - /* The modes this strategy can work in */ - private enum Mode + /* The sub-strategies used by this strategy to consume tasks that are produced */ + private enum SubStrategy { + /** + * Consume produced tasks and resume producing + */ PRODUCE_CONSUME, - PRODUCE_INVOKE_CONSUME, // This is PRODUCE_CONSUME an EITHER task with NON_BLOCKING invocation + /** + * Invoke produced tasks as non blocking and resume producing + */ + PRODUCE_INVOKE_CONSUME, + /** + * Execute produced tasks and continue producing + */ PRODUCE_EXECUTE_CONSUME, - EXECUTE_PRODUCE_CONSUME // Eat What You Kill! + /** + * Execute a pending producer, consume produced tasks and race pending producer to resume producing. + */ + EXECUTE_PRODUCE_CONSUME } private final AutoLock _lock = new AutoLock(); @@ -120,6 +138,10 @@ private enum Mode private State _state = State.IDLE; private boolean _pending; + /** + * @param producer The produce of tasks to be consumed. + * @param executor The executor to be used for executing producers or consumers, depending on the sub-strategy. + */ public AdaptiveExecutionStrategy(Producer producer, Executor executor) { _producer = producer; @@ -168,7 +190,7 @@ public void produce() } /** - * Try to become the producing thread. + * Try to become the producing thread and then produce and consume tasks * @param wasPending True if this thread was started as a pending producer */ private void tryProduce(boolean wasPending) @@ -208,13 +230,40 @@ private void tryProduce(boolean wasPending) // Determine this threads invocation type once outside of the production loop boolean nonBlocking = Invocable.isNonBlockingInvocation(); - while (isRunning()) { try { - // Produce a task and then continue producing only if the strategy used returns true - if (doProduce(nonBlocking)) + Runnable task = produceTask(); + + // If we did not produce a task + if (task == null) + { + // the we need take the lock to atomically determine if we should keep producing + try (AutoLock l = _lock.lock()) + { + switch (_state) + { + case PRODUCING: + // This thread was the only producer, so it is now Idle and we return from production + _state = State.IDLE; + return; + + case REPRODUCING: + // Another thread may have queued a task and tried to produce + // so this thread should continue to produce. + _state = State.PRODUCING; + continue; + + default: + throw new IllegalStateException(toStringLocked()); + } + } + } + + // Consume the task according the selected sub-strategy, then + // continue producing only if the sub-strategy used returns true + if (consumeTask(task, selectSubStrategy(task, nonBlocking))) continue; return; } @@ -226,49 +275,20 @@ private void tryProduce(boolean wasPending) } /** - * @param nonBlocking True if this thread has been invoked in non-blocking mode - * @return true if this thread should keep producing + * Select the execution strategy + * @param task The task to select the strategy for + * @param nonBlocking True if the producing thread cannot block + * @return The sub-strategy mode to use for the task */ - private boolean doProduce(boolean nonBlocking) + private SubStrategy selectSubStrategy(Runnable task, boolean nonBlocking) { - Runnable task = produceTask(); - - // If we did not produce a task - if (task == null) - { - // the we need take the lock to atomically determine if we should keep producing - try (AutoLock l = _lock.lock()) - { - switch (_state) - { - case PRODUCING: - // This thread was the only producer, so it is now Idle - _state = State.IDLE; - return false; - - case REPRODUCING: - // Another thread may have queued a task and tried to produce - // so this thread should try to produce again. - _state = State.PRODUCING; - return true; - - default: - throw new IllegalStateException(toStringLocked()); - } - } - } - - // Determine the execution mode by a function of the tasks invocation type, this threads invocation type - // and the availability of a pending producer thread: - Mode mode; Invocable.InvocationType taskType = Invocable.getInvocationType(task); switch (taskType) { case NON_BLOCKING: // The produced task will not block, so use PC: consume task directly // and then resume production. - mode = Mode.PRODUCE_CONSUME; - break; + return SubStrategy.PRODUCE_CONSUME; case EITHER: // If this producing thread may block then it can directly consume @@ -285,15 +305,13 @@ private boolean doProduce(boolean nonBlocking) // with the pending producer to resume production. _pending = true; _state = State.IDLE; - mode = Mode.EXECUTE_PRODUCE_CONSUME; - break; + return SubStrategy.EXECUTE_PRODUCE_CONSUME; } } } // otherwise use PIC: directly consume the task in non-blocking mode and then resume production. - mode = Mode.PRODUCE_INVOKE_CONSUME; - break; + return SubStrategy.PRODUCE_INVOKE_CONSUME; case BLOCKING: // If this producing thread may block then it can directly consume @@ -310,24 +328,30 @@ private boolean doProduce(boolean nonBlocking) // with the pending producer to resume production. _pending = true; _state = State.IDLE; - mode = Mode.EXECUTE_PRODUCE_CONSUME; - break; + return SubStrategy.EXECUTE_PRODUCE_CONSUME; } } } // Otherwise use PEC: the task is consumed by the executor and this thread continues to produce - mode = Mode.PRODUCE_EXECUTE_CONSUME; - break; + return SubStrategy.PRODUCE_EXECUTE_CONSUME; default: throw new IllegalStateException(); } + } + /** Consume a task + * @param task The task to consume + * @param subStrategy The execution sub-strategy mode to use to consume it + * @return True if the sub-strategy requires the caller to continue to produce tasks + */ + private boolean consumeTask(Runnable task, SubStrategy subStrategy) + { // Consume and/or execute task according to the selected mode. if (LOG.isDebugEnabled()) - LOG.debug("{} m={} t={}/{}", this, mode, task, Invocable.getInvocationType(task)); - switch (mode) + LOG.debug("{} m={} t={}/{}", this, subStrategy, task, Invocable.getInvocationType(task)); + switch (subStrategy) { case PRODUCE_CONSUME: _pcMode.increment(); @@ -336,7 +360,7 @@ private boolean doProduce(boolean nonBlocking) case PRODUCE_INVOKE_CONSUME: _picMode.increment(); - invokeTask(task); + invokeAsNonBlocking(task); return true; case PRODUCE_EXECUTE_CONSUME: @@ -368,6 +392,10 @@ private boolean doProduce(boolean nonBlocking) } } + /** + * Run a Runnable task, logging any thrown exception + * @param task The task to run. + */ private void runTask(Runnable task) { try @@ -380,7 +408,11 @@ private void runTask(Runnable task) } } - private void invokeTask(Runnable task) + /** + * Invoke a task in non-blocking mode + * @param task The task to invoke + */ + private void invokeAsNonBlocking(Runnable task) { try { @@ -392,6 +424,10 @@ private void invokeTask(Runnable task) } } + /** + * Produce a task, logging any Throwable that results + * @return A produced task or null if there were no tasks or a Throwable was thrown. + */ private Runnable produceTask() { try @@ -405,6 +441,11 @@ private Runnable produceTask() } } + /** + * Execute a task via the {@link Executor} used to construct this strategy. + * If the execution is rejected and the task is a Closeable, then it is closed. + * @param task The task to execute + */ private void execute(Runnable task) { try From 8dd8f1bdb2d807316768aaa852d7aadbb929a29d Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 10 Jun 2021 14:49:07 +1000 Subject: [PATCH 18/21] Rename EWYK The AdaptiveExecutionStrategy updates from review Signed-off-by: Greg Wilkins --- .../strategy/AdaptiveExecutionStrategy.java | 82 +++++++++---------- 1 file changed, 41 insertions(+), 41 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java index 5d2aeb3fa618..3ec59910db7c 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java @@ -96,28 +96,28 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe private static final Logger LOG = LoggerFactory.getLogger(AdaptiveExecutionStrategy.class); /** - * The state of this strategy + * The production state of the strategy. */ private enum State { - IDLE, // No tasks or producers - PRODUCING, // There is an active producing thread - REPRODUCING // There is an active producing thread and demand for more production + IDLE, // No tasks or producers. + PRODUCING, // There is an active producing thread. + REPRODUCING // There is an active producing thread and demand for more production. } - /* The sub-strategies used by this strategy to consume tasks that are produced */ + /* The sub-strategies used by this strategy to consume tasks that are produced. */ private enum SubStrategy { /** - * Consume produced tasks and resume producing + * Consume produced tasks and resume producing. */ PRODUCE_CONSUME, /** - * Invoke produced tasks as non blocking and resume producing + * Invoke produced tasks as non blocking and resume producing. */ PRODUCE_INVOKE_CONSUME, /** - * Execute produced tasks and continue producing + * Execute produced tasks and continue producing. */ PRODUCE_EXECUTE_CONSUME, /** @@ -190,25 +190,25 @@ public void produce() } /** - * Try to become the producing thread and then produce and consume tasks - * @param wasPending True if this thread was started as a pending producer + * Try to become the producing thread and then produce and consume tasks. + * @param wasPending True if this thread was started as a pending producer. */ private void tryProduce(boolean wasPending) { if (LOG.isDebugEnabled()) LOG.debug("{} tryProduce {}", this, wasPending); - // Take the lock to atomically check if this thread can produce + // Take the lock to atomically check if this thread can produce. try (AutoLock l = _lock.lock()) { - // If this thread was the pending producer, there is no longer one pending + // If this thread was the pending producer, there is no longer one pending. if (wasPending) _pending = false; switch (_state) { case IDLE: - // The strategy was IDLE, so this thread can become the producer + // The strategy was IDLE, so this thread can become the producer. _state = State.PRODUCING; break; @@ -220,15 +220,15 @@ private void tryProduce(boolean wasPending) return; case REPRODUCING: - // Another thread is already producing and will already try another #doProduce + // Another thread is already producing and will already try another #doProduce. return; default: - throw new IllegalStateException(); + throw new IllegalStateException(toStringLocked()); } } - // Determine this threads invocation type once outside of the production loop + // Determine this threads invocation type once outside of the production loop. boolean nonBlocking = Invocable.isNonBlockingInvocation(); while (isRunning()) { @@ -239,13 +239,13 @@ private void tryProduce(boolean wasPending) // If we did not produce a task if (task == null) { - // the we need take the lock to atomically determine if we should keep producing + // take the lock to atomically determine if we should keep producing. try (AutoLock l = _lock.lock()) { switch (_state) { case PRODUCING: - // This thread was the only producer, so it is now Idle and we return from production + // This thread was the only producer, so it is now Idle and we return from production. _state = State.IDLE; return; @@ -262,7 +262,7 @@ private void tryProduce(boolean wasPending) } // Consume the task according the selected sub-strategy, then - // continue producing only if the sub-strategy used returns true + // continue producing only if the sub-strategy used returns true. if (consumeTask(task, selectSubStrategy(task, nonBlocking))) continue; return; @@ -275,10 +275,10 @@ private void tryProduce(boolean wasPending) } /** - * Select the execution strategy - * @param task The task to select the strategy for - * @param nonBlocking True if the producing thread cannot block - * @return The sub-strategy mode to use for the task + * Select the execution strategy. + * @param task The task to select the strategy for. + * @param nonBlocking True if the producing thread cannot block. + * @return The sub-strategy mode to use for the task. */ private SubStrategy selectSubStrategy(Runnable task, boolean nonBlocking) { @@ -295,7 +295,7 @@ private SubStrategy selectSubStrategy(Runnable task, boolean nonBlocking) // the task in blocking mode if a pending producer is available. if (!nonBlocking) { - // We need to take the lock so we can atomically check if a pending producer is available. + // Take the lock so we can atomically check if a pending producer is available. try (AutoLock l = _lock.lock()) { // If a pending producer is available or one can be started @@ -318,7 +318,7 @@ private SubStrategy selectSubStrategy(Runnable task, boolean nonBlocking) // the blocking task if a pending producer is available. if (!nonBlocking) { - // We need to take the lock so we can atomically check if a pending producer is available. + // Take the lock so we can atomically check if a pending producer is available. try (AutoLock l = _lock.lock()) { // If a pending producer is available or one can be started @@ -333,18 +333,18 @@ private SubStrategy selectSubStrategy(Runnable task, boolean nonBlocking) } } - // Otherwise use PEC: the task is consumed by the executor and this thread continues to produce + // Otherwise use PEC: the task is consumed by the executor and this thread continues to produce. return SubStrategy.PRODUCE_EXECUTE_CONSUME; default: - throw new IllegalStateException(); + throw new IllegalStateException(String.format("taskType=%s %s", taskType, this)); } } - /** Consume a task - * @param task The task to consume - * @param subStrategy The execution sub-strategy mode to use to consume it - * @return True if the sub-strategy requires the caller to continue to produce tasks + /** Consume a task with a sub-strategy. + * @param task The task to consume. + * @param subStrategy The execution sub-strategy mode to use to consume it. + * @return True if the sub-strategy requires the caller to continue to produce tasks. */ private boolean consumeTask(Runnable task, SubStrategy subStrategy) { @@ -372,28 +372,28 @@ private boolean consumeTask(Runnable task, SubStrategy subStrategy) _epcMode.increment(); runTask(task); - // Race the pending producer to produce again + // Race the pending producer to produce again. try (AutoLock l = _lock.lock()) { if (_state == State.IDLE) { - // We beat the pending producer, so we will become the producer instead + // We beat the pending producer, so we will become the producer instead. // The pending produce will become a noop if it arrives whilst we are producing, - // or it may take over if we subsequently do another EPC consumption + // or it may take over if we subsequently do another EPC consumption. _state = State.PRODUCING; return true; } } - // The pending producer is now producing, so this thread no longer produces + // The pending producer is now producing, so this thread no longer produces. return false; default: - throw new IllegalStateException(toString()); + throw new IllegalStateException(String.format("ss=%s %s", subStrategy, this)); } } /** - * Run a Runnable task, logging any thrown exception + * Run a Runnable task, logging any thrown exception. * @param task The task to run. */ private void runTask(Runnable task) @@ -409,8 +409,8 @@ private void runTask(Runnable task) } /** - * Invoke a task in non-blocking mode - * @param task The task to invoke + * Invoke a task in non-blocking mode. + * @param task The task to invoke. */ private void invokeAsNonBlocking(Runnable task) { @@ -425,7 +425,7 @@ private void invokeAsNonBlocking(Runnable task) } /** - * Produce a task, logging any Throwable that results + * Produce a task, logging any Throwable that results. * @return A produced task or null if there were no tasks or a Throwable was thrown. */ private Runnable produceTask() @@ -444,7 +444,7 @@ private Runnable produceTask() /** * Execute a task via the {@link Executor} used to construct this strategy. * If the execution is rejected and the task is a Closeable, then it is closed. - * @param task The task to execute + * @param task The task to execute. */ private void execute(Runnable task) { From 8acfd449d8cdf7431985c50e4a5828d52b177535 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Tue, 15 Jun 2021 09:17:23 +1000 Subject: [PATCH 19/21] Rename EWYK The AdaptiveExecutionStrategy niggles for javadoc Signed-off-by: Greg Wilkins --- .../thread/strategy/AdaptiveExecutionStrategy.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java index 3ec59910db7c..37cf51f71a2a 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java @@ -213,7 +213,7 @@ private void tryProduce(boolean wasPending) break; case PRODUCING: - // The strategy is already producing, so another thread must be the producer + // The strategy is already producing, so another thread must be the producer. // However, it may be just about to stop being the producer so we set the // REPRODUCING state to force it to call #doProduce at least once more. _state = State.REPRODUCING; @@ -278,7 +278,7 @@ private void tryProduce(boolean wasPending) * Select the execution strategy. * @param task The task to select the strategy for. * @param nonBlocking True if the producing thread cannot block. - * @return The sub-strategy mode to use for the task. + * @return The sub-strategy to use for the task. */ private SubStrategy selectSubStrategy(Runnable task, boolean nonBlocking) { @@ -341,7 +341,8 @@ private SubStrategy selectSubStrategy(Runnable task, boolean nonBlocking) } } - /** Consume a task with a sub-strategy. + /** + * Consume a task with a sub-strategy. * @param task The task to consume. * @param subStrategy The execution sub-strategy mode to use to consume it. * @return True if the sub-strategy requires the caller to continue to produce tasks. @@ -350,7 +351,7 @@ private boolean consumeTask(Runnable task, SubStrategy subStrategy) { // Consume and/or execute task according to the selected mode. if (LOG.isDebugEnabled()) - LOG.debug("{} m={} t={}/{}", this, subStrategy, task, Invocable.getInvocationType(task)); + LOG.debug("{} ss={} t={}/{} {}", this, subStrategy, task, Invocable.getInvocationType(task)); switch (subStrategy) { case PRODUCE_CONSUME: @@ -393,7 +394,7 @@ private boolean consumeTask(Runnable task, SubStrategy subStrategy) } /** - * Run a Runnable task, logging any thrown exception. + * Runs a Runnable task, logging any thrown exception. * @param task The task to run. */ private void runTask(Runnable task) From d2dee7bf7402aa275b7e0713620dfc4d68f2584a Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Tue, 15 Jun 2021 22:36:29 +1000 Subject: [PATCH 20/21] Rename EWYK The AdaptiveExecutionStrategy more niggles for javadoc Signed-off-by: Greg Wilkins --- .../strategy/AdaptiveExecutionStrategy.java | 51 ++++++++++--------- 1 file changed, 27 insertions(+), 24 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java index 37cf51f71a2a..e4fdc25bfadb 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java @@ -105,23 +105,23 @@ private enum State REPRODUCING // There is an active producing thread and demand for more production. } - /* The sub-strategies used by this strategy to consume tasks that are produced. */ + /* The sub-strategies used by the strategy to consume tasks that are produced. */ private enum SubStrategy { /** - * Consume produced tasks and resume producing. + * Consumes produced tasks and resume producing. */ PRODUCE_CONSUME, /** - * Invoke produced tasks as non blocking and resume producing. + * Invokes produced tasks as non blocking and resume producing. */ PRODUCE_INVOKE_CONSUME, /** - * Execute produced tasks and continue producing. + * Executes produced tasks and continue producing. */ PRODUCE_EXECUTE_CONSUME, /** - * Execute a pending producer, consume produced tasks and race pending producer to resume producing. + * Executes a pending producer, consumes produced tasks and races pending producer to resume producing. */ EXECUTE_PRODUCE_CONSUME } @@ -190,18 +190,18 @@ public void produce() } /** - * Try to become the producing thread and then produce and consume tasks. - * @param wasPending True if this thread was started as a pending producer. + * Tries to become the producing thread and then produces and consumes tasks. + * @param wasPending True if the calling thread was started as a pending producer. */ private void tryProduce(boolean wasPending) { if (LOG.isDebugEnabled()) LOG.debug("{} tryProduce {}", this, wasPending); - // Take the lock to atomically check if this thread can produce. + // Takes the lock to atomically check if the thread can produce. try (AutoLock l = _lock.lock()) { - // If this thread was the pending producer, there is no longer one pending. + // If the calling thread was the pending producer, there is no longer one pending. if (wasPending) _pending = false; @@ -228,7 +228,7 @@ private void tryProduce(boolean wasPending) } } - // Determine this threads invocation type once outside of the production loop. + // Determine the threads invocation type once outside of the production loop. boolean nonBlocking = Invocable.isNonBlockingInvocation(); while (isRunning()) { @@ -245,13 +245,13 @@ private void tryProduce(boolean wasPending) switch (_state) { case PRODUCING: - // This thread was the only producer, so it is now Idle and we return from production. + // The calling thread was the only producer, so it is now Idle and we return from production. _state = State.IDLE; return; case REPRODUCING: // Another thread may have queued a task and tried to produce - // so this thread should continue to produce. + // so the calling thread should continue to produce. _state = State.PRODUCING; continue; @@ -291,18 +291,19 @@ private SubStrategy selectSubStrategy(Runnable task, boolean nonBlocking) return SubStrategy.PRODUCE_CONSUME; case EITHER: - // If this producing thread may block then it can directly consume - // the task in blocking mode if a pending producer is available. + // The produced task may be run either as blocking or non blocking. + + // If the calling producing thread may also block if (!nonBlocking) { - // Take the lock so we can atomically check if a pending producer is available. + // Take the lock to atomically check if a pending producer is available. try (AutoLock l = _lock.lock()) { // If a pending producer is available or one can be started if (_pending || _tryExecutor.tryExecute(_runPendingProducer)) { - // use EPC: directly consume the task and then race - // with the pending producer to resume production. + // use EPC: The producer directly consumes the task, which may block + // and then races with the pending producer to resume production. _pending = true; _state = State.IDLE; return SubStrategy.EXECUTE_PRODUCE_CONSUME; @@ -310,22 +311,24 @@ private SubStrategy selectSubStrategy(Runnable task, boolean nonBlocking) } } - // otherwise use PIC: directly consume the task in non-blocking mode and then resume production. + // otherwise use PIC: The producer consumers the task in non-blocking mode + // and then resumes production. return SubStrategy.PRODUCE_INVOKE_CONSUME; case BLOCKING: - // If this producing thread may block then it can directly consume - // the blocking task if a pending producer is available. + // The produced task may block. + + // If the calling producing thread may also block if (!nonBlocking) { - // Take the lock so we can atomically check if a pending producer is available. + // Take the lock to atomically check if a pending producer is available. try (AutoLock l = _lock.lock()) { // If a pending producer is available or one can be started if (_pending || _tryExecutor.tryExecute(_runPendingProducer)) { - // use EPC: directly consume the task and then race - // with the pending producer to resume production. + // use EPC: The producer directly consumes the task, which may block + // and then races with the pending producer to resume production. _pending = true; _state = State.IDLE; return SubStrategy.EXECUTE_PRODUCE_CONSUME; @@ -333,7 +336,7 @@ private SubStrategy selectSubStrategy(Runnable task, boolean nonBlocking) } } - // Otherwise use PEC: the task is consumed by the executor and this thread continues to produce. + // Otherwise use PEC: the task is consumed by the executor and the producer continues to produce. return SubStrategy.PRODUCE_EXECUTE_CONSUME; default: From f79ec975b86ae702bc7819135b6be9ffc5a8bc9c Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 15 Jun 2021 16:53:54 +0200 Subject: [PATCH 21/21] Fixes #6391 - Rename EatWhatYouKill to AdaptiveExecutionStrategy. Fixed javadoc 3rd person forms. Removed stale references to doProduce() method. Using IO.close(Closeable). Other small cosmetic changes. Signed-off-by: Simone Bordet --- .../strategy/AdaptiveExecutionStrategy.java | 79 +++++++++---------- 1 file changed, 39 insertions(+), 40 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java index e4fdc25bfadb..72ee3a283423 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java @@ -20,6 +20,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.LongAdder; +import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedOperation; @@ -40,31 +41,30 @@ *

This strategy selects between the following sub-strategies:

*
*
ProduceConsume(PC)
- *
The producing thread consumes the task by executing it directly and then - * continues to produce.
+ *
The producing thread consumes the task by running it directly + * and then continues to produce.
*
ProduceInvokeConsume(PIC)
- *
The producing thread consumes the task by invoking it with {@link Invocable#invokeNonBlocking(Runnable)} + *
The producing thread consumes the task by running it with {@link Invocable#invokeNonBlocking(Runnable)} * and then continues to produce.
*
ProduceExecuteConsume(PEC)
- *
The producing thread dispatches the task to a thread pool to be executed and then immediately resumes - * producing.
+ *
The producing thread dispatches the task to a thread pool to be executed + * and then continues to produce.
*
ExecuteProduceConsume(EPC)
- *
The producing thread consumes the task by executing it directly (as in PC mode) but then races with - * a pending producer thread to take over production. + *
The producing thread consumes dispatches a pending producer to a thread pool, + * then consumes the task by running it directly (as in PC mode), then races with + * the pending producer thread to take over production. *
*
*

The sub-strategy is selected as follows:

*
*
PC
- *
If the produced task has been invoked with {@link Invocable#invokeNonBlocking(Runnable)} - * to indicate that it is {@link Invocable.InvocationType#NON_BLOCKING}.
+ *
If the produced task is {@link Invocable.InvocationType#NON_BLOCKING}.
*
EPC
*
If the producing thread is not {@link Invocable.InvocationType#NON_BLOCKING} * and a pending producer thread is available, either because there is already a pending producer * or one is successfully started with {@link TryExecutor#tryExecute(Runnable)}.
*
PIC
- *
If the produced task has used the {@link Invocable#getInvocationType()} API to - * indicate that it is {@link Invocable.InvocationType#EITHER}.
+ *
If the produced task is {@link Invocable.InvocationType#EITHER} and EPC was not selected.
*
PEC
*
Otherwise.
*
@@ -88,7 +88,6 @@ *

This strategy was previously named EatWhatYouKill (EWYK) because its preference for a * producer to directly consume the tasks that it produces is similar to a hunting proverb * that says that a hunter should eat (i.e. consume) what they kill (i.e. produced).

- * */ @ManagedObject("Adaptive execution strategy") public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements ExecutionStrategy @@ -105,23 +104,25 @@ private enum State REPRODUCING // There is an active producing thread and demand for more production. } - /* The sub-strategies used by the strategy to consume tasks that are produced. */ + /** + * The sub-strategies used by the strategy to consume tasks that are produced. + */ private enum SubStrategy { /** - * Consumes produced tasks and resume producing. + * Consumes produced tasks and continues producing. */ PRODUCE_CONSUME, /** - * Invokes produced tasks as non blocking and resume producing. + * Consumes produced tasks as non blocking and continues producing. */ PRODUCE_INVOKE_CONSUME, /** - * Executes produced tasks and continue producing. + * Executes produced tasks and continues producing. */ PRODUCE_EXECUTE_CONSUME, /** - * Executes a pending producer, consumes produced tasks and races pending producer to resume producing. + * Executes a pending producer, consumes produced tasks and races the pending producer to continue producing. */ EXECUTE_PRODUCE_CONSUME } @@ -191,6 +192,7 @@ public void produce() /** * Tries to become the producing thread and then produces and consumes tasks. + * * @param wasPending True if the calling thread was started as a pending producer. */ private void tryProduce(boolean wasPending) @@ -215,12 +217,12 @@ private void tryProduce(boolean wasPending) case PRODUCING: // The strategy is already producing, so another thread must be the producer. // However, it may be just about to stop being the producer so we set the - // REPRODUCING state to force it to call #doProduce at least once more. + // REPRODUCING state to force it to produce at least once more. _state = State.REPRODUCING; return; case REPRODUCING: - // Another thread is already producing and will already try another #doProduce. + // Another thread is already producing and will already try again to produce. return; default: @@ -228,7 +230,7 @@ private void tryProduce(boolean wasPending) } } - // Determine the threads invocation type once outside of the production loop. + // Determine the thread's invocation type once, outside of the production loop. boolean nonBlocking = Invocable.isNonBlockingInvocation(); while (isRunning()) { @@ -245,7 +247,7 @@ private void tryProduce(boolean wasPending) switch (_state) { case PRODUCING: - // The calling thread was the only producer, so it is now Idle and we return from production. + // The calling thread was the only producer, so it is now IDLE and we stop producing. _state = State.IDLE; return; @@ -262,7 +264,7 @@ private void tryProduce(boolean wasPending) } // Consume the task according the selected sub-strategy, then - // continue producing only if the sub-strategy used returns true. + // continue producing only if the sub-strategy returns true. if (consumeTask(task, selectSubStrategy(task, nonBlocking))) continue; return; @@ -275,7 +277,8 @@ private void tryProduce(boolean wasPending) } /** - * Select the execution strategy. + * Selects the execution strategy. + * * @param task The task to select the strategy for. * @param nonBlocking True if the producing thread cannot block. * @return The sub-strategy to use for the task. @@ -345,16 +348,17 @@ private SubStrategy selectSubStrategy(Runnable task, boolean nonBlocking) } /** - * Consume a task with a sub-strategy. + * Consumes a task with a sub-strategy. + * * @param task The task to consume. - * @param subStrategy The execution sub-strategy mode to use to consume it. + * @param subStrategy The execution sub-strategy to use to consume the task. * @return True if the sub-strategy requires the caller to continue to produce tasks. */ private boolean consumeTask(Runnable task, SubStrategy subStrategy) { // Consume and/or execute task according to the selected mode. if (LOG.isDebugEnabled()) - LOG.debug("{} ss={} t={}/{} {}", this, subStrategy, task, Invocable.getInvocationType(task)); + LOG.debug("ss={} t={}/{} {}", subStrategy, task, Invocable.getInvocationType(task), this); switch (subStrategy) { case PRODUCE_CONSUME: @@ -398,6 +402,7 @@ private boolean consumeTask(Runnable task, SubStrategy subStrategy) /** * Runs a Runnable task, logging any thrown exception. + * * @param task The task to run. */ private void runTask(Runnable task) @@ -413,8 +418,9 @@ private void runTask(Runnable task) } /** - * Invoke a task in non-blocking mode. - * @param task The task to invoke. + * Runs a task in non-blocking mode. + * + * @param task The task to run in non-blocking mode. */ private void invokeAsNonBlocking(Runnable task) { @@ -429,7 +435,8 @@ private void invokeAsNonBlocking(Runnable task) } /** - * Produce a task, logging any Throwable that results. + * Produces a task, logging any Throwable that may result. + * * @return A produced task or null if there were no tasks or a Throwable was thrown. */ private Runnable produceTask() @@ -446,8 +453,9 @@ private Runnable produceTask() } /** - * Execute a task via the {@link Executor} used to construct this strategy. + * Executes a task via the {@link Executor} used to construct this strategy. * If the execution is rejected and the task is a Closeable, then it is closed. + * * @param task The task to execute. */ private void execute(Runnable task) @@ -464,16 +472,7 @@ private void execute(Runnable task) LOG.trace("IGNORED", e); if (task instanceof Closeable) - { - try - { - ((Closeable)task).close(); - } - catch (Throwable e2) - { - LOG.trace("IGNORED", e2); - } - } + IO.close((Closeable)task); } }