Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add maxMessageSize option to pubsub #634

Merged
merged 11 commits into from
Oct 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion libp2p/protocols/pubsub/pubsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ type
anonymize*: bool # if we omit fromPeer and seqno from RPC messages we send
subscriptionValidator*: SubscriptionValidator # callback used to validate subscriptions
topicsHigh*: int # the maximum number of topics a peer is allowed to subscribe to
maxMessageSize*: int ##\
## the maximum raw message size we'll globally allow
## for finer tuning, check message size on topic validator
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still needs a deeper explanation, for the risk of getting descored on networks with different limits

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But note that this limit doesn't apply to sending message, even with a low maxRecvMessageSize, you could still send big messages

##
## sending a big message to a peer with a lower size limit can
## lead to issues, from descoring to connection drops
##
## defaults to 1mB
rng*: ref BrHmacDrbgContext

knownTopics*: HashSet[string]
Expand Down Expand Up @@ -285,7 +293,7 @@ proc getOrCreatePeer*(
p.onPubSubPeerEvent(peer, event)

# create new pubsub peer
let pubSubPeer = PubSubPeer.new(peerId, getConn, dropConn, onEvent, protos[0])
let pubSubPeer = PubSubPeer.new(peerId, getConn, dropConn, onEvent, protos[0], p.maxMessageSize)
debug "created new pubsub peer", peerId

p.peers[peerId] = pubSubPeer
Expand Down Expand Up @@ -540,6 +548,7 @@ proc init*[PubParams: object | bool](
sign: bool = true,
msgIdProvider: MsgIdProvider = defaultMsgIdProvider,
subscriptionValidator: SubscriptionValidator = nil,
maxMessageSize: int = 1024 * 1024,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is the 1mb limit from? we used to have 64 kb - ideally we'd use the same limit as "Defaults" in other impls

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The spec half-says it, and it's the default in go-libp2p.
I would have kept it to 64 kB to avoid breaking changes, but both nim-waku and nimbus wants 1mb, so let's harmonize with the other libp2ps :)

rng: ref BrHmacDrbgContext = newRng(),
parameters: PubParams = false): P
{.raises: [Defect, InitializationError].} =
Expand All @@ -553,6 +562,7 @@ proc init*[PubParams: object | bool](
sign: sign,
msgIdProvider: msgIdProvider,
subscriptionValidator: subscriptionValidator,
maxMessageSize: maxMessageSize,
rng: rng,
topicsHigh: int.high)
else:
Expand All @@ -565,6 +575,7 @@ proc init*[PubParams: object | bool](
msgIdProvider: msgIdProvider,
subscriptionValidator: subscriptionValidator,
parameters: parameters,
maxMessageSize: maxMessageSize,
rng: rng,
topicsHigh: int.high)

Expand Down
11 changes: 9 additions & 2 deletions libp2p/protocols/pubsub/pubsubpeer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type
score*: float64
iWantBudget*: int
iHaveBudget*: int
maxMessageSize: int
appScore*: float64 # application specific score
behaviourPenalty*: float64 # the eventual penalty score

Expand Down Expand Up @@ -119,7 +120,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
while not conn.atEof:
trace "waiting for data", conn, peer = p, closed = conn.closed

var data = await conn.readLp(64 * 1024)
var data = await conn.readLp(p.maxMessageSize)
trace "read data from peer",
conn, peer = p, closed = conn.closed,
data = data.shortLog
Expand Down Expand Up @@ -243,6 +244,10 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [Defect].} =
debug "empty message, skipping", p, msg = shortLog(msg)
return

if msg.len > p.maxMessageSize:
info "trying to send a too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
return

let conn = p.sendConn
if conn == nil or conn.closed():
trace "No send connection, skipping message", p, msg = shortLog(msg)
Expand Down Expand Up @@ -280,12 +285,14 @@ proc new*(
getConn: GetConn,
dropConn: DropConn,
onEvent: OnEvent,
codec: string): T =
codec: string,
maxMessageSize: int): T =

T(
getConn: getConn,
dropConn: dropConn,
onEvent: onEvent,
codec: codec,
peerId: peerId,
maxMessageSize: maxMessageSize
)
54 changes: 54 additions & 0 deletions tests/pubsub/testfloodsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -382,3 +382,57 @@ suite "FloodSub":
it.switch.stop())))

await allFuturesThrowing(nodesFut)

asyncTest "FloodSub message size validation":
var messageReceived = 0
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check data.len < 50
inc(messageReceived)

let
bigNode = generateNodes(1)
smallNode = generateNodes(1, maxMessageSize = 200)

# start switches
nodesFut = await allFinished(
bigNode[0].switch.start(),
smallNode[0].switch.start(),
)

# start pubsubcon
await allFuturesThrowing(
allFinished(
bigNode[0].start(),
smallNode[0].start(),
))

await subscribeNodes(bigNode & smallNode)
bigNode[0].subscribe("foo", handler)
smallNode[0].subscribe("foo", handler)
await waitSub(bigNode[0], smallNode[0], "foo")

let
bigMessage = newSeq[byte](1000)
smallMessage1 = @[1.byte]
smallMessage2 = @[3.byte]

# Need two different messages, otherwise they are the same when anonymized
check (await smallNode[0].publish("foo", smallMessage1)) > 0
check (await bigNode[0].publish("foo", smallMessage2)) > 0

check (await checkExpiring(messageReceived == 2)) == true

check (await smallNode[0].publish("foo", bigMessage)) > 0
check (await bigNode[0].publish("foo", bigMessage)) > 0

await allFuturesThrowing(
smallNode[0].switch.stop(),
bigNode[0].switch.stop()
)

await allFuturesThrowing(
smallNode[0].stop(),
bigNode[0].stop()
)

await allFuturesThrowing(nodesFut)
2 changes: 1 addition & 1 deletion tests/pubsub/testgossipinternal.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ proc getPubSubPeer(p: TestGossipSub, peerId: PeerID): PubSubPeer =
proc dropConn(peer: PubSubPeer) =
discard # we don't care about it here yet

let pubSubPeer = PubSubPeer.new(peerId, getConn, dropConn, nil, GossipSubCodec)
let pubSubPeer = PubSubPeer.new(peerId, getConn, dropConn, nil, GossipSubCodec, 1024 * 1024)
debug "created new pubsub peer", peerId

p.peers[peerId] = pubSubPeer
Expand Down
5 changes: 4 additions & 1 deletion tests/pubsub/utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ proc generateNodes*(
triggerSelf: bool = false,
verifySignature: bool = libp2p_pubsub_verify,
anonymize: bool = libp2p_pubsub_anonymize,
sign: bool = libp2p_pubsub_sign): seq[PubSub] =
sign: bool = libp2p_pubsub_sign,
maxMessageSize: int = 1024 * 1024): seq[PubSub] =

for i in 0..<num:
let switch = newStandardSwitch(secureManagers = secureManagers)
Expand All @@ -38,6 +39,7 @@ proc generateNodes*(
sign = sign,
msgIdProvider = msgIdProvider,
anonymize = anonymize,
maxMessageSize = maxMessageSize,
parameters = (var p = GossipSubParams.init(); p.floodPublish = false; p.historyLength = 20; p.historyGossip = 20; p))
# set some testing params, to enable scores
g.topicParams.mgetOrPut("foobar", TopicParams.init()).topicWeight = 1.0
Expand All @@ -51,6 +53,7 @@ proc generateNodes*(
verifySignature = verifySignature,
sign = sign,
msgIdProvider = msgIdProvider,
maxMessageSize = maxMessageSize,
anonymize = anonymize).PubSub

switch.mount(pubsub)
Expand Down