Skip to content

Commit

Permalink
[grid] Implementing node heartbeating that makes the distributor awar…
Browse files Browse the repository at this point in the history
…e about node availability. Fixes #9182
  • Loading branch information
barancev committed Feb 17, 2021
1 parent 0065abd commit e5194a7
Show file tree
Hide file tree
Showing 12 changed files with 174 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Licensed to the Software Freedom Conservancy (SFC) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The SFC licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.openqa.selenium.grid.data;

import org.openqa.selenium.events.Event;
import org.openqa.selenium.events.EventListener;
import org.openqa.selenium.events.EventName;
import org.openqa.selenium.internal.Require;

import java.util.function.Consumer;

public class NodeHeartBeatEvent extends Event {

private static final EventName NODE_HEARTBEAT = new EventName("node-heartbeat");

public NodeHeartBeatEvent(NodeId nodeId) {
super(NODE_HEARTBEAT, Require.nonNull("Node id", nodeId));
}

public static EventListener<NodeId> listener(Consumer<NodeId> handler) {
Require.nonNull("Handler", handler);

return new EventListener<NodeId>(NODE_HEARTBEAT, NodeId.class, handler);
}
}
24 changes: 24 additions & 0 deletions java/server/src/org/openqa/selenium/grid/data/NodeStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.openqa.selenium.json.TypeToken;

import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.util.HashSet;
import java.util.Map;
Expand All @@ -40,15 +41,18 @@ public class NodeStatus {
private final int maxSessionCount;
private final Set<Slot> slots;
private final Availability availability;
private Duration heartbeatPeriod;
private final String version;
private final Map<String, String> osInfo;
private long touched = System.currentTimeMillis();

public NodeStatus(
NodeId nodeId,
URI externalUri,
int maxSessionCount,
Set<Slot> slots,
Availability availability,
Duration heartbeatPeriod,
String version,
Map<String, String> osInfo) {
this.nodeId = Require.nonNull("Node id", nodeId);
Expand All @@ -58,6 +62,7 @@ public NodeStatus(
"Make sure that a driver is available on $PATH");
this.slots = unmodifiableSet(new HashSet<>(Require.nonNull("Slots", slots)));
this.availability = Require.nonNull("Availability", availability);
this.heartbeatPeriod = heartbeatPeriod;
this.version = Require.nonNull("Grid Node version", version);
this.osInfo = Require.nonNull("Node host OS info", osInfo);
}
Expand All @@ -68,6 +73,7 @@ public static NodeStatus fromJson(JsonInput input) {
int maxSessions = 0;
Set<Slot> slots = null;
Availability availability = null;
Duration heartbeatPeriod = null;
String version = null;
Map<String, String> osInfo = null;

Expand All @@ -78,6 +84,10 @@ public static NodeStatus fromJson(JsonInput input) {
availability = input.read(Availability.class);
break;

case "heartbeatPeriod":
heartbeatPeriod = Duration.ofMillis(input.read(Long.class));
break;

case "id":
nodeId = input.read(NodeId.class);
break;
Expand Down Expand Up @@ -116,6 +126,7 @@ public static NodeStatus fromJson(JsonInput input) {
maxSessions,
slots,
availability,
heartbeatPeriod,
version,
osInfo);
}
Expand Down Expand Up @@ -179,6 +190,18 @@ public long getLastSessionCreated() {
.orElse(0);
}

public Duration heartbeatPeriod() {
return heartbeatPeriod;
}

public void touch() {
touched = System.currentTimeMillis();
}

public long touched() {
return touched;
}

@Override
public boolean equals(Object o) {
if (!(o instanceof NodeStatus)) {
Expand Down Expand Up @@ -206,6 +229,7 @@ private Map<String, Object> toJson() {
toReturn.put("maxSessions", maxSessionCount);
toReturn.put("slots", slots);
toReturn.put("availability", availability);
toReturn.put("heartbeatPeriod", heartbeatPeriod.toMillis());
toReturn.put("version", version);
toReturn.put("osInfo", osInfo);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Logger;

import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toSet;
import static org.openqa.selenium.grid.data.Availability.DOWN;
import static org.openqa.selenium.grid.data.Availability.DRAINING;
import static org.openqa.selenium.grid.data.Availability.UP;
Expand Down Expand Up @@ -126,6 +128,22 @@ public GridModel refresh(NodeStatus status) {
}
}

public GridModel touch(NodeId id) {
Require.nonNull("Node ID", id);

Lock writeLock = lock.writeLock();
writeLock.lock();
try {
AvailabilityAndNode availabilityAndNode = findNode(id);
if (availabilityAndNode != null) {
availabilityAndNode.status.touch();
}
return this;
} finally {
writeLock.unlock();
}
}

public GridModel remove(NodeId id) {
Require.nonNull("Node ID", id);

Expand All @@ -144,6 +162,51 @@ public GridModel remove(NodeId id) {
}
}

public void purgeDeadNodes() {
long now = System.currentTimeMillis();
long timeout = 30000;
Lock writeLock = lock.writeLock();
writeLock.lock();
try {
Set<NodeStatus> lost = nodes(UP).stream()
.filter(status -> now - status.touched() > status.heartbeatPeriod().toMillis())
.collect(toSet());
Set<NodeStatus> resurrected = nodes(DOWN).stream()
.filter(status -> now - status.touched() <= status.heartbeatPeriod().toMillis())
.collect(toSet());
Set<NodeStatus> dead = nodes(DOWN).stream()
.filter(status -> now - status.touched() > status.heartbeatPeriod().toMillis() * 4)
.collect(toSet());
if (lost.size() > 0) {
LOG.info(String.format(
"Switching nodes %s from UP to DOWN",
lost.stream()
.map(node -> String.format("%s (uri: %s)", node.getId(), node.getUri()))
.collect(joining(", "))));
nodes(UP).removeAll(lost);
nodes(DOWN).addAll(lost);
}
if (resurrected.size() > 0) {
LOG.info(String.format(
"Switching nodes %s from DOWN to UP",
resurrected.stream()
.map(node -> String.format("%s (uri: %s)", node.getId(), node.getUri()))
.collect(joining(", "))));
nodes(UP).addAll(resurrected);
}
if (dead.size() > 0) {
LOG.info(String.format(
"Removing nodes %s that are DOWN for too long",
dead.stream()
.map(node -> String.format("%s (uri: %s)", node.getId(), node.getUri()))
.collect(joining(", "))));
nodes(DOWN).removeAll(dead);
}
} finally {
writeLock.unlock();
}
}

public Availability setAvailability(NodeId id, Availability availability) {
Require.nonNull("Node ID", id);
Require.nonNull("Availability", availability);
Expand Down Expand Up @@ -251,6 +314,7 @@ private NodeStatus rewrite(NodeStatus status, Availability availability) {
status.getMaxSessionCount(),
status.getSlots(),
availability,
status.heartbeatPeriod(),
status.getVersion(),
status.getOsInfo());
}
Expand Down Expand Up @@ -362,6 +426,7 @@ private void amend(Availability availability, NodeStatus status, Slot slot) {
status.getMaxSessionCount(),
newSlots,
status.getAvailability(),
status.heartbeatPeriod(),
status.getVersion(),
status.getOsInfo()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.openqa.selenium.grid.data.NewSessionResponseEvent;
import org.openqa.selenium.grid.data.NodeAddedEvent;
import org.openqa.selenium.grid.data.NodeDrainComplete;
import org.openqa.selenium.grid.data.NodeHeartBeatEvent;
import org.openqa.selenium.grid.data.NodeId;
import org.openqa.selenium.grid.data.NodeRemovedEvent;
import org.openqa.selenium.grid.data.NodeStatus;
Expand Down Expand Up @@ -132,9 +133,13 @@ public LocalDistributor(

bus.addListener(NodeStatusEvent.listener(this::register));
bus.addListener(NodeStatusEvent.listener(model::refresh));
bus.addListener(NodeHeartBeatEvent.listener(model::touch));
bus.addListener(NodeDrainComplete.listener(this::remove));
bus.addListener(NewSessionRequestEvent.listener(requestIds::offer));

Regularly regularly = new Regularly("Local Distributor");
regularly.submit(model::purgeDeadNodes, Duration.ofSeconds(30), Duration.ofSeconds(30));

Thread shutdownHook = new Thread(this::callExecutorShutdown);
Runtime.getRuntime().addShutdownHook(shutdownHook);
NewSessionRunnable runnable = new NewSessionRunnable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ public Duration getRegisterPeriod() {
return Duration.ofSeconds(seconds);
}

public Duration getHeartbeatPeriod() {
// If the user sets 0 or less, we default to 1s.
int seconds = Math.max(config.getInt(NODE_SECTION, "heartbeat-period").orElse(10), 1);
return Duration.ofSeconds(seconds);
}

public Map<Capabilities, Collection<SessionFactory>> getSessionFactories(
/* Danger! Java stereotype ahead! */
Function<Capabilities, Collection<SessionFactory>> factoryFactory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ public class NodeFlags implements HasRoles {
@ConfigValue(section = "node", name = "register-cycle", example = "120")
public int registerPeriod;

@Parameter(
names = "--heartbeat-period",
description = "How often, in seconds, will the Node send heartbeat events to the Distributor " +
"to inform it that the Node is up.")
@ConfigValue(section = "node", name = "heartbeat-period", example = "10")
public int heartbeatPeriod;

@Override
public Set<Role> getRoles() {
return Collections.singleton(NODE_ROLE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -91,6 +92,7 @@ public class OneShotNode extends Node {
private final EventBus events;
private final WebDriverInfo driverInfo;
private final Capabilities stereotype;
private final Duration heartbeatPeriod;
private final URI gridUri;
private final UUID slotId = UUID.randomUUID();
private RemoteWebDriver driver;
Expand All @@ -103,13 +105,15 @@ private OneShotNode(
Tracer tracer,
EventBus events,
Secret registrationSecret,
Duration heartbeatPeriod,
NodeId id,
URI uri,
URI gridUri,
Capabilities stereotype,
WebDriverInfo driverInfo) {
super(tracer, id, uri, registrationSecret);

this.heartbeatPeriod = heartbeatPeriod;
this.events = Require.nonNull("Event bus", events);
this.gridUri = Require.nonNull("Public Grid URI", gridUri);
this.stereotype = ImmutableCapabilities.copyOf(Require.nonNull("Stereotype", stereotype));
Expand Down Expand Up @@ -147,6 +151,7 @@ public static Node create(Config config) {
loggingOptions.getTracer(),
eventOptions.getEventBus(),
secretOptions.getRegistrationSecret(),
nodeOptions.getHeartbeatPeriod(),
new NodeId(UUID.randomUUID()),
serverOptions.getExternalUri(),
nodeOptions.getPublicGridUri().orElseThrow(() -> new ConfigException("Unable to determine public grid address")),
Expand Down Expand Up @@ -360,6 +365,7 @@ public NodeStatus getStatus() {
Optional.empty() :
Optional.of(new Session(sessionId, getUri(), stereotype, capabilities, Instant.now())))),
isDraining() ? DRAINING : UP,
heartbeatPeriod,
getNodeVersion(),
getOsInfo());
}
Expand Down
Loading

0 comments on commit e5194a7

Please sign in to comment.