Skip to content

Commit

Permalink
prioritizePeers: ensure to prune to target peers
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths committed Mar 2, 2023
1 parent e98d8e9 commit 1486489
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 26 deletions.
85 changes: 65 additions & 20 deletions packages/beacon-node/src/network/peers/utils/prioritizePeers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ export enum ExcessPeerDisconnectReason {
LOW_SCORE = "low_score",
NO_LONG_LIVED_SUBNET = "no_long_lived_subnet",
TOO_GROUPED_SUBNET = "too_grouped_subnet",
FIND_BETTER_PEERS = "find_better_peers",
}

/**
Expand Down Expand Up @@ -106,7 +107,7 @@ export function prioritizePeers(
})
);

const {attnetQueries, syncnetQueries, peerHasDuty} = requestAttnetPeers(
const {attnetQueries, syncnetQueries, dutiesByPeer} = requestAttnetPeers(
connectedPeers,
activeAttnets,
activeSyncnets,
Expand All @@ -126,7 +127,7 @@ export function prioritizePeers(
maxPeers - connectedPeerCount
);
} else if (connectedPeerCount > targetPeers) {
pruneExcessPeers(connectedPeers, peerHasDuty, activeAttnets, peersToDisconnect, opts);
pruneExcessPeers(connectedPeers, dutiesByPeer, activeAttnets, peersToDisconnect, opts);
}

return {
Expand All @@ -148,14 +149,14 @@ function requestAttnetPeers(
): {
attnetQueries: SubnetDiscvQuery[];
syncnetQueries: SubnetDiscvQuery[];
peerHasDuty: Map<PeerInfo, boolean>;
dutiesByPeer: Map<PeerInfo, number>;
} {
const {targetSubnetPeers = TARGET_SUBNET_PEERS} = opts;
const attnetQueries: SubnetDiscvQuery[] = [];
const syncnetQueries: SubnetDiscvQuery[] = [];

// To filter out peers that are part of 1+ attnets of interest from possible disconnection
const peerHasDuty = new Map<PeerInfo, boolean>();
// Tracks how many subnets of interest each peer serves, so peers with duties can be excluded from possible disconnection
const dutiesByPeer = new Map<PeerInfo, number>();

// attnets, do we need queries for more peers
if (activeAttnets.length > 0) {
Expand All @@ -164,16 +165,14 @@ function requestAttnetPeers(

for (const peer of connectedPeers) {
const trueBitIndices = peer.attnetsTrueBitIndices;
let hasDuty = false;
let dutyCount = 0;
for (const {subnet} of activeAttnets) {
if (trueBitIndices.includes(subnet)) {
hasDuty = true;
dutyCount += 1;
peersPerSubnet.set(subnet, 1 + (peersPerSubnet.get(subnet) ?? 0));
}
}
if (hasDuty) {
peerHasDuty.set(peer, true);
}
dutiesByPeer.set(peer, dutyCount);
}

for (const {subnet, toSlot} of activeAttnets) {
Expand All @@ -192,16 +191,14 @@ function requestAttnetPeers(

for (const peer of connectedPeers) {
const trueBitIndices = peer.syncnetsTrueBitIndices;
let hasDuty = false;
let dutyCount = dutiesByPeer.get(peer) ?? 0;
for (const {subnet} of activeSyncnets) {
if (trueBitIndices.includes(subnet)) {
hasDuty = true;
dutyCount += 1;
peersPerSubnet.set(subnet, 1 + (peersPerSubnet.get(subnet) ?? 0));
}
}
if (hasDuty) {
peerHasDuty.set(peer, true);
}
dutiesByPeer.set(peer, dutyCount);
}

for (const {subnet, toSlot} of activeSyncnets) {
Expand All @@ -213,7 +210,7 @@ function requestAttnetPeers(
}
}

return {attnetQueries, syncnetQueries, peerHasDuty};
return {attnetQueries, syncnetQueries, dutiesByPeer};
}

/**
Expand All @@ -229,7 +226,7 @@ function requestAttnetPeers(
*/
function pruneExcessPeers(
connectedPeers: PeerInfo[],
peerHasDuty: Map<PeerInfo, boolean>,
dutiesByPeer: Map<PeerInfo, number>,
activeAttnets: RequestedSubnet[],
peersToDisconnect: MapDef<ExcessPeerDisconnectReason, PeerId[]>,
opts: PrioritizePeersOpts
Expand All @@ -248,12 +245,13 @@ function pruneExcessPeers(

let outboundPeersEligibleForPruning = 0;

// Sort by score ascending, shuffling first to break ties.
const peersEligibleForPruning = sortBy(shuffle(connectedPeers), (peer) => peer.score)
const sortedPeers = sortPeers(connectedPeers, dutiesByPeer);

const peersEligibleForPruning = sortedPeers
// Then, iterate from highest score to lowest doing a manual filter for duties and outbound ratio
.filter((peer) => {
// Peers with duties are not eligible for pruning
if (peerHasDuty.get(peer)) {
if ((dutiesByPeer.get(peer) ?? 0) > 0) {
return false;
}

Expand Down Expand Up @@ -360,9 +358,56 @@ function pruneExcessPeers(
}

peersToDisconnect.set(ExcessPeerDisconnectReason.TOO_GROUPED_SUBNET, tooGroupedPeersToDisconnect);

// 4. Always ensure we prune down to the target peer count
// In a rare case, all peers may have duties and a good score but very few long lived subnets,
// and not be too grouped to any subnet; we still need to disconnect peers until we reach targetPeers
// because we want to keep improving our peer set (long lived subnets + score),
// otherwise we'll not be able to accept new peer connections to consider better peers
// see https://github.com/ChainSafe/lodestar/issues/5198
const remainingPeersToDisconnect: PeerId[] = [];
for (const {id} of sortedPeers) {
if (peersToDisconnectCount >= peersToDisconnectTarget) {
break;
}
if (
noLongLivedSubnetPeersToDisconnect.includes(id) ||
badScorePeersToDisconnect.includes(id) ||
tooGroupedPeersToDisconnect.includes(id)
) {
continue;
}
remainingPeersToDisconnect.push(id);
peersToDisconnectCount++;
}

peersToDisconnect.set(ExcessPeerDisconnectReason.FIND_BETTER_PEERS, remainingPeersToDisconnect);
}
}

/**
* Shuffling first to break ties.
* prefer sorting by dutied subnets first then number of long lived subnets,
* peer score is the last criteria since they are supposed to be in the same score range,
* bad score peers are removed by peer manager anyway
*/
export function sortPeers(connectedPeers: PeerInfo[], dutiesByPeer: Map<PeerInfo, number>): PeerInfo[] {
return shuffle(connectedPeers).sort((p1, p2) => {
const dutiedSubnet1 = dutiesByPeer.get(p1) ?? 0;
const dutiedSubnet2 = dutiesByPeer.get(p2) ?? 0;
if (dutiedSubnet1 === dutiedSubnet2) {
const [longLivedSubnets1, longLivedSubnets2] = [p1, p2].map(
(p) => p.attnetsTrueBitIndices.length + p.syncnetsTrueBitIndices.length
);
if (longLivedSubnets1 === longLivedSubnets2) {
return p1.score - p2.score;
}
return longLivedSubnets1 - longLivedSubnets2;
}
return dutiedSubnet1 - dutiedSubnet2;
});
}

/**
* Find subnet that has the most peers and > TARGET_SUBNET_PEERS, return null if peers are not grouped
* to any subnets.
Expand Down
74 changes: 68 additions & 6 deletions packages/beacon-node/test/unit/network/peers/priorization.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
ExcessPeerDisconnectReason,
prioritizePeers,
PrioritizePeersOpts,
sortPeers,
} from "../../../../src/network/peers/utils/prioritizePeers.js";
import {getAttnets, getSyncnets} from "../../../utils/network.js";
import {RequestedSubnet} from "../../../../src/network/peers/utils/index.js";
Expand Down Expand Up @@ -61,16 +62,17 @@ describe("network / peers / priorization", async () => {
connectedPeers: [
{id: peers[0], direction: null, syncnets: none, attnets: getAttnets([3]), score: 0},
{id: peers[1], direction: null, syncnets: none, attnets: getAttnets([5]), score: -5},
{id: peers[2], direction: null, syncnets: none, attnets: getAttnets([5]), score: -20},
{id: peers[3], direction: null, syncnets: none, attnets: getAttnets([5]), score: -40},
{id: peers[2], direction: null, syncnets: none, attnets: getAttnets([5]), score: -10},
{id: peers[3], direction: null, syncnets: none, attnets: getAttnets([5, 6, 7]), score: -19},
],
activeAttnets: [3],
activeSyncnets: [],
opts: {targetPeers: 1, maxPeers: 1, targetSubnetPeers: 1},
expectedResult: {
// Peers sorted by score, excluding with future duties
peersToDisconnect: new Map<ExcessPeerDisconnectReason, PeerId[]>([
[ExcessPeerDisconnectReason.LOW_SCORE, [peers[3], peers[2], peers[1]]],
// peer3 should be the last since it has the most subnets
[ExcessPeerDisconnectReason.LOW_SCORE, [peers[2], peers[1], peers[3]]],
]),
peersToConnect: 0,
attnetQueries: [],
Expand Down Expand Up @@ -112,7 +114,7 @@ describe("network / peers / priorization", async () => {
],
activeAttnets: [3],
activeSyncnets: [2],
opts: {targetPeers: 1, maxPeers: 1, targetSubnetPeers: 1},
opts: {targetPeers: 2, maxPeers: 2, targetSubnetPeers: 1},
expectedResult: {
// Peers sorted by long lived subnets
peersToDisconnect: new Map<ExcessPeerDisconnectReason, PeerId[]>([
Expand Down Expand Up @@ -140,7 +142,7 @@ describe("network / peers / priorization", async () => {
],
activeAttnets: [3],
activeSyncnets: [2],
opts: {targetPeers: 1, maxPeers: 1, targetSubnetPeers: 1},
opts: {targetPeers: 4, maxPeers: 4, targetSubnetPeers: 1},
expectedResult: {
// Peers sorted by long lived subnets
peersToDisconnect: new Map<ExcessPeerDisconnectReason, PeerId[]>([
Expand All @@ -151,6 +153,28 @@ describe("network / peers / priorization", async () => {
syncnetQueries: [],
},
},
{
id: "Ensure to prune to target peers",
connectedPeers: [
{id: peers[0], direction: null, syncnets: none, attnets: getAttnets([1, 2, 3]), score: 0},
{id: peers[1], direction: null, syncnets: none, attnets: getAttnets([1, 2]), score: -1.9},
{id: peers[2], direction: null, syncnets: none, attnets: getAttnets([3, 4]), score: -1.8},
{id: peers[3], direction: null, syncnets: none, attnets: getAttnets([4]), score: -1},
{id: peers[4], direction: null, syncnets: none, attnets: getAttnets([5]), score: -1.5},
],
activeAttnets: [1, 2, 3],
activeSyncnets: [],
opts: {targetPeers: 1, maxPeers: 1, targetSubnetPeers: 2},
expectedResult: {
peersToDisconnect: new Map<ExcessPeerDisconnectReason, PeerId[]>([
// the order is based on sortPeers() logic
[ExcessPeerDisconnectReason.FIND_BETTER_PEERS, [peers[4], peers[3], peers[2], peers[1]]],
]),
peersToConnect: 0,
attnetQueries: [],
syncnetQueries: [],
},
},
{
id: "Keep at least 10% of outbound peers",
connectedPeers: [
Expand Down Expand Up @@ -198,7 +222,8 @@ describe("network / peers / priorization", async () => {
expectedResult: {
// Peers sorted by score, excluding with future duties
peersToDisconnect: new Map<ExcessPeerDisconnectReason, PeerId[]>([
[ExcessPeerDisconnectReason.LOW_SCORE, [peers[5], peers[3]]],
// peer 3 has better score but fewer long lived subnets
[ExcessPeerDisconnectReason.LOW_SCORE, [peers[3], peers[5]]],
]),
peersToConnect: 0,
attnetQueries: [{subnet: 3, maxPeersToDiscover: 1, toSlot: 0}],
Expand Down Expand Up @@ -237,3 +262,40 @@ describe("network / peers / priorization", async () => {
return subnets.map((subnet) => ({subnet, toSlot: 0}));
}
});

describe("sortPeers", async function () {
const peers: PeerId[] = [];
for (let i = 0; i < 8; i++) {
const peer = await createSecp256k1PeerId();
peer.toString = () => `peer-${i}`;
peers.push(peer);
}
const none = BitArray.fromBitLen(ATTESTATION_SUBNET_COUNT);

it("should sort peers by dutied subnets then long lived subnets then score", () => {
const connectedPeers = [
{id: peers[3], direction: null, syncnets: none, attnets: getAttnets([0, 4]), score: -1},
{id: peers[2], direction: null, syncnets: none, attnets: getAttnets([2, 3, 5]), score: 0},
{id: peers[1], direction: null, syncnets: none, attnets: getAttnets([3, 5]), score: -1},
{id: peers[0], direction: null, syncnets: none, attnets: getAttnets([6, 7]), score: -1.9},
].map((p) => ({
...p,
attnetsTrueBitIndices: p.attnets?.getTrueBitIndexes() ?? [],
syncnetsTrueBitIndices: p.syncnets?.getTrueBitIndexes() ?? [],
}));

const dutiesByPeer = new Map<typeof connectedPeers[0], number>([
[connectedPeers[0], 2],
[connectedPeers[1], 0],
[connectedPeers[2], 0],
[connectedPeers[3], 0],
]);

expect(sortPeers(connectedPeers, dutiesByPeer).map((p) => p.id.toString())).to.be.deep.equals([
"peer-0",
"peer-1",
"peer-2",
"peer-3",
]);
});
});

0 comments on commit 1486489

Please sign in to comment.