Skip to content

Commit

Permalink
[grid] Fix flaky event bus tests by dedicated threading, reverting th…
Browse files Browse the repository at this point in the history
…e polling loop logic and increasing poll timeout (#9383)

* [grid] Fix flaky event bus tests by dedicated threading and reverting the polling loop logic.

* [grid] Fix error messages for event bus

* [grid] Print stacktrace in case of exceptions during eventbus tests

* [grid] Split the Eventbus tests into separate test classes

* [grid] Closing the context in the ZeroMqTcpTest to see if it helps

Co-authored-by: Diego Molina <[email protected]>
  • Loading branch information
pujagani and diemol authored Apr 21, 2021
1 parent 109ee82 commit dd8741a
Show file tree
Hide file tree
Showing 5 changed files with 378 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.logging.Level;
Expand All @@ -63,12 +60,15 @@ class UnboundZmqEventBus implements EventBus {
static final EventName REJECTED_EVENT = new EventName("selenium-rejected-event");
private static final Logger LOG = Logger.getLogger(EventBus.class.getName());
private static final Json JSON = new Json();
private final ScheduledExecutorService socketPollingExecutor;
private final AtomicBoolean pollingStarted = new AtomicBoolean(false);
private final ExecutorService socketPollingExecutor;
private final ExecutorService socketPublishingExecutor;
private final ExecutorService listenerNotificationExecutor;

private final Map<EventName, List<Consumer<Event>>> listeners = new ConcurrentHashMap<>();
private final Queue<UUID> recentMessages = EvictingQueue.create(128);
private final String encodedSecret;
private ZMQ.Poller poller;

private ZMQ.Socket pub;
private ZMQ.Socket sub;
Expand All @@ -81,16 +81,28 @@ class UnboundZmqEventBus implements EventBus {
}
this.encodedSecret = builder.toString();

ThreadFactory threadFactory = r -> {
this.socketPollingExecutor = Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r);
thread.setName("Event Bus");
thread.setName("Event Bus Poller");
thread.setDaemon(true);
return thread;
};
this.socketPollingExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
});

this.socketPublishingExecutor = Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r);
thread.setName("Event Bus Publisher");
thread.setDaemon(true);
return thread;
});

this.listenerNotificationExecutor = Executors.newFixedThreadPool(
Math.max(Runtime.getRuntime().availableProcessors() / 2, 2), // At least two threads
threadFactory);
r -> {
Thread thread = new Thread(r);
thread.setName("Event Bus Listener Notifier");
thread.setDaemon(true);
return thread;
});

String connectionMessage = String.format("Connecting to %s and %s", publishConnection, subscribeConnection);
LOG.info(connectionMessage);
Expand All @@ -116,25 +128,19 @@ class UnboundZmqEventBus implements EventBus {
}
);
// Connections are already established
ZMQ.Poller poller = context.createPoller(1);
poller.register(Objects.requireNonNull(sub), ZMQ.Poller.POLLIN);
this.poller = context.createPoller(1);
this.poller.register(Objects.requireNonNull(sub), ZMQ.Poller.POLLIN);

LOG.info("Sockets created");

AtomicBoolean pollingStarted = new AtomicBoolean(false);

socketPollingExecutor.scheduleWithFixedDelay(
() -> pollForIncomingEvents(poller, secret, pollingStarted),
0,
100,
TimeUnit.MILLISECONDS);
socketPollingExecutor.submit(new PollingRunnable(secret));

// Give ourselves up to a second to connect, using The World's Worst heuristic. If we don't
// manage to connect, it's not the end of the world, as the socket we're connecting to may not
// be up yet.
while (!pollingStarted.get()) {
try {
Thread.sleep(100);
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
Expand Down Expand Up @@ -176,7 +182,7 @@ public void addListener(EventListener<?> listener) {
public void fire(Event event) {
Require.nonNull("Event to send", event);

socketPollingExecutor.execute(() -> {
socketPublishingExecutor.execute(() -> {
pub.sendMore(event.getType().getName().getBytes(UTF_8));
pub.sendMore(encodedSecret.getBytes(UTF_8));
pub.sendMore(event.getId().toString().getBytes(UTF_8));
Expand All @@ -187,7 +193,9 @@ public void fire(Event event) {
@Override
public void close() {
socketPollingExecutor.shutdownNow();
socketPublishingExecutor.shutdownNow();
listenerNotificationExecutor.shutdownNow();
poller.close();

if (sub != null) {
sub.close();
Expand All @@ -197,62 +205,76 @@ public void close() {
}
}

private void pollForIncomingEvents(ZMQ.Poller poller, Secret secret, AtomicBoolean pollingStarted) {
try {
int count = poller.poll(0);
private class PollingRunnable implements Runnable {
private Secret secret;

pollingStarted.lazySet(true);
public PollingRunnable(Secret secret) {
this.secret = secret;
}

for (int i = 0; i < count; i++) {
if (poller.pollin(i)) {
ZMQ.Socket socket = poller.getSocket(i);
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
int count = poller.poll(150);

EventName eventName = new EventName(new String(socket.recv(), UTF_8));
Secret eventSecret =
JSON.toType(new String(socket.recv(), UTF_8), Secret.class);
UUID id = UUID.fromString(new String(socket.recv(), UTF_8));
String data = new String(socket.recv(), UTF_8);
pollingStarted.lazySet(true);

// Don't bother doing more work if we've seen this message.
if (recentMessages.contains(id)) {
return;
}
for (int i = 0; i < count; i++) {
if (poller.pollin(i)) {
ZMQ.Socket socket = poller.getSocket(i);

Object converted = JSON.toType(data, Object.class);
Event event = new Event(id, eventName, converted);
EventName eventName = new EventName(new String(socket.recv(), UTF_8));
Secret eventSecret =
JSON.toType(new String(socket.recv(), UTF_8), Secret.class);
UUID id = UUID.fromString(new String(socket.recv(), UTF_8));
String data = new String(socket.recv(), UTF_8);

recentMessages.add(id);
// Don't bother doing more work if we've seen this message.
if (recentMessages.contains(id)) {
return;
}

if (!Secret.matches(secret, eventSecret)) {
LOG.severe(
String.format(
"Received message without a valid secret. Rejecting. %s -> %s", event, data));
Event rejectedEvent =
new Event(REJECTED_EVENT, new ZeroMqEventBus.RejectedEvent(eventName, data));
Object converted = JSON.toType(data, Object.class);
Event event = new Event(id, eventName, converted);

notifyListeners(REJECTED_EVENT, rejectedEvent);
recentMessages.add(id);

return;
}
if (!Secret.matches(secret, eventSecret)) {
LOG.log(Level.SEVERE, "Received message without a valid secret. Rejecting. {0} -> {1}",
new Object[]{event, data}); // String formatting only applied if needed
Event rejectedEvent =
new Event(REJECTED_EVENT, new ZeroMqEventBus.RejectedEvent(eventName, data));

notifyListeners(REJECTED_EVENT, rejectedEvent);

notifyListeners(eventName, event);
return;
}
notifyListeners(eventName, event);
}
}
} catch (Exception e) {
if (e.getCause() instanceof AssertionError) {
// Do nothing.
} else {
LOG.log(Level.WARNING, e, () -> "Caught exception while polling for event bus messages: "
+ e.getMessage());
throw e;
}
}
}
} catch (Throwable e) {
// if the exception escapes, then we never get scheduled again
LOG.log(Level.WARNING, e, () -> "Caught and swallowed exception: " + e.getMessage());
}
}

private void notifyListeners(EventName eventName, Event event) {
List<Consumer<Event>> eventListeners = listeners.getOrDefault(eventName, new ArrayList<>());
eventListeners
.forEach(listener -> listenerNotificationExecutor.submit(() -> {
try {
listener.accept(event);
} catch (Throwable t) {
LOG.log(Level.WARNING, t, () -> "Caught exception from listener: " + listener);
}
}));
private void notifyListeners(EventName eventName, Event event) {
List<Consumer<Event>> eventListeners = listeners.getOrDefault(eventName, new ArrayList<>());
eventListeners
.forEach(listener -> listenerNotificationExecutor.submit(() -> {
try {
listener.accept(event);
} catch (Exception e) {
LOG.log(Level.WARNING, e, () -> "Caught exception from listener: " + listener);
}
}));
}
}
}
88 changes: 88 additions & 0 deletions java/server/test/org/openqa/selenium/events/EventBusGuavaTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Licensed to the Software Freedom Conservancy (SFC) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The SFC licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.openqa.selenium.events;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.openqa.selenium.events.local.GuavaEventBus;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;

public class EventBusGuavaTest {
private EventBus bus;

@Before
public void getBus() {
bus = new GuavaEventBus();
}

@After
public void closeBus() {
bus.close();
}

@Test(timeout = 4000)
public void shouldBeAbleToPublishToAKnownTopic() throws InterruptedException {
EventName cheese = new EventName("cheese");
Event event = new Event(cheese, null);

CountDownLatch latch = new CountDownLatch(1);
bus.addListener(new EventListener<>(cheese, Object.class, obj -> latch.countDown()));
bus.fire(event);
latch.await(1, SECONDS);

assertThat(latch.getCount()).isEqualTo(0);
}

@Test(timeout = 4000)
public void shouldNotReceiveEventsNotMeantForTheTopic() {
AtomicInteger count = new AtomicInteger(0);
bus.addListener(new EventListener<>(new EventName("peas"), Object.class, obj -> count.incrementAndGet()));

bus.fire(new Event(new EventName("cheese"), null));

assertThat(count.get()).isEqualTo(0);
}

@Test
public void shouldBeAbleToFireEventsInParallel() throws InterruptedException {
int maxCount = 100;
EventName name = new EventName("cheese");

CountDownLatch count = new CountDownLatch(maxCount);
bus.addListener(new EventListener<>(name, Object.class, obj -> count.countDown()));

ExecutorService executor = Executors.newCachedThreadPool();
try {
for (int i = 0; i < maxCount; i++) {
executor.submit(() -> bus.fire(new Event(name, "")));
}

assertThat(count.await(20, SECONDS)).describedAs(count.toString()).isTrue();
} finally {
executor.shutdownNow();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,65 +17,34 @@

package org.openqa.selenium.events;

import com.google.common.collect.ImmutableSet;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.openqa.selenium.events.local.GuavaEventBus;
import org.openqa.selenium.events.zeromq.ZeroMqEventBus;
import org.openqa.selenium.grid.security.Secret;
import org.openqa.selenium.net.PortProber;
import org.zeromq.ZContext;

import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;

@RunWith(Parameterized.class)
public class EventBusTest {

@Parameterized.Parameters(name = "EventBus {0}")
public static Collection<Supplier<EventBus>> buildEventBuses() {
Secret secret = new Secret("cheese");

return ImmutableSet.of(
() -> ZeroMqEventBus.create(
new ZContext(),
"inproc://bus-pub",
"inproc://bus-sub",
true,
secret),
() -> ZeroMqEventBus.create(
new ZContext(),
"tcp://*:" + PortProber.findFreePort(),
"tcp://*:" + PortProber.findFreePort(),
true,
secret),
() -> ZeroMqEventBus.create(
new ZContext(),
"tcp://localhost:" + PortProber.findFreePort(),
"tcp://localhost:" + PortProber.findFreePort(),
true,
secret),
GuavaEventBus::new);
}

@Parameterized.Parameter
public Supplier<EventBus> busSupplier;

public class ZeroMqInProcTest {
private EventBus bus;

@Before
public void getBus() {
bus = busSupplier.get();
Secret secret = new Secret("cheese");
bus = ZeroMqEventBus.create(
new ZContext(),
"inproc://bus-pub",
"inproc://bus-sub",
true,
secret);
}

@After
Expand Down
Loading

0 comments on commit dd8741a

Please sign in to comment.