Skip to content

Commit

Permalink
[grid] Purge timed out requests sitting the queue at regular intervals (
Browse files Browse the repository at this point in the history
#9283)

* [grid] Purge timed out requests sitting the queue at regular intervals

* [grid] Add error message as a constant

Co-authored-by: Diego Molina <[email protected]>
  • Loading branch information
pujagani and diemol authored Mar 18, 2021
1 parent f1cb54f commit 8c4050f
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ java_library(
deps = [
"//java/client/src/org/openqa/selenium/remote",
"//java/server/src/org/openqa/selenium/events",
"//java/server/src/org/openqa/selenium/concurrent",
"//java/server/src/org/openqa/selenium/grid/config",
"//java/server/src/org/openqa/selenium/grid/data",
"//java/server/src/org/openqa/selenium/grid/jmx",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.openqa.selenium.grid.sessionqueue.local;

import org.openqa.selenium.concurrent.Regularly;
import org.openqa.selenium.events.EventBus;
import org.openqa.selenium.grid.config.Config;
import org.openqa.selenium.grid.data.NewSessionErrorResponse;
Expand Down Expand Up @@ -73,12 +74,20 @@ public class LocalNewSessionQueue extends NewSessionQueue {
private final ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
private final Thread shutdownHook = new Thread(this::callExecutorShutdown);
private final String timedOutErrorMessage =
String.format( "New session request rejected after being in the queue for more than %s",
requestTimeout);

public LocalNewSessionQueue(Tracer tracer, EventBus bus, Duration retryInterval,
Duration requestTimeout) {
super(tracer, retryInterval, requestTimeout);
this.bus = Require.nonNull("Event bus", bus);
Runtime.getRuntime().addShutdownHook(shutdownHook);

Regularly regularly = new Regularly("New Session Queue Clean up");
regularly.submit(this::purgeTimedOutRequests, Duration.ofSeconds(60),
Duration.ofSeconds(30));

new JMXHelper().register(this);
}

Expand Down Expand Up @@ -194,8 +203,7 @@ private void retryRequest(SessionRequest sessionRequest) {
LOG.log(Level.INFO, "Request {0} timed out", requestId);
sessionRequests.remove(sessionRequest);
bus.fire(new NewSessionRejectedEvent(
new NewSessionErrorResponse(requestId, String.format(
"New session request rejected after being in the queue for more than %s", requestTimeout))));
new NewSessionErrorResponse(requestId, timedOutErrorMessage)));
} else {
LOG.log(Level.INFO,
"Adding request back to the queue. All slots are busy. Request: {0}",
Expand Down Expand Up @@ -239,8 +247,7 @@ public Optional<HttpRequest> remove(RequestId id) {
HttpRequest request = httpRequest.get();
if (hasRequestTimedOut(request)) {
bus.fire(new NewSessionRejectedEvent(
new NewSessionErrorResponse(id, String.format(
"New session request rejected after being in the queue for more than %s", requestTimeout))));
new NewSessionErrorResponse(id, timedOutErrorMessage)));
return Optional.empty();
}
}
Expand Down Expand Up @@ -272,6 +279,24 @@ public int clear() {
}
}

private void purgeTimedOutRequests() {
Lock writeLock = lock.writeLock();
writeLock.lock();
try {
Iterator<SessionRequest> iterator = sessionRequests.iterator();
while (iterator.hasNext()) {
SessionRequest sessionRequest = iterator.next();
if (hasRequestTimedOut(sessionRequest.getHttpRequest())) {
iterator.remove();
bus.fire(new NewSessionRejectedEvent(
new NewSessionErrorResponse(sessionRequest.getRequestId(), timedOutErrorMessage)));
}
}
} finally {
writeLock.unlock();
}
}

public void callExecutorShutdown() {
LOG.info("Shutting down session queue executor service");
executorService.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.openqa.selenium.ImmutableCapabilities;
import org.openqa.selenium.events.EventBus;
import org.openqa.selenium.events.local.GuavaEventBus;
import org.openqa.selenium.grid.data.NewSessionErrorResponse;
import org.openqa.selenium.grid.data.NewSessionRejectedEvent;
import org.openqa.selenium.grid.data.NewSessionRequestEvent;
import org.openqa.selenium.grid.data.RequestId;
import org.openqa.selenium.grid.sessionqueue.NewSessionQueue;
Expand Down Expand Up @@ -259,6 +261,27 @@ public void shouldBeAbleToGetQueueContents() {
assertEquals(firefoxCaps, response.get(1));
}

@Test
public void shouldBeAbleToRemoveRequestsOnTimeout() throws InterruptedException {
NewSessionQueue localSessionQueue = new LocalNewSessionQueue(
DefaultTestTracer.createTracer(),
bus,
Duration.ofSeconds(30),
Duration.ofSeconds(1));

CountDownLatch latch = new CountDownLatch(1);

bus.addListener(NewSessionRejectedEvent.listener(reqId -> latch.countDown()));

boolean added = localSessionQueue.offerLast(expectedSessionRequest, requestId);
assertTrue(added);

boolean requestExpired = latch.await(2, TimeUnit.MINUTES);

assertThat(requestExpired).isTrue();
assertThat(localSessionQueue.getQueueSize()).isZero();
}

private HttpRequest createRequest(NewSessionPayload payload, HttpMethod httpMethod, String uri) {
StringBuilder builder = new StringBuilder();
try {
Expand Down

0 comments on commit 8c4050f

Please sign in to comment.