Skip to content

Commit

Permalink
floodPublishMaxMessageSizeThreshold
Browse files Browse the repository at this point in the history
  • Loading branch information
tbenr committed Oct 21, 2024
1 parent bea7cf9 commit 350112d
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 24 deletions.
14 changes: 9 additions & 5 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,13 @@ data class GossipParams(
val seenTTL: Duration = 2.minutes,

/**
* [floodPublish] is a gossipsub router option that enables flood publishing.
* When this is enabled, published messages are forwarded to all peers with score >=
* to publishThreshold
* [floodPublishMaxMessageSizeThreshold] controls the maximum size (in bytes) a message will be
* published using flood publishing mode.
* When a message size is <= [floodPublishMaxMessageSizeThreshold], published messages are forwarded
* to all peers with score >= to [GossipScoreParams.publishThreshold]
* The default is 0 KiB (never flood publish).
*/
val floodPublish: Boolean = false,
val floodPublishMaxMessageSizeThreshold: Int = 0,

/**
* [gossipFactor] affects how many peers we will emit gossip to at each heartbeat.
Expand Down Expand Up @@ -240,7 +242,7 @@ data class GossipParams(

/**
* [iDontWantMinMessageSizeThreshold] controls the minimum size (in bytes) that an incoming message needs to be so that an IDONTWANT message is sent to mesh peers.
* The default is 16 KB.
* The default is 16 KiB.
*/
val iDontWantMinMessageSizeThreshold: Int = 16384,

Expand All @@ -260,6 +262,8 @@ data class GossipParams(
check(DLow <= D, "DLow should be <= D")
check(DHigh >= D, "DHigh should be >= D")
check(gossipFactor in 0.0..1.0, "gossipFactor should be in range [0.0, 1.0]")
check(floodPublishMaxMessageSizeThreshold >= 0, "floodPublishMaxMessageSizeThreshold should be >= 0")
check(iDontWantMinMessageSizeThreshold >= 0, "iDontWantMinMessageSizeThreshold should be >= 0")
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,10 @@ open class GossipRouter(
override fun broadcastOutbound(msg: PubsubMessage): CompletableFuture<Unit> {
msg.topics.forEach { lastPublished[it] = currentTimeSupplier() }

val floodPublish = msg.protobufMessage.data.size() <= params.floodPublishMaxMessageSizeThreshold

val peers =
if (params.floodPublish) {
if (floodPublish) {
msg.topics
.flatMap { getTopicPeers(it) }
.filter { score.score(it.peerId) >= scoreParams.publishThreshold }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ class GossipParamsBuilder {

private var iDontWantMinMessageSizeThreshold: Int? = null

private var floodPublishMaxMessageSizeThreshold: Int? = null

private var iDontWantTTL: Duration? = null

init {
Expand All @@ -90,7 +92,6 @@ class GossipParamsBuilder {
this.maxPeersSentInPruneMsg = source.maxPeersSentInPruneMsg
this.maxPeersAcceptedInPruneMsg = source.maxPeersAcceptedInPruneMsg
this.pruneBackoff = source.pruneBackoff
this.floodPublish = source.floodPublish
this.gossipFactor = source.gossipFactor
this.opportunisticGraftPeers = source.opportunisticGraftPeers
this.opportunisticGraftTicks = source.opportunisticGraftTicks
Expand Down Expand Up @@ -141,8 +142,6 @@ class GossipParamsBuilder {

fun pruneBackoff(value: Duration): GossipParamsBuilder = apply { pruneBackoff = value }

fun floodPublish(value: Boolean): GossipParamsBuilder = apply { floodPublish = value }

fun gossipFactor(value: Double): GossipParamsBuilder = apply { gossipFactor = value }

fun opportunisticGraftPeers(value: Int): GossipParamsBuilder = apply {
Expand Down Expand Up @@ -185,6 +184,8 @@ class GossipParamsBuilder {

fun iDontWantMinMessageSizeThreshold(value: Int): GossipParamsBuilder = apply { iDontWantMinMessageSizeThreshold = value }

fun floodPublishMaxMessageSizeThreshold(value: Int): GossipParamsBuilder = apply { floodPublishMaxMessageSizeThreshold = value }

fun iDontWantTTL(value: Duration): GossipParamsBuilder = apply { iDontWantTTL = value }

fun build(): GossipParams {
Expand All @@ -203,7 +204,7 @@ class GossipParamsBuilder {
gossipHistoryLength = gossipHistoryLength!!,
heartbeatInterval = heartbeatInterval!!,
seenTTL = seenTTL!!,
floodPublish = floodPublish!!,
floodPublishMaxMessageSizeThreshold = floodPublishMaxMessageSizeThreshold!!,
gossipFactor = gossipFactor!!,
opportunisticGraftPeers = opportunisticGraftPeers!!,
opportunisticGraftTicks = opportunisticGraftTicks!!,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit

class GossipPubsubRouterTest : PubsubRouterTest(
createGossipFuzzRouterFactory {
GossipRouterBuilder(params = GossipParams(3, 3, 100, floodPublish = false))
GossipRouterBuilder(params = GossipParams(3, 3, 100, floodPublishMaxMessageSizeThreshold = 0))
}
) {

Expand Down Expand Up @@ -59,7 +59,7 @@ class GossipPubsubRouterTest : PubsubRouterTest(
// this is to test ihave/iwant
fuzz.timeController.addTime(Duration.ofMillis(1))

val r = { GossipRouterBuilder(params = GossipParams(3, 3, 3, DOut = 0, DLazy = 1000, floodPublish = false)) }
val r = { GossipRouterBuilder(params = GossipParams(3, 3, 3, DOut = 0, DLazy = 1000, floodPublishMaxMessageSizeThreshold = 0)) }
val routerCenter = fuzz.createTestGossipRouter(r)
allRouters.add(0, routerCenter)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,8 +530,9 @@ class GossipV1_1Tests : GossipTestsBase() {

@Test
fun testNotFloodPublish() {
val message = newMessage("topic1", 0L, "Hello-0".toByteArray())
val appScore = mutableMapOf<PeerId, Double>().withDefault { 0.0 }
val coreParams = GossipParams(3, 3, 3, floodPublish = false)
val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = message.protobufMessage.data.size() - 1)
val peerScoreParams = GossipPeerScoreParams(appSpecificScore = { appScore.getValue(it) })
val scoreParams = GossipScoreParams(peerScoreParams = peerScoreParams)
val test = ManyRoutersTest(params = coreParams, scoreParams = scoreParams)
Expand All @@ -545,7 +546,7 @@ class GossipV1_1Tests : GossipTestsBase() {
val topicMesh = test.gossipRouter.mesh["topic1"]!!
assertTrue(topicMesh.size > 0 && topicMesh.size < test.routers.size)

test.gossipRouter.publish(newMessage("topic1", 0L, "Hello-0".toByteArray()))
test.gossipRouter.publish(message)

test.fuzz.timeController.addTime(50.millis)

Expand All @@ -557,8 +558,9 @@ class GossipV1_1Tests : GossipTestsBase() {

@Test
fun testFloodPublish() {
val message = newMessage("topic1", 0L, "Hello-0".toByteArray())
val appScore = mutableMapOf<PeerId, Double>().withDefault { 0.0 }
val coreParams = GossipParams(3, 3, 3, floodPublish = true)
val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = message.protobufMessage.data.size())
val peerScoreParams = GossipPeerScoreParams(
appSpecificScore = { appScore.getValue(it) },
appSpecificWeight = 1.0
Expand All @@ -580,7 +582,7 @@ class GossipV1_1Tests : GossipTestsBase() {
val topicMesh = test.gossipRouter.mesh["topic1"]!!.map { it.peerId }
assertTrue(topicMesh.size > 0 && topicMesh.size < test.routers.size)

test.gossipRouter.publish(newMessage("topic1", 0L, "Hello-0".toByteArray()))
test.gossipRouter.publish(message)

test.fuzz.timeController.addTime(50.millis)

Expand Down Expand Up @@ -650,7 +652,7 @@ class GossipV1_1Tests : GossipTestsBase() {
3,
3,
DLazy = 3,
floodPublish = false,
floodPublishMaxMessageSizeThreshold = 0,
gossipFactor = 0.5
)
val peerScoreParams = GossipPeerScoreParams(
Expand Down Expand Up @@ -714,7 +716,7 @@ class GossipV1_1Tests : GossipTestsBase() {
@Test
fun testOutboundMeshQuotas1() {
val appScore = mutableMapOf<PeerId, Double>().withDefault { 0.0 }
val coreParams = GossipParams(3, 3, 3, DLazy = 3, DOut = 1, floodPublish = false)
val coreParams = GossipParams(3, 3, 3, DLazy = 3, DOut = 1, floodPublishMaxMessageSizeThreshold = 0)
val peerScoreParams = GossipPeerScoreParams(appSpecificScore = { appScore.getValue(it) })
val scoreParams = GossipScoreParams(peerScoreParams = peerScoreParams)
val test = ManyRoutersTest(params = coreParams, scoreParams = scoreParams)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow

class SubscriptionsLimitTest : TwoGossipHostTestBase() {
override val params = GossipParams(maxSubscriptions = 5, floodPublish = true)
override val params = GossipParams(maxSubscriptions = 5, floodPublishMaxMessageSizeThreshold = 0)

@Test
fun `new peer subscribed to many topics`() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ val Eth2DefaultGossipParams = GossipParams(
DLazy = 8,

pruneBackoff = 1.minutes,
floodPublish = true,
floodPublishMaxMessageSizeThreshold = 0,
gossipFactor = 0.25,
DScore = 4,
DOut = 2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ val Eth2DefaultGossipParams = GossipParams(
DLazy = 8,

pruneBackoff = 1.minutes,
floodPublish = true,
floodPublishMaxMessageSizeThreshold = 16384,
gossipFactor = 0.25,
DScore = 4,
DOut = 2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class BlobDecouplingSimulation(
val randomSeed: Long = 3L,
val rnd: Random = Random(randomSeed),

val floodPublish: Boolean = true,
val floodPublishMaxMessageSizeThreshold: Int = 0,

val sendingPeerBand: Bandwidth = Bandwidth.mbitsPerSec(100),

Expand Down Expand Up @@ -85,7 +85,7 @@ class BlobDecouplingSimulation(
val gossipParams = Eth2DefaultGossipParams
.copy(
// heartbeatInterval = 1.minutes
floodPublish = floodPublish
floodPublishMaxMessageSizeThreshold = floodPublishMaxMessageSizeThreshold
)
val gossipScoreParams = Eth2DefaultScoreParams
val gossipRouterCtor = { _: Int ->
Expand Down Expand Up @@ -294,7 +294,7 @@ fun main() {
// logger = {},
nodeCount = 1000,
peerBands = band,
floodPublish = false,
floodPublishMaxMessageSizeThreshold = 0,
// randomSeed = 2
)

Expand Down

0 comments on commit 350112d

Please sign in to comment.