Skip to content

Commit

Permalink
HDDS-11304. Make up for the missing functionality in CommandDispatcher (
Browse files Browse the repository at this point in the history
  • Loading branch information
jianghuazhu authored Aug 29, 2024
1 parent 2d372f6 commit cc4e026
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -58,11 +57,11 @@ public class CloseContainerCommandHandler implements CommandHandler {

private final AtomicLong invocationCount = new AtomicLong(0);
private final AtomicInteger queuedCount = new AtomicInteger(0);
private final ExecutorService executor;
private final ThreadPoolExecutor executor;
private long totalTime;

/**
* Constructs a ContainerReport handler.
* Constructs a close container command handler.
*/
public CloseContainerCommandHandler(
int threadPoolSize, int queueSize, String threadNamePrefix) {
Expand Down Expand Up @@ -220,4 +219,14 @@ public long getTotalRunTime() {
public int getQueuedCount() {
return queuedCount.get();
}

@Override
public int getThreadPoolMaxPoolSize() {
return executor.getMaximumPoolSize();
}

@Override
public int getThreadPoolActivePoolSize() {
return executor.getActiveCount();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,6 @@ public final class CommandDispatcher {
private CommandDispatcher(OzoneContainer container, SCMConnectionManager
connectionManager, StateContext context,
CommandHandler... handlers) {
Preconditions.checkNotNull(context);
Preconditions.checkNotNull(handlers);
Preconditions.checkArgument(handlers.length > 0);
Preconditions.checkNotNull(container);
Preconditions.checkNotNull(connectionManager);
this.context = context;
this.container = container;
this.connectionManager = connectionManager;
Expand All @@ -77,6 +72,7 @@ private CommandDispatcher(OzoneContainer container, SCMConnectionManager
commandHandlerMetrics = CommandHandlerMetrics.create(handlerMap);
}

@VisibleForTesting
public CommandHandler getCloseContainerHandler() {
return handlerMap.get(Type.closeContainerCommand);
}
Expand Down Expand Up @@ -201,11 +197,12 @@ public Builder setContext(StateContext stateContext) {
* @return Command Dispatcher.
*/
public CommandDispatcher build() {
Preconditions.checkNotNull(this.connectionManager, "Missing connection" +
" manager.");
Preconditions.checkNotNull(this.container, "Missing container.");
Preconditions.checkNotNull(this.context, "Missing context.");
Preconditions.checkArgument(this.handlerList.size() > 0);
Preconditions.checkNotNull(this.connectionManager,
"Missing scm connection manager.");
Preconditions.checkNotNull(this.container, "Missing ozone container.");
Preconditions.checkNotNull(this.context, "Missing state context.");
Preconditions.checkArgument(this.handlerList.size() > 0,
"The number of command handlers must be greater than 0.");
return new CommandDispatcher(this.container, this.connectionManager,
this.context, handlerList.toArray(
new CommandHandler[handlerList.size()]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,12 @@ public int getQueuedCount() {

@Override
public int getThreadPoolMaxPoolSize() {
return ((ThreadPoolExecutor)executor).getMaximumPoolSize();
return executor.getMaximumPoolSize();
}

@Override
public int getThreadPoolActivePoolSize() {
return ((ThreadPoolExecutor)executor).getActiveCount();
return executor.getActiveCount();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.io.IOException;
import java.time.Clock;
import java.util.OptionalLong;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -53,7 +52,7 @@ public class DeleteContainerCommandHandler implements CommandHandler {
private final AtomicInteger invocationCount = new AtomicInteger(0);
private final AtomicInteger timeoutCount = new AtomicInteger(0);
private final AtomicLong totalTime = new AtomicLong(0);
private final ExecutorService executor;
private final ThreadPoolExecutor executor;
private final Clock clock;
private int maxQueueSize;

Expand All @@ -70,7 +69,7 @@ public DeleteContainerCommandHandler(
}

protected DeleteContainerCommandHandler(Clock clock,
ExecutorService executor, int queueSize) {
ThreadPoolExecutor executor, int queueSize) {
this.executor = executor;
this.clock = clock;
maxQueueSize = queueSize;
Expand Down Expand Up @@ -131,7 +130,7 @@ private void handleInternal(SCMCommand command, StateContext context,

@Override
public int getQueuedCount() {
return ((ThreadPoolExecutor)executor).getQueue().size();
return executor.getQueue().size();
}

@Override
Expand Down Expand Up @@ -160,6 +159,16 @@ public long getTotalRunTime() {
return totalTime.get();
}

@Override
public int getThreadPoolMaxPoolSize() {
return executor.getMaximumPoolSize();
}

@Override
public int getThreadPoolActivePoolSize() {
return executor.getActiveCount();
}

@Override
public void stop() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.UUID;
Expand All @@ -43,6 +44,8 @@
import static org.apache.hadoop.ozone.OzoneConsts.GB;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -292,4 +295,28 @@ private void waitTillFinishExecution(
GenericTestUtils.waitFor(()
-> closeHandler.getQueuedCount() <= 0, 10, 3000);
}

@Test
public void testThreadPoolPoolSize() {
assertEquals(1, subject.getThreadPoolMaxPoolSize());
assertEquals(0, subject.getThreadPoolActivePoolSize());

CloseContainerCommandHandler closeContainerCommandHandler =
new CloseContainerCommandHandler(10, 10, "");
closeContainerCommandHandler.handle(new CloseContainerCommand(
CONTAINER_ID + 1, PipelineID.randomId()),
ozoneContainer, context, null);
closeContainerCommandHandler.handle(new CloseContainerCommand(
CONTAINER_ID + 2, PipelineID.randomId()),
ozoneContainer, context, null);
closeContainerCommandHandler.handle(new CloseContainerCommand(
CONTAINER_ID + 3, PipelineID.randomId()),
ozoneContainer, context, null);
closeContainerCommandHandler.handle(new CloseContainerCommand(
CONTAINER_ID + 4, PipelineID.randomId()),
ozoneContainer, context, null);
assertEquals(10, closeContainerCommandHandler.getThreadPoolMaxPoolSize());
assertTrue(closeContainerCommandHandler.getThreadPoolActivePoolSize() > 0);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
Expand All @@ -32,7 +40,6 @@
import java.time.ZoneId;
import java.util.OptionalLong;

import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -63,8 +70,14 @@ public void setup() {
}

@Test
public void testExpiredCommandsAreNotProcessed() throws IOException {
DeleteContainerCommandHandler handler = createSubject(clock, 1000);
public void testExpiredCommandsAreNotProcessed()
throws IOException, InterruptedException {
CountDownLatch latch1 = new CountDownLatch(1);
ThreadFactory threadFactory = new ThreadFactoryBuilder().build();
ThreadPoolWithLockExecutor executor = new ThreadPoolWithLockExecutor(
threadFactory, latch1);
DeleteContainerCommandHandler handler = new DeleteContainerCommandHandler(
clock, executor, 100);

DeleteContainerCommand command1 = new DeleteContainerCommand(1L);
command1.setDeadline(clock.millis() + 10000);
Expand All @@ -75,9 +88,14 @@ public void testExpiredCommandsAreNotProcessed() throws IOException {

clock.fastForward(15000);
handler.handle(command1, ozoneContainer, null, null);
latch1.await();
assertEquals(1, handler.getTimeoutCount());
CountDownLatch latch2 = new CountDownLatch(2);
executor.setLatch(latch2);
handler.handle(command2, ozoneContainer, null, null);
handler.handle(command3, ozoneContainer, null, null);
latch2.await();

assertEquals(1, handler.getTimeoutCount());
assertEquals(3, handler.getInvocationCount());
verify(controller, times(0))
Expand All @@ -89,18 +107,26 @@ public void testExpiredCommandsAreNotProcessed() throws IOException {
}

@Test
public void testCommandForCurrentTermIsExecuted() throws IOException {
public void testCommandForCurrentTermIsExecuted()
throws IOException, InterruptedException {
// GIVEN
DeleteContainerCommand command = new DeleteContainerCommand(1L);
command.setTerm(1);

when(context.getTermOfLeaderSCM())
.thenReturn(OptionalLong.of(command.getTerm()));

DeleteContainerCommandHandler subject = createSubject();
TestClock testClock = new TestClock(Instant.now(), ZoneId.systemDefault());
CountDownLatch latch = new CountDownLatch(1);
ThreadFactory threadFactory = new ThreadFactoryBuilder().build();
ThreadPoolWithLockExecutor executor = new ThreadPoolWithLockExecutor(
threadFactory, latch);
DeleteContainerCommandHandler subject = new DeleteContainerCommandHandler(
testClock, executor, 100);

// WHEN
subject.handle(command, ozoneContainer, context, null);
latch.await();

// THEN
verify(controller, times(1))
Expand Down Expand Up @@ -163,13 +189,32 @@ private static DeleteContainerCommandHandler createSubject() {

private static DeleteContainerCommandHandler createSubject(
TestClock clock, int queueSize) {
return new DeleteContainerCommandHandler(clock,
newDirectExecutorService(), queueSize);
ThreadFactory threadFactory = new ThreadFactoryBuilder().build();
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.
newFixedThreadPool(1, threadFactory);
return new DeleteContainerCommandHandler(clock, executor, queueSize);
}

private static DeleteContainerCommandHandler createSubjectWithPoolSize(
TestClock clock, int queueSize) {
return new DeleteContainerCommandHandler(1, clock, queueSize, "");
}

static class ThreadPoolWithLockExecutor extends ThreadPoolExecutor {
private CountDownLatch countDownLatch;
ThreadPoolWithLockExecutor(ThreadFactory threadFactory, CountDownLatch latch) {
super(1, 1, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), threadFactory);
this.countDownLatch = latch;
}

void setLatch(CountDownLatch latch) {
this.countDownLatch = latch;
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
countDownLatch.countDown();
}
}
}

0 comments on commit cc4e026

Please sign in to comment.