Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GossipSub 1.2] Add IDONTWANT support #374

Merged
merged 20 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import java.util.Collections.singletonList
import java.util.Optional
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ScheduledExecutorService
import java.util.function.BiConsumer
import java.util.function.Consumer

// 1 MB default max message size
const val DEFAULT_MAX_PUBSUB_MESSAGE_SIZE = 1 shl 20
Expand Down Expand Up @@ -223,7 +221,7 @@ abstract class AbstractRouter(

validFuts.forEach { (msg, validationFut) ->
validationFut.thenAcceptAsync(
Consumer { res ->
{ res ->
seenMessages[msg] = Optional.of(res)
if (res == ValidationResult.Invalid) notifyUnseenInvalidMessage(peer, msg)
},
Expand All @@ -247,7 +245,7 @@ abstract class AbstractRouter(
// broadcast others on completion
undone.forEach {
it.second.whenCompleteAsync(
BiConsumer { res, err ->
{ res, err ->
when {
err != null -> logger.warn("Exception while handling message from peer $peer: ${it.first}", err)
res == ValidationResult.Invalid -> logger.debug("Invalid pubsub message from peer $peer: ${it.first}")
Expand Down
15 changes: 15 additions & 0 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubProtocol.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,25 @@ enum class PubsubProtocol(val announceStr: ProtocolId) {

Gossip_V_1_0("/meshsub/1.0.0"),
Gossip_V_1_1("/meshsub/1.1.0"),
Gossip_V_1_2("/meshsub/1.2.0"),
Floodsub("/floodsub/1.0.0");

companion object {
fun fromProtocol(protocol: ProtocolId) = PubsubProtocol.values().find { protocol == it.announceStr }
?: throw NoSuchElementException("No PubsubProtocol found with protocol $protocol")
}

/**
* https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#prune-backoff-and-peer-exchange
*/
fun supportsBackoffAndPX(): Boolean {
return this == Gossip_V_1_1 || this == Gossip_V_1_2
}

/**
* https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md#idontwant-message
*/
fun supportsIDontWant(): Boolean {
return this == Gossip_V_1_2
}
}
24 changes: 17 additions & 7 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,23 @@ class Gossip @JvmOverloads constructor(
}

override val protocolDescriptor =
if (router.protocol == PubsubProtocol.Gossip_V_1_1) {
ProtocolDescriptor(
PubsubProtocol.Gossip_V_1_1.announceStr,
PubsubProtocol.Gossip_V_1_0.announceStr
)
} else {
ProtocolDescriptor(PubsubProtocol.Gossip_V_1_0.announceStr)
when (router.protocol) {
PubsubProtocol.Gossip_V_1_2 -> {
ProtocolDescriptor(
PubsubProtocol.Gossip_V_1_2.announceStr,
PubsubProtocol.Gossip_V_1_1.announceStr,
PubsubProtocol.Gossip_V_1_0.announceStr
)
}
PubsubProtocol.Gossip_V_1_1 -> {
ProtocolDescriptor(
PubsubProtocol.Gossip_V_1_1.announceStr,
PubsubProtocol.Gossip_V_1_0.announceStr
)
}
else -> {
ProtocolDescriptor(PubsubProtocol.Gossip_V_1_0.announceStr)
}
}

override fun handleConnection(conn: Connection) {
Expand Down
19 changes: 18 additions & 1 deletion libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,24 @@ data class GossipParams(
* callback to notify outer system to which peers Gossip wants to be connected
* The second parameter is a signed peer record: https://github.com/libp2p/specs/pull/217
*/
val connectCallback: (PeerId, ByteArray) -> Unit = { _: PeerId, _: ByteArray -> }
val connectCallback: (PeerId, ByteArray) -> Unit = { _: PeerId, _: ByteArray -> },

/**
* [maxIDontWantMessageIds] is the maximum number of IDONTWANT message ids allowed per heartbeat per peer
*/
val maxIDontWantMessageIds: Int = maxIHaveLength * maxIHaveMessages,

/**
* [iDontWantMinMessageSizeThreshold] controls the minimum size (in bytes) that an incoming message needs to be so that an IDONTWANT message is sent to mesh peers.
* The default is 16 KB.
*/
val iDontWantMinMessageSizeThreshold: Int = 16000,

/**
* [iDontWantTTL] Expiry time for cache of received IDONTWANT messages for peers
*/
val iDontWantTTL: Duration = 3.seconds

) {
init {
check(D >= 0, "D should be >= 0")
Expand Down
91 changes: 77 additions & 14 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import kotlin.collections.any
import kotlin.collections.component1
import kotlin.collections.component2
import kotlin.collections.count
import kotlin.collections.distinct
import kotlin.collections.drop
import kotlin.collections.filter
import kotlin.collections.filterNot
Expand All @@ -46,7 +45,6 @@ import kotlin.collections.reversed
import kotlin.collections.set
import kotlin.collections.shuffled
import kotlin.collections.sortedBy
import kotlin.collections.sum
import kotlin.collections.take
import kotlin.collections.toMutableSet
import kotlin.math.max
Expand All @@ -56,6 +54,7 @@ const val MaxBackoffEntries = 10 * 1024
const val MaxIAskedEntries = 256
const val MaxPeerIHaveEntries = 256
const val MaxIWantRequestsEntries = 10 * 1024
const val MaxPeerIDontWantEntries = 256

typealias CurrentTimeSupplier = () -> Long

Expand Down Expand Up @@ -122,6 +121,7 @@ open class GossipRouter(
private val iAsked = createLRUMap<PeerHandler, AtomicInteger>(MaxIAskedEntries)
private val peerIHave = createLRUMap<PeerHandler, AtomicInteger>(MaxPeerIHaveEntries)
private val iWantRequests = createLRUMap<Pair<PeerHandler, MessageId>, Long>(MaxIWantRequestsEntries)
private val peerIDontWant = createLRUMap<PeerHandler, IDontWantCacheEntry>(MaxPeerIDontWantEntries)
private val heartbeatTask by lazy {
executor.scheduleWithFixedDelay(
::catchingHeartbeat,
Expand Down Expand Up @@ -166,6 +166,7 @@ open class GossipRouter(
}

override fun notifyUnseenMessage(peer: PeerHandler, msg: PubsubMessage) {
iDontWant(msg, peer)
eventBroadcaster.notifyUnseenMessage(peer.peerId, msg)
notifyAnyMessage(peer, msg)
}
Expand Down Expand Up @@ -250,8 +251,8 @@ open class GossipRouter(
}

override fun validateMessageListLimits(msg: Rpc.RPCOrBuilder): Boolean {
val iWantMessageIdCount = msg.control?.iwantList?.map { w -> w.messageIDsCount }?.sum() ?: 0
val iHaveMessageIdCount = msg.control?.ihaveList?.map { w -> w.messageIDsCount }?.sum() ?: 0
val iWantMessageIdCount = msg.control?.iwantList?.sumOf { w -> w.messageIDsCount } ?: 0
val iHaveMessageIdCount = msg.control?.ihaveList?.sumOf { w -> w.messageIDsCount } ?: 0

return params.maxPublishedMessages?.let { msg.publishCount <= it } ?: true &&
params.maxTopicsPerPublishedMessage?.let { msg.publishList.none { m -> m.topicIDsCount > it } } ?: true &&
Expand All @@ -269,6 +270,7 @@ open class GossipRouter(
is Rpc.ControlPrune -> handlePrune(controlMsg, receivedFrom)
is Rpc.ControlIHave -> handleIHave(controlMsg, receivedFrom)
is Rpc.ControlIWant -> handleIWant(controlMsg, receivedFrom)
is Rpc.ControlIDontWant -> handleIDontWant(controlMsg, receivedFrom)
}
}

Expand Down Expand Up @@ -300,7 +302,7 @@ open class GossipRouter(
mesh[topic]?.remove(peer)?.also {
notifyPruned(peer, topic)
}
if (this.protocol == PubsubProtocol.Gossip_V_1_1) {
if (this.protocol.supportsBackoffAndPX()) {
if (msg.hasBackoff()) {
setBackOff(peer, topic, msg.backoff.seconds.toMillis())
} else {
Expand Down Expand Up @@ -348,8 +350,22 @@ open class GossipRouter(
msg.messageIDsList
.mapNotNull { mCache.getMessageForPeer(peer.peerId, it.toWBytes()) }
.filter { it.sentCount < params.gossipRetransmission }
.map { it.msg }
.forEach { submitPublishMessage(peer, it) }
.forEach { submitPublishMessage(peer, it.msg) }
}

private fun handleIDontWant(msg: Rpc.ControlIDontWant, peer: PeerHandler) {
if (!this.protocol.supportsIDontWant()) return
val peerScore = score.score(peer.peerId)
if (peerScore < scoreParams.gossipThreshold) return
val iDontWantCacheEntry = peerIDontWant.computeIfAbsent(peer) { IDontWantCacheEntry() }
iDontWantCacheEntry.heartbeatMessageIdsCount += msg.messageIDsCount
if (iDontWantCacheEntry.heartbeatMessageIdsCount > params.maxIDontWantMessageIds) {
return
}
val timeReceived = currentTimeSupplier()
msg.messageIDsList
.map { it.toWBytes() }
.associateWithTo(iDontWantCacheEntry.messageIdsAndTimeReceived) { timeReceived }
}

private fun processPrunePeers(peersList: List<Rpc.PeerInfo>) {
Expand All @@ -361,18 +377,20 @@ open class GossipRouter(

override fun processControl(ctrl: Rpc.ControlMessage, receivedFrom: PeerHandler) {
ctrl.run {
(graftList + pruneList + ihaveList + iwantList)
(graftList + pruneList + ihaveList + iwantList + idontwantList)
}.forEach { processControlMessage(it, receivedFrom) }
}

override fun broadcastInbound(msgs: List<PubsubMessage>, receivedFrom: PeerHandler) {
msgs.forEach { pubMsg ->
pubMsg.topics
.asSequence()
.mapNotNull { mesh[it] }
.flatten()
.distinct()
.plus(getDirectPeers())
.filter { it != receivedFrom }
.minus(receivedFrom)
.filterNot { peerDoesNotWantMessage(it, pubMsg.messageId) }
.forEach { submitPublishMessage(it, pubMsg) }
mCache += pubMsg
}
Expand All @@ -398,15 +416,17 @@ open class GossipRouter(
}
.flatten()
}
val list = peers.map { submitPublishMessage(it, msg) }
val list = peers
.filterNot { peerDoesNotWantMessage(it, msg.messageId) }
.map { submitPublishMessage(it, msg) }

mCache += msg
flushAllPending()

if (list.isNotEmpty()) {
return anyComplete(list)
return if (list.isNotEmpty()) {
anyComplete(list)
} else {
return completedExceptionally(
completedExceptionally(
NoPeersForOutboundMessageException("No peers for message topics ${msg.topics} found")
)
}
Expand Down Expand Up @@ -459,6 +479,15 @@ open class GossipRouter(
.whenTrue { notifyIWantTimeout(key.first, key.second) }
}

val staleIDontWantTime = this.currentTimeSupplier() - params.iDontWantTTL.toMillis()
peerIDontWant.entries.removeIf { (_, cacheEntry) ->
// reset on heartbeat
cacheEntry.heartbeatMessageIdsCount = 0
cacheEntry.messageIdsAndTimeReceived.values.removeIf { timeReceived -> timeReceived < staleIDontWantTime }
// remove entry for peer if no IDONTWANT message ids are left in the cache
cacheEntry.messageIdsAndTimeReceived.isEmpty()
}

try {
mesh.entries.forEach { (topic, peers) ->

Expand Down Expand Up @@ -565,16 +594,32 @@ open class GossipRouter(
}
}

private fun peerDoesNotWantMessage(peer: PeerHandler, messageId: MessageId): Boolean {
return peerIDontWant[peer]?.messageIdsAndTimeReceived?.contains(messageId) == true
}

private fun iWant(peer: PeerHandler, messageIds: List<MessageId>) {
if (messageIds.isEmpty()) return
messageIds[random.nextInt(messageIds.size)]
.also { iWantRequests[peer to it] = currentTimeSupplier() }
enqueueIwant(peer, messageIds)
}

private fun iDontWant(msg: PubsubMessage, receivedFrom: PeerHandler) {
if (!this.protocol.supportsIDontWant()) return
if (msg.protobufMessage.data.size() < params.iDontWantMinMessageSizeThreshold) return
// we need to send IDONTWANT messages to mesh peers immediately in order for them to have an effect
msg.topics
.mapNotNull { mesh[it] }
.flatten()
.distinct()
.minus(receivedFrom)
.forEach { peer -> sendIdontwant(peer, msg.messageId) }
}

private fun enqueuePrune(peer: PeerHandler, topic: Topic) {
val peerQueue = pendingRpcParts.getQueue(peer)
if (peer.getPeerProtocol() == PubsubProtocol.Gossip_V_1_1 && this.protocol == PubsubProtocol.Gossip_V_1_1) {
if (peer.getPeerProtocol().supportsBackoffAndPX() && this.protocol.supportsBackoffAndPX()) {
val backoffPeers = (getTopicPeers(topic) - peer)
.take(params.maxPeersSentInPruneMsg)
.filter { score.score(it.peerId) >= 0 }
Expand All @@ -594,7 +639,25 @@ open class GossipRouter(
private fun enqueueIhave(peer: PeerHandler, messageIds: List<MessageId>, topic: Topic) =
pendingRpcParts.getQueue(peer).addIHaves(messageIds, topic)

private fun sendIdontwant(peer: PeerHandler, messageId: MessageId) {
if (!peer.getPeerProtocol().supportsIDontWant()) {
return
}
val iDontWant = Rpc.RPC.newBuilder().setControl(
Rpc.ControlMessage.newBuilder().addIdontwant(
Rpc.ControlIDontWant.newBuilder()
.addMessageIDs(messageId.toProtobuf())
)
).build()
send(peer, iDontWant)
}

data class AcceptRequestsWhitelistEntry(val whitelistedTill: Long, val messagesAccepted: Int = 0) {
fun incrementMessageCount() = AcceptRequestsWhitelistEntry(whitelistedTill, messagesAccepted + 1)
}

data class IDontWantCacheEntry(
var heartbeatMessageIdsCount: Int = 0,
val messageIdsAndTimeReceived: MutableMap<MessageId, Long> = mutableMapOf()
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ class GossipParamsBuilder {

private var connectCallback: Function2<PeerId, ByteArray, Unit>? = null

private var maxIDontWantMessageIds: Int? = null

private var iDontWantMinMessageSizeThreshold: Int? = null

private var iDontWantTTL: Duration? = null

init {
val source = GossipParams()
this.D = source.D
Expand Down Expand Up @@ -100,6 +106,9 @@ class GossipParamsBuilder {
this.maxPruneMessages = source.maxPruneMessages
this.gossipRetransmission = source.gossipRetransmission
this.connectCallback = source.connectCallback
this.maxIDontWantMessageIds = source.maxIDontWantMessageIds
this.iDontWantMinMessageSizeThreshold = source.iDontWantMinMessageSizeThreshold
this.iDontWantTTL = source.iDontWantTTL
}

fun D(value: Int): GossipParamsBuilder = apply { D = value }
Expand Down Expand Up @@ -172,6 +181,12 @@ class GossipParamsBuilder {
connectCallback = value
}

fun maxIDontWantMessageIds(value: Int): GossipParamsBuilder = apply { maxIDontWantMessageIds = value }

fun iDontWantMinMessageSizeThreshold(value: Int): GossipParamsBuilder = apply { iDontWantMinMessageSizeThreshold = value }

fun iDontWantTTL(value: Duration): GossipParamsBuilder = apply { iDontWantTTL = value }

fun build(): GossipParams {
calculateMissing()
checkRequiredFields()
Expand Down Expand Up @@ -206,7 +221,10 @@ class GossipParamsBuilder {
pruneBackoff = pruneBackoff!!,
maxPruneMessages = maxPruneMessages,
gossipRetransmission = gossipRetransmission!!,
connectCallback = connectCallback!!
connectCallback = connectCallback!!,
maxIDontWantMessageIds = maxIDontWantMessageIds!!,
iDontWantMinMessageSizeThreshold = iDontWantMinMessageSizeThreshold!!,
iDontWantTTL = iDontWantTTL!!
)
}

Expand Down Expand Up @@ -244,5 +262,8 @@ class GossipParamsBuilder {
check(iWantFollowupTime != null, { "iWantFollowupTime must not be null" })
check(gossipRetransmission != null, { "gossipRetransmission must not be null" })
check(connectCallback != null, { "connectCallback must not be null" })
check(maxIDontWantMessageIds != null, { "maxIDontWantMessageIds must not be null" })
check(iDontWantMinMessageSizeThreshold != null, { "iDontWantMinMessageSizeThreshold must not be null" })
check(iDontWantTTL != null, { "iDontWantTTL must not be null" })
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ typealias GossipScoreFactory =
open class GossipRouterBuilder(

var name: String = "GossipRouter",
var protocol: PubsubProtocol = PubsubProtocol.Gossip_V_1_1,
var protocol: PubsubProtocol = PubsubProtocol.Gossip_V_1_2,

var params: GossipParams = GossipParams(),
var scoreParams: GossipScoreParams = GossipScoreParams(),
Expand Down
Loading
Loading