Skip to content

Commit

Permalink
Prepare for handling uncompressed Gossip MessageId (#155)
Browse files Browse the repository at this point in the history
* Initial refactor: replace Rpc.Message with abstract PubsubMessage which is created by pluggable messageFactory.
messageFactory replaces messageIdGenerator because of PubsubMessage.messageId property
* Convert messageId from String to byte[] (see spec PR: libp2p/specs#285)
* Use WBytes ByteArray wrapper instead of plain ByteArray because of equals/hashCode absence
* Replace AbstractRouter.seenMessages with a special SeenCache interface which is capable of handling PubsubMessage.messageId in a lazy fashion
  • Loading branch information
Nashatyrev authored Nov 10, 2020
1 parent ec43a5b commit eb0303a
Show file tree
Hide file tree
Showing 25 changed files with 920 additions and 210 deletions.
4 changes: 4 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ dependencies {
testImplementation("org.junit.jupiter:junit-jupiter-params:5.4.2")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.4.2")
testImplementation("io.mockk:mockk:1.10.0")
testRuntimeOnly("org.mockito:mockito-core:3.3.3")
testImplementation("org.mockito:mockito-junit-jupiter:3.3.3")
testImplementation("org.assertj:assertj-core:3.16.1")

}

sourceSets {
Expand Down
4 changes: 4 additions & 0 deletions src/main/kotlin/io/libp2p/core/pubsub/PubsubApi.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.libp2p.core.pubsub
import io.libp2p.core.PeerId
import io.libp2p.core.crypto.PrivKey
import io.libp2p.pubsub.PubsubApiImpl
import io.libp2p.pubsub.PubsubMessage
import io.libp2p.pubsub.PubsubRouter
import io.netty.buffer.ByteBuf
import java.util.concurrent.CompletableFuture
Expand Down Expand Up @@ -165,6 +166,9 @@ interface PubsubApi : PubsubSubscriberApi {
* Abstract Pubsub Message API
*/
interface MessageApi {

val originalMessage: PubsubMessage

/**
* Message body
*/
Expand Down
34 changes: 34 additions & 0 deletions src/main/kotlin/io/libp2p/etc/types/WBytes.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.libp2p.etc.types

import com.google.protobuf.ByteString

/**
* `ByteArray` wrapper with `equals()`, `hashCode()` and `toString()`
*/
class WBytes(val array: ByteArray) {

operator fun plus(other: WBytes) = (array + other.array).toWBytes()
operator fun plus(other: ByteArray) = (array + other).toWBytes()

override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false

other as WBytes

if (!array.contentEquals(other.array)) return false

return true
}

override fun hashCode(): Int {
return array.contentHashCode()
}

override fun toString() = array.toHex()
}

fun ByteArray.toWBytes() = WBytes(this)
fun String.toWBytes() = this.fromHex().toWBytes()
fun WBytes.toProtobuf() = this.array.toProtobuf()
fun ByteString.toWBytes() = this.toByteArray().toWBytes()
2 changes: 1 addition & 1 deletion src/main/kotlin/io/libp2p/etc/util/P2PService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ abstract class P2PService {
* to write data to the peer
*/
open inner class PeerHandler(val streamHandler: StreamHandler) {
val peerId = streamHandler.stream.remotePeerId()
open val peerId = streamHandler.stream.remotePeerId()
open fun writeAndFlush(msg: Any): CompletableFuture<Unit> = streamHandler.ctx!!.writeAndFlush(msg).toVoidCompletableFuture()
open fun isActive() = streamHandler.ctx != null
open fun getInboundHandler(): StreamHandler? = streamHandler
Expand Down
73 changes: 39 additions & 34 deletions src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ import io.libp2p.core.pubsub.ValidationResult
import io.libp2p.etc.types.MultiSet
import io.libp2p.etc.types.completedExceptionally
import io.libp2p.etc.types.copy
import io.libp2p.etc.types.createLRUMap
import io.libp2p.etc.types.forward
import io.libp2p.etc.types.lazyVarInit
import io.libp2p.etc.types.toHex
import io.libp2p.etc.types.toWBytes
import io.libp2p.etc.util.P2PServiceSemiDuplex
import io.netty.channel.ChannelHandler
import io.netty.handler.codec.protobuf.ProtobufDecoder
Expand All @@ -27,7 +26,13 @@ import java.util.concurrent.CompletableFuture
import java.util.function.BiConsumer
import java.util.function.Consumer

typealias MessageId = String
class DefaultPubsubMessage(override val protobufMessage: Rpc.Message) : PubsubMessage {
override val messageId: MessageId = protobufMessage.from.toWBytes() + protobufMessage.seqno.toWBytes()

override fun equals(other: Any?) = protobufMessage == (other as? PubsubMessage)?.protobufMessage
override fun hashCode() = protobufMessage.hashCode()
override fun toString() = "DefaultPubsubMessage{$protobufMessage}"
}

/**
* Implements common logic for pubsub routers
Expand All @@ -38,37 +43,37 @@ abstract class AbstractRouter : P2PServiceSemiDuplex(), PubsubRouter, PubsubRout
override var curTimeMillis: () -> Long by lazyVarInit { { System.currentTimeMillis() } }
override var random by lazyVarInit { Random() }
override var name: String = "router"
var messageIdGenerator: (Rpc.Message) -> MessageId =
{ it.from.toByteArray().toHex() + it.seqno.toByteArray().toHex() }

private val peerTopics = MultiSet<PeerHandler, String>()
private var msgHandler: (Rpc.Message) -> CompletableFuture<ValidationResult> = { RESULT_VALID }
override var messageFactory: PubsubMessageFactory = { DefaultPubsubMessage(it) }
var maxSeenMessagesLimit = 10000
var validator: PubsubMessageValidator = PubsubMessageValidator.nopValidator()
protected open val seenMessages by lazy {
createLRUMap<MessageId, Optional<ValidationResult>>(maxSeenMessagesLimit)

protected open val seenMessages: SeenCache<Optional<ValidationResult>> by lazy {
LRUSeenCache(SimpleSeenCache(), maxSeenMessagesLimit)
}

private val peerTopics = MultiSet<PeerHandler, String>()
private var msgHandler: (PubsubMessage) -> CompletableFuture<ValidationResult> = { RESULT_VALID }
override var messageValidator = NOP_ROUTER_VALIDATOR

val subscribedTopics = linkedSetOf<String>()
val pendingRpcParts = linkedMapOf<PeerHandler, MutableList<Rpc.RPC>>()
private var debugHandler: ChannelHandler? = null
private val pendingMessagePromises = MultiSet<PeerHandler, CompletableFuture<Unit>>()

protected fun getMessageId(msg: Rpc.Message): MessageId = messageIdGenerator(msg)

override fun publish(msg: Rpc.Message): CompletableFuture<Unit> {
override fun publish(msg: PubsubMessage): CompletableFuture<Unit> {
return submitAsyncOnEventThread {
if (getMessageId(msg) in seenMessages) {
if (msg in seenMessages) {
completedExceptionally(MessageAlreadySeenException("Msg: $msg"))
} else {
validator.validate(msg) // check ourselves not to be a bad peer
seenMessages[getMessageId(msg)] = Optional.of(ValidationResult.Valid)
messageValidator.validate(msg) // check ourselves not to be a bad peer
seenMessages[msg] = Optional.of(ValidationResult.Valid)
broadcastOutbound(msg)
}
}
}

protected open fun submitPublishMessage(toPeer: PeerHandler, msg: Rpc.Message): CompletableFuture<Unit> {
addPendingRpcPart(toPeer, Rpc.RPC.newBuilder().addPublish(msg).build())
protected open fun submitPublishMessage(toPeer: PeerHandler, msg: PubsubMessage): CompletableFuture<Unit> {
addPendingRpcPart(toPeer, Rpc.RPC.newBuilder().addPublish(msg.protobufMessage).build())
val sendPromise = CompletableFuture<Unit>()
pendingMessagePromises[toPeer] += sendPromise
return sendPromise
Expand Down Expand Up @@ -142,12 +147,12 @@ abstract class AbstractRouter : P2PServiceSemiDuplex(), PubsubRouter, PubsubRout
/**
* Broadcasts to peers validated unseen messages received from api
*/
protected abstract fun broadcastOutbound(msg: Rpc.Message): CompletableFuture<Unit>
protected abstract fun broadcastOutbound(msg: PubsubMessage): CompletableFuture<Unit>

/**
* Broadcasts to peers validated unseen messages received from another peer
*/
protected abstract fun broadcastInbound(msgs: List<Rpc.Message>, receivedFrom: PeerHandler)
protected abstract fun broadcastInbound(msgs: List<PubsubMessage>, receivedFrom: PeerHandler)

/**
* Processes Pubsub control message
Expand All @@ -165,11 +170,11 @@ abstract class AbstractRouter : P2PServiceSemiDuplex(), PubsubRouter, PubsubRout
}

protected open fun notifyMalformedMessage(peer: PeerHandler) {}
protected open fun notifyUnseenMessage(peer: PeerHandler, msg: Rpc.Message) {}
protected open fun notifyUnseenMessage(peer: PeerHandler, msg: PubsubMessage) {}
protected open fun notifyNonSubscribedMessage(peer: PeerHandler, msg: Rpc.Message) {}
protected open fun notifySeenMessage(peer: PeerHandler, msg: Rpc.Message, validationResult: Optional<ValidationResult>) {}
protected open fun notifyUnseenInvalidMessage(peer: PeerHandler, msg: Rpc.Message) {}
protected open fun notifyUnseenValidMessage(peer: PeerHandler, msg: Rpc.Message) {}
protected open fun notifySeenMessage(peer: PeerHandler, msg: PubsubMessage, validationResult: Optional<ValidationResult>) {}
protected open fun notifyUnseenInvalidMessage(peer: PeerHandler, msg: PubsubMessage) {}
protected open fun notifyUnseenValidMessage(peer: PeerHandler, msg: PubsubMessage) {}
protected open fun acceptRequestsFrom(peer: PeerHandler) = true

override fun onInbound(peer: PeerHandler, msg: Any) {
Expand All @@ -185,29 +190,29 @@ abstract class AbstractRouter : P2PServiceSemiDuplex(), PubsubRouter, PubsubRout

(msg.publishList - msgSubscribed).forEach { notifyNonSubscribedMessage(peer, it) }

val msgUnseen = msgSubscribed
val pMsgSubscribed = msgSubscribed.map { messageFactory(it) }
val msgUnseen = pMsgSubscribed
.filter { subscribedMessage ->
val messageId = getMessageId(subscribedMessage)
val validationResult = seenMessages[messageId]
val validationResult = seenMessages[subscribedMessage]
if (validationResult != null) {
// Message has been seen
notifySeenMessage(peer, subscribedMessage, validationResult)
notifySeenMessage(peer, seenMessages.getSeenMessage(subscribedMessage), validationResult)
false
} else {
// Message is unseen
seenMessages[messageId] = Optional.empty()
seenMessages[subscribedMessage] = Optional.empty()
notifyUnseenMessage(peer, subscribedMessage)
true
}
}

val msgValid = msgUnseen.filter {
try {
validator.validate(it)
messageValidator.validate(it)
true
} catch (e: Exception) {
logger.debug("Invalid pubsub message from peer $peer: $it", e)
seenMessages[getMessageId(it)] = Optional.of(ValidationResult.Invalid)
seenMessages[it] = Optional.of(ValidationResult.Invalid)
notifyUnseenInvalidMessage(peer, it)
false
}
Expand All @@ -221,7 +226,7 @@ abstract class AbstractRouter : P2PServiceSemiDuplex(), PubsubRouter, PubsubRout
validFuts.forEach { (msg, validationFut) ->
validationFut.thenAcceptAsync(
Consumer { res ->
seenMessages[getMessageId(msg)] = Optional.of(res)
seenMessages[msg] = Optional.of(res)
if (res == ValidationResult.Invalid) notifyUnseenInvalidMessage(peer, msg)
},
executor
Expand Down Expand Up @@ -260,7 +265,7 @@ abstract class AbstractRouter : P2PServiceSemiDuplex(), PubsubRouter, PubsubRout
}
}

private fun newValidatedMessages(msgs: List<Rpc.Message>, receivedFrom: PeerHandler) {
private fun newValidatedMessages(msgs: List<PubsubMessage>, receivedFrom: PeerHandler) {
msgs.forEach { notifyUnseenValidMessage(receivedFrom, it) }
broadcastInbound(msgs, receivedFrom)
}
Expand Down Expand Up @@ -346,7 +351,7 @@ abstract class AbstractRouter : P2PServiceSemiDuplex(), PubsubRouter, PubsubRout
return peer.writeAndFlush(msg)
}

override fun initHandler(handler: (Rpc.Message) -> CompletableFuture<ValidationResult>) {
override fun initHandler(handler: (PubsubMessage) -> CompletableFuture<ValidationResult>) {
msgHandler = handler
}
}
33 changes: 13 additions & 20 deletions src/main/kotlin/io/libp2p/pubsub/PubsubApiImpl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ open class PubsubApiImpl(val router: PubsubRouter) : PubsubApi {
msgToSign.setFrom(it)
}

return router.publish(sign(msgToSign.build()))
return router.publish(router.messageFactory(sign(msgToSign.build())))
}

private fun sign(msg: Rpc.Message) = if (privKey != null) pubsubSign(msg, privKey) else msg
Expand All @@ -74,9 +74,9 @@ open class PubsubApiImpl(val router: PubsubRouter) : PubsubApi {
}
}

private fun onNewMessage(msg: Rpc.Message): CompletableFuture<ValidationResult> {
private fun onNewMessage(msg: PubsubMessage): CompletableFuture<ValidationResult> {
val validationFuts = synchronized(this) {
msg.topicIDsList.mapNotNull { subscriptions[Topic(it)] }.flatten().distinct()
msg.topics.mapNotNull { subscriptions[Topic(it)] }.flatten().distinct()
}.map {
it.receiver.apply(rpc2Msg(msg))
}
Expand All @@ -86,17 +86,7 @@ open class PubsubApiImpl(val router: PubsubRouter) : PubsubApi {
}
}

private fun rpc2Msg(msg: Rpc.Message): MessageApi {
return MessageImpl(
msg.data.toByteArray().toByteBuf(),
if (msg.hasFrom()) msg.from.toByteArray()
else null,
if (msg.hasSeqno() && msg.seqno.size() >= 8)
msg.seqno.toByteArray().copyOfRange(0, 8).toLongBigEndian()
else null,
msg.topicIDsList.map { Topic(it) }
)
}
private fun rpc2Msg(msg: PubsubMessage) = MessageImpl(msg)

override fun subscribe(receiver: Validator, vararg topics: Topic): PubsubSubscription {
val subscription = SubscriptionImpl(topics, receiver)
Expand Down Expand Up @@ -145,9 +135,12 @@ open class PubsubApiImpl(val router: PubsubRouter) : PubsubApi {
PublisherImpl(privKey, seqIdGenerator)
}

class MessageImpl(
override val data: ByteBuf,
override val from: ByteArray?,
override val seqId: Long?,
override val topics: List<Topic>
) : MessageApi
class MessageImpl(override val originalMessage: PubsubMessage) : MessageApi {
private val msg = originalMessage.protobufMessage
override val data = msg.data.toByteArray().toByteBuf()
override val from = if (msg.hasFrom()) msg.from.toByteArray() else null
override val seqId = if (msg.hasSeqno() && msg.seqno.size() >= 8)
msg.seqno.toByteArray().copyOfRange(0, 8).toLongBigEndian()
else null
override val topics = msg.topicIDsList.map { Topic(it) }
}
45 changes: 0 additions & 45 deletions src/main/kotlin/io/libp2p/pubsub/PubsubMessageValidator.kt

This file was deleted.

Loading

0 comments on commit eb0303a

Please sign in to comment.