Skip to content

Commit

Permalink
better implem
Browse files Browse the repository at this point in the history
  • Loading branch information
Menduist committed Oct 21, 2021
1 parent 0ec65c1 commit cf04783
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 12 deletions.
11 changes: 6 additions & 5 deletions libp2p/protocols/pubsub/pubsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,14 @@ type
anonymize*: bool # if we omit fromPeer and seqno from RPC messages we send
subscriptionValidator*: SubscriptionValidator # callback used to validate subscriptions
topicsHigh*: int # the maximum number of topics a peer is allowed to subscribe to
maxRecvMessageSize*: int ##\
maxMessageSize*: int ##\
## the maximum raw message size we'll globally allow
## for finer tuning, check message size on topic validator
##
## sending a big message to a peer with a lower size limit can
## lead to issues, from descoring to connection drops
##
## defaults to 64kB
## defaults to 1mB

knownTopics*: HashSet[string]

Expand Down Expand Up @@ -291,7 +291,7 @@ proc getOrCreatePeer*(
p.onPubSubPeerEvent(peer, event)

# create new pubsub peer
let pubSubPeer = PubSubPeer.new(peerId, getConn, dropConn, onEvent, protos[0], p.maxRecvMessageSize)
let pubSubPeer = PubSubPeer.new(peerId, getConn, dropConn, onEvent, protos[0], p.maxMessageSize)
debug "created new pubsub peer", peerId

p.peers[peerId] = pubSubPeer
Expand Down Expand Up @@ -481,8 +481,6 @@ method initPubSub*(p: PubSub)
p.observers = new(seq[PubSubObserver])
if p.msgIdProvider == nil:
p.msgIdProvider = defaultMsgIdProvider
if p.maxRecvMessageSize == 0:
p.maxRecvMessageSize = 64 * 1024

method start*(p: PubSub) {.async, base.} =
## start pubsub
Expand Down Expand Up @@ -548,6 +546,7 @@ proc init*[PubParams: object | bool](
sign: bool = true,
msgIdProvider: MsgIdProvider = defaultMsgIdProvider,
subscriptionValidator: SubscriptionValidator = nil,
maxMessageSize: int = 1024 * 1024,
parameters: PubParams = false): P
{.raises: [Defect, InitializationError].} =
let pubsub =
Expand All @@ -560,6 +559,7 @@ proc init*[PubParams: object | bool](
sign: sign,
msgIdProvider: msgIdProvider,
subscriptionValidator: subscriptionValidator,
maxMessageSize: maxMessageSize,
topicsHigh: int.high)
else:
P(switch: switch,
Expand All @@ -571,6 +571,7 @@ proc init*[PubParams: object | bool](
msgIdProvider: msgIdProvider,
subscriptionValidator: subscriptionValidator,
parameters: parameters,
maxMessageSize: maxMessageSize,
topicsHigh: int.high)

proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} =
Expand Down
16 changes: 10 additions & 6 deletions libp2p/protocols/pubsub/pubsubpeer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type
score*: float64
iWantBudget*: int
iHaveBudget*: int
maxRecvMessageSize: int
maxMessageSize: int
appScore*: float64 # application specific score
behaviourPenalty*: float64 # the eventual penalty score

Expand Down Expand Up @@ -120,7 +120,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
while not conn.atEof:
trace "waiting for data", conn, peer = p, closed = conn.closed

var data = await conn.readLp(p.maxRecvMessageSize)
var data = await conn.readLp(p.maxMessageSize)
trace "read data from peer",
conn, peer = p, closed = conn.closed,
data = data.shortLog
Expand Down Expand Up @@ -244,6 +244,10 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [Defect].} =
debug "empty message, skipping", p, msg = shortLog(msg)
return

if msg.len > p.maxMessageSize:
info "trying to send a too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
return

let conn = p.sendConn
if conn == nil or conn.closed():
trace "No send connection, skipping message", p, msg = shortLog(msg)
Expand Down Expand Up @@ -282,15 +286,15 @@ proc new*(
dropConn: DropConn,
onEvent: OnEvent,
codec: string,
maxRecvMessageSize: int): T =
maxMessageSize: int): T =

T(
getConn: getConn,
dropConn: dropConn,
onEvent: onEvent,
codec: codec,
peerId: peerId,
maxRecvMessageSize: maxRecvMessageSize
maxMessageSize: maxMessageSize
)

proc newPubSubPeer*(
Expand All @@ -299,13 +303,13 @@ proc newPubSubPeer*(
dropConn: DropConn,
onEvent: OnEvent,
codec: string,
maxRecvMessageSize: int): PubSubPeer {.deprecated: "use PubSubPeer.new".} =
maxMessageSize: int): PubSubPeer {.deprecated: "use PubSubPeer.new".} =

PubSubPeer.new(
peerId,
getConn,
dropConn,
onEvent,
codec,
maxRecvMessageSize
maxMessageSize
)
52 changes: 52 additions & 0 deletions tests/pubsub/testfloodsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -382,3 +382,55 @@ suite "FloodSub":
it.switch.stop())))

await allFuturesThrowing(nodesFut)

asyncTest "FloodSub message size validation":
var messageReceived = 0
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check data.len < 50
inc(messageReceived)

let
bigNode = generateNodes(1)
smallNode = generateNodes(1, maxMessageSize = 200)

# start switches
nodesFut = await allFinished(
bigNode[0].switch.start(),
smallNode[0].switch.start(),
)

# start pubsubcon
await allFuturesThrowing(
allFinished(
bigNode[0].start(),
smallNode[0].start(),
))

await subscribeNodes(bigNode & smallNode)
bigNode[0].subscribe("foo", handler)
smallNode[0].subscribe("foo", handler)
await waitSub(bigNode[0], smallNode[0], "foo")

let
bigMessage = newSeq[byte](1000)
smallMessage = newSeq[byte](10)

check (await smallNode[0].publish("foo", smallMessage)) > 0
check (await bigNode[0].publish("foo", smallMessage)) > 0

check (await smallNode[0].publish("foo", bigMessage)) > 0
check (await bigNode[0].publish("foo", bigMessage)) > 0

await allFuturesThrowing(
smallNode[0].switch.stop(),
bigNode[0].switch.stop()
)

check messageReceived == 2

await allFuturesThrowing(
smallNode[0].stop(),
bigNode[0].stop()
)

await allFuturesThrowing(nodesFut)
5 changes: 4 additions & 1 deletion tests/pubsub/utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ proc generateNodes*(
triggerSelf: bool = false,
verifySignature: bool = libp2p_pubsub_verify,
anonymize: bool = libp2p_pubsub_anonymize,
sign: bool = libp2p_pubsub_sign): seq[PubSub] =
sign: bool = libp2p_pubsub_sign,
maxMessageSize: int = 1024 * 1024): seq[PubSub] =

for i in 0..<num:
let switch = newStandardSwitch(secureManagers = secureManagers)
Expand All @@ -38,6 +39,7 @@ proc generateNodes*(
sign = sign,
msgIdProvider = msgIdProvider,
anonymize = anonymize,
maxMessageSize = maxMessageSize,
parameters = (var p = GossipSubParams.init(); p.floodPublish = false; p.historyLength = 20; p.historyGossip = 20; p))
# set some testing params, to enable scores
g.topicParams.mgetOrPut("foobar", TopicParams.init()).topicWeight = 1.0
Expand All @@ -51,6 +53,7 @@ proc generateNodes*(
verifySignature = verifySignature,
sign = sign,
msgIdProvider = msgIdProvider,
maxMessageSize = maxMessageSize,
anonymize = anonymize).PubSub

switch.mount(pubsub)
Expand Down

0 comments on commit cf04783

Please sign in to comment.