Skip to content

Commit

Permalink
Compute score using lodestar score and gossipsub score (#3875)
Browse files Browse the repository at this point in the history
* Refactor PeerRpcScoreStore: add PeerScore class

* Aggregate lodestarScore, gossipsubScore to compute final score

* updateGossipsubScores util and unit test

* Populate PeerScore on updateGossipsubScore

* Fix peerManager e2e test

* Fix test/sim/multiNodeSingleThread.test.ts
  • Loading branch information
twoeths authored Mar 29, 2022
1 parent dd76614 commit d7cdd4a
Show file tree
Hide file tree
Showing 9 changed files with 221 additions and 48 deletions.
2 changes: 1 addition & 1 deletion packages/lodestar/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
"jwt-simple": "0.5.6",
"libp2p": "^0.36.2",
"libp2p-bootstrap": "^0.14.0",
"libp2p-gossipsub": "tuyennhv/js-libp2p-gossipsub#6a9965ecf095182c05808f2064f3a63d95ce707c",
"libp2p-gossipsub": "tuyennhv/js-libp2p-gossipsub#4b14a2640d23cbe3a8352cb9fd4b9ebb058f61f9",
"libp2p-mdns": "^0.18.0",
"libp2p-mplex": "^0.10.5",
"libp2p-tcp": "^0.17.2",
Expand Down
5 changes: 4 additions & 1 deletion packages/lodestar/src/network/gossip/gossipsub.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import Libp2p from "libp2p";
import Gossipsub from "libp2p-gossipsub";
import {GossipsubMessage, SignaturePolicy, TopicStr} from "libp2p-gossipsub/src/types";
import {PeerScore} from "libp2p-gossipsub/src/score";
import {PeerScore, PeerScoreParams} from "libp2p-gossipsub/src/score";
import PeerId from "peer-id";
import {AbortSignal} from "@chainsafe/abort-controller";
import {IBeaconConfig} from "@chainsafe/lodestar-config";
Expand Down Expand Up @@ -76,6 +76,7 @@ export type Eth2GossipsubOpts = {
*/
export class Eth2Gossipsub extends Gossipsub {
readonly jobQueues: GossipJobQueues;
readonly scoreParams: Partial<PeerScoreParams>;
private readonly config: IBeaconConfig;
private readonly logger: ILogger;

Expand Down Expand Up @@ -109,6 +110,7 @@ export class Eth2Gossipsub extends Gossipsub {
metricsRegister: modules.metrics ? ((modules.metrics.register as unknown) as MetricsRegister) : null,
metricsTopicStrToLabel: modules.metrics ? getMetricsTopicStrToLabel(modules.config) : undefined,
});
this.scoreParams = scoreParams;
const {config, logger, metrics, signal, gossipHandlers} = modules;
this.config = config;
this.logger = logger;
Expand Down Expand Up @@ -145,6 +147,7 @@ export class Eth2Gossipsub extends Gossipsub {
this.logger.verbose("Publish to topic", {topic: topicStr});
const sszType = getGossipSSZType(topic);
const messageData = (sszType.serialize as (object: GossipTypeMap[GossipType]) => Uint8Array)(object);
// TODO: log number of sent peers
await this.publish(topicStr, messageData);
}

Expand Down
1 change: 1 addition & 0 deletions packages/lodestar/src/network/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ export class Network implements INetwork {
{
libp2p,
reqResp: this.reqResp,
gossip: this.gossip,
attnetsService: this.attnetsService,
syncnetsService: this.syncnetsService,
logger,
Expand Down
25 changes: 24 additions & 1 deletion packages/lodestar/src/network/peers/peerManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {IReqResp, ReqRespMethod, RequestTypedContainer} from "../reqresp";
import {prettyPrintPeerId, getClientFromPeerStore} from "../util";
import {ISubnetsService} from "../subnets";
import {PeerDiscovery, SubnetDiscvQueryMs} from "./discover";
import {IPeerRpcScoreStore, ScoreState} from "./score";
import {IPeerRpcScoreStore, ScoreState, updateGossipsubScores} from "./score";
import {
getConnectedPeerIds,
hasSomeConnectedPeer,
Expand All @@ -21,6 +21,7 @@ import {
renderIrrelevantPeerType,
} from "./utils";
import {SubnetType} from "../metadata";
import {Eth2Gossipsub} from "../gossip/gossipsub";

/** heartbeat performs regular updates such as updating reputations and performing discovery requests */
const HEARTBEAT_INTERVAL_MS = 30 * 1000;
Expand All @@ -34,6 +35,11 @@ const STATUS_INBOUND_GRACE_PERIOD = 15 * 1000;
/** Internal interval to check PING and STATUS timeouts */
const CHECK_PING_STATUS_INTERVAL = 10 * 1000;

/**
* Relative factor of peers that are allowed to have a negative gossipsub score without penalizing them in lodestar.
*/
const ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR = 0.1;

// TODO:
// maxPeers and targetPeers should be dynamic on the num of validators connected
// The Node should compute a recomended value every interval and log a warning
Expand Down Expand Up @@ -64,6 +70,7 @@ export type PeerManagerModules = {
logger: ILogger;
metrics: IMetrics | null;
reqResp: IReqResp;
gossip: Eth2Gossipsub;
attnetsService: ISubnetsService;
syncnetsService: ISubnetsService;
chain: IBeaconChain;
Expand Down Expand Up @@ -103,6 +110,7 @@ export class PeerManager {
private logger: ILogger;
private metrics: IMetrics | null;
private reqResp: IReqResp;
private gossipsub: Eth2Gossipsub;
private attnetsService: ISubnetsService;
private syncnetsService: ISubnetsService;
private chain: IBeaconChain;
Expand All @@ -123,6 +131,7 @@ export class PeerManager {
this.logger = modules.logger;
this.metrics = modules.metrics;
this.reqResp = modules.reqResp;
this.gossipsub = modules.gossip;
this.attnetsService = modules.attnetsService;
this.syncnetsService = modules.syncnetsService;
this.chain = modules.chain;
Expand Down Expand Up @@ -158,6 +167,10 @@ export class PeerManager {
this.intervals = [
setInterval(this.pingAndStatusTimeouts.bind(this), CHECK_PING_STATUS_INTERVAL),
setInterval(this.heartbeat.bind(this), HEARTBEAT_INTERVAL_MS),
setInterval(
this.updateGossipsubScores.bind(this),
this.gossipsub.scoreParams.decayInterval ?? HEARTBEAT_INTERVAL_MS
),
];
}

Expand Down Expand Up @@ -448,6 +461,16 @@ export class PeerManager {
}
}

private updateGossipsubScores(): void {
const gossipsubScores = new Map<string, number>();
for (const peerIdStr of this.connectedPeers.keys()) {
gossipsubScores.set(peerIdStr, this.gossipsub.getScore(peerIdStr));
}

const toIgnoreNegativePeers = Math.ceil(this.opts.targetPeers * ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR);
updateGossipsubScores(this.peerRpcScores, gossipsubScores, toIgnoreNegativePeers);
}

private pingAndStatusTimeouts(): void {
const now = Date.now();
const peersToStatus: PeerId[] = [];
Expand Down
169 changes: 137 additions & 32 deletions packages/lodestar/src/network/peers/score.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import PeerId from "peer-id";
import {pruneSetToMax} from "../../util/map";
import {MapDef, pruneSetToMax} from "../../util/map";
import {gossipScoreThresholds} from "../gossip/scoringParameters";

/** The default score for new peers */
const DEFAULT_SCORE = 0;
/** The minimum reputation before a peer is disconnected */
const MIN_SCORE_BEFORE_DISCONNECT = -20;
/** The minimum reputation before a peer is banned */
const MIN_SCORE_BEFORE_BAN = -50;
// If a peer has a lodestar score below this constant all other score parts will get ignored and
// the peer will get banned regardless of the other parts.
const MIN_LODESTAR_SCORE_BEFORE_BAN = -60.0;
/** The maximum score a peer can obtain */
const MAX_SCORE = 100;
/** The minimum score a peer can obtain */
Expand All @@ -20,6 +24,12 @@ const HALFLIFE_DECAY_MS = -Math.log(2) / SCORE_HALFLIFE_MS;
const BANNED_BEFORE_DECAY_MS = 30 * 60 * 1000;
/** Limit of entries in the scores map */
const MAX_ENTRIES = 1000;
/**
* We weight negative gossipsub scores in such a way that they never result in a disconnect by
* themselves. This "solves" the problem of non-decaying gossipsub scores for disconnected peers.
*/
const GOSSIPSUB_NEGATIVE_SCORE_WEIGHT = (MIN_SCORE_BEFORE_DISCONNECT + 1) / gossipScoreThresholds.graylistThreshold;
const GOSSIPSUB_POSITIVE_SCORE_WEIGHT = GOSSIPSUB_NEGATIVE_SCORE_WEIGHT;

export enum PeerAction {
/** Immediately ban peer */
Expand Down Expand Up @@ -70,6 +80,7 @@ export interface IPeerRpcScoreStore {
getScoreState(peer: PeerId): ScoreState;
applyAction(peer: PeerId, action: PeerAction, actionName?: string): void;
update(): void;
updateGossipsubScore(peerId: PeerIdStr, newScore: number, ignore: boolean): void;
}

/**
Expand All @@ -78,21 +89,20 @@ export interface IPeerRpcScoreStore {
* The decay rate applies equally to positive and negative scores.
*/
export class PeerRpcScoreStore implements IPeerRpcScoreStore {
private readonly scores = new Map<string, number>();
private readonly lastUpdate = new Map<string, number>();

private readonly scores = new MapDef<PeerIdStr, PeerScore>(() => new PeerScore());
// TODO: Persist scores, at least BANNED status to disk

getScore(peer: PeerId): number {
return this.scores.get(peer.toB58String()) ?? DEFAULT_SCORE;
return this.scores.get(peer.toB58String())?.getScore() ?? DEFAULT_SCORE;
}

getScoreState(peer: PeerId): ScoreState {
return scoreToState(this.getScore(peer));
}

applyAction(peer: PeerId, action: PeerAction, actionName?: string): void {
this.add(peer, peerActionScore[action]);
const peerScore = this.scores.getOrDefault(peer.toB58String());
peerScore.add(peerActionScore[action]);

// TODO: Log action to debug + do metrics
actionName;
Expand All @@ -101,56 +111,151 @@ export class PeerRpcScoreStore implements IPeerRpcScoreStore {
update(): void {
// Bound size of data structures
pruneSetToMax(this.scores, MAX_ENTRIES);
pruneSetToMax(this.lastUpdate, MAX_ENTRIES);

for (const [peerIdStr, prevScore] of this.scores) {
const newScore = this.decayScore(peerIdStr, prevScore);
for (const [peerIdStr, peerScore] of this.scores) {
const newScore = peerScore.update();

// Prune scores below threshold
if (Math.abs(newScore) < SCORE_THRESHOLD) {
this.scores.delete(peerIdStr);
this.lastUpdate.delete(peerIdStr);
}

// If above threshold, persist decayed value
else {
this.scores.set(peerIdStr, newScore);
}
}
}

private decayScore(peer: PeerIdStr, prevScore: number): number {
updateGossipsubScore(peerId: PeerIdStr, newScore: number, ignore: boolean): void {
const peerScore = this.scores.getOrDefault(peerId);
peerScore.updateGossipsubScore(newScore, ignore);
}
}

/**
* Manage score of a peer.
*/
export class PeerScore {
private lodestarScore: number;
private gossipScore: number;
private ignoreNegativeGossipScore: boolean;
/** The final score, computed from the above */
private score: number;
private lastUpdate: number;

constructor() {
this.lodestarScore = DEFAULT_SCORE;
this.gossipScore = DEFAULT_SCORE;
this.score = DEFAULT_SCORE;
this.ignoreNegativeGossipScore = false;
this.lastUpdate = Date.now();
}

getScore(): number {
return this.score;
}

add(scoreDelta: number): void {
let newScore = this.lodestarScore + scoreDelta;
if (newScore > MAX_SCORE) newScore = MAX_SCORE;
if (newScore < MIN_SCORE) newScore = MIN_SCORE;

this.setLodestarScore(newScore);
}

/**
* Applies time-based logic such as decay rates to the score.
* This function should be called periodically.
*
* Return the new score.
*/
update(): number {
const nowMs = Date.now();
const lastUpdate = this.lastUpdate.get(peer) ?? nowMs;

// Decay the current score
// Using exponential decay based on a constant half life.
const sinceLastUpdateMs = nowMs - lastUpdate;
const sinceLastUpdateMs = nowMs - this.lastUpdate;
// If peer was banned, lastUpdate will be in the future
if (sinceLastUpdateMs > 0 && prevScore !== 0) {
this.lastUpdate.set(peer, nowMs);
if (sinceLastUpdateMs > 0 && this.lodestarScore !== 0) {
this.lastUpdate = nowMs;
// e^(-ln(2)/HL*t)
const decayFactor = Math.exp(HALFLIFE_DECAY_MS * sinceLastUpdateMs);
return prevScore * decayFactor;
} else {
return prevScore;
this.setLodestarScore(this.lodestarScore * decayFactor);
}

return this.lodestarScore;
}

private add(peer: PeerId, scoreDelta: number): void {
const prevScore = this.getScore(peer);
updateGossipsubScore(newScore: number, ignore: boolean): void {
// we only update gossipsub if last_updated is in the past which means either the peer is
// not banned or the BANNED_BEFORE_DECAY time is over.
if (this.lastUpdate <= Date.now()) {
this.gossipScore = newScore;
this.ignoreNegativeGossipScore = ignore;
}
}

let newScore = this.decayScore(peer.toB58String(), prevScore) + scoreDelta;
if (newScore > MAX_SCORE) newScore = MAX_SCORE;
if (newScore < MIN_SCORE) newScore = MIN_SCORE;
/**
* Updating lodestarScore should always go through this method,
* so that we update this.score accordingly.
*/
private setLodestarScore(newScore: number): void {
this.lodestarScore = newScore;
this.updateState();
}

/**
* Compute the final score, ban peer if needed
*/
private updateState(): void {
const prevState = scoreToState(this.score);
this.recomputeScore();
const newState = scoreToState(this.score);

const prevState = scoreToState(prevScore);
const newState = scoreToState(newScore);
if (prevState !== ScoreState.Banned && newState === ScoreState.Banned) {
// ban this peer for at least BANNED_BEFORE_DECAY_MS seconds
this.lastUpdate.set(peer.toB58String(), Date.now() + BANNED_BEFORE_DECAY_MS);
this.lastUpdate = Date.now() + BANNED_BEFORE_DECAY_MS;
}
}

this.scores.set(peer.toB58String(), newScore);
/**
* Compute the final score
*/
private recomputeScore(): void {
this.score = this.lodestarScore;
if (this.score <= MIN_LODESTAR_SCORE_BEFORE_BAN) {
// ignore all other scores, i.e. do nothing here
return;
}

if (this.gossipScore >= 0) {
this.score += this.gossipScore * GOSSIPSUB_POSITIVE_SCORE_WEIGHT;
} else if (!this.ignoreNegativeGossipScore) {
this.score += this.gossipScore * GOSSIPSUB_NEGATIVE_SCORE_WEIGHT;
}
}
}

/**
* Utility to update gossipsub score of connected peers
*/
export function updateGossipsubScores(
peerRpcScores: IPeerRpcScoreStore,
gossipsubScores: Map<string, number>,
toIgnoreNegativePeers: number
): void {
// sort by gossipsub score desc
const sortedPeerIds = Array.from(gossipsubScores.keys()).sort(
(a, b) => (gossipsubScores.get(b) ?? 0) - (gossipsubScores.get(a) ?? 0)
);
for (const peerId of sortedPeerIds) {
const gossipsubScore = gossipsubScores.get(peerId);
if (gossipsubScore !== undefined) {
let ignore = false;
if (gossipsubScore < 0 && toIgnoreNegativePeers > 0) {
// We ignore the negative score for the best negative peers so that their
// gossipsub score can recover without getting disconnected.
ignore = true;
toIgnoreNegativePeers -= 1;
}

peerRpcScores.updateGossipsubScore(peerId, gossipsubScore, ignore);
}
}
}
3 changes: 2 additions & 1 deletion packages/lodestar/test/e2e/network/peers/peerManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {expect} from "chai";
import {config} from "@chainsafe/lodestar-config/default";
import {IReqResp, ReqRespMethod} from "../../../../src/network/reqresp";
import {PeerRpcScoreStore, PeerManager} from "../../../../src/network/peers";
import {NetworkEvent, NetworkEventBus} from "../../../../src/network";
import {Eth2Gossipsub, NetworkEvent, NetworkEventBus} from "../../../../src/network";
import {createNode, getAttnets, getSyncnets} from "../../../utils/network";
import {MockBeaconChain} from "../../../utils/mocks/chain/chain";
import {generateEmptySignedBlock} from "../../../utils/block";
Expand Down Expand Up @@ -82,6 +82,7 @@ describe("network / peers / PeerManager", function () {
networkEventBus,
attnetsService: mockSubnetsService,
syncnetsService: mockSubnetsService,
gossip: ({getScore: () => 0, scoreParams: {decayInterval: 1000}} as unknown) as Eth2Gossipsub,
},
{
targetPeers: 30,
Expand Down
Loading

0 comments on commit d7cdd4a

Please sign in to comment.