Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,9 @@ abstract class AbstractRouter(
processControl(msg.control, peer)
}

if (protocol.supportsExtensions()) {
// TODO we need to handle the existence of extension messages more generically (https://github.com/libp2p/jvm-libp2p/issues/441)

if (protocol.supportsExtensions() && (msg.hasTestExtension() || msg.hasPartial())) {
processExtensions(msg, peer)
}

Expand Down
4 changes: 4 additions & 0 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import io.libp2p.pubsub.PubsubApiImpl
import io.libp2p.pubsub.PubsubProtocol
import io.libp2p.pubsub.gossip.builders.GossipRouterBuilder
import io.netty.channel.ChannelHandler
import org.slf4j.LoggerFactory
import java.util.concurrent.CompletableFuture

class Gossip @JvmOverloads constructor(
Expand All @@ -21,6 +22,8 @@ class Gossip @JvmOverloads constructor(
) :
ProtocolBinding<Unit>, ConnectionHandler, PubsubApi by api {

private val logger = LoggerFactory.getLogger(Gossip::class.java)

fun updateTopicScoreParams(scoreParams: Map<String, GossipTopicScoreParams>) {
router.score.updateTopicParams(scoreParams)
}
Expand Down Expand Up @@ -62,6 +65,7 @@ class Gossip @JvmOverloads constructor(
}

override fun initChannel(ch: P2PChannel, selectedProtocol: String): CompletableFuture<out Unit> {
logger.trace("Gossip initChannel - selected protocol: {}", selectedProtocol)
router.addPeerWithDebugHandler(ch as Stream, debugGossipHandler)
return CompletableFuture.completedFuture(Unit)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.libp2p.pubsub.gossip

import io.libp2p.core.PeerId
import pubsub.pb.Rpc

class GossipExtensionsState {

/*
Tracks the peers that we have already sent a control extensions message
*/
private val outgoingControlExtensionsMsgPeers: MutableSet<PeerId> = mutableSetOf()

/*
Tracks peers that already sent us a control extensions message
*/
private val peerExtensionSupportMap: MutableMap<PeerId, Rpc.ControlExtensions> = mutableMapOf()

fun onPeerDisconnected(peer: PeerId) {
outgoingControlExtensionsMsgPeers.remove(peer)
peerExtensionSupportMap.remove(peer)
}

fun onControlExtensionsMessage(ctrlExtensions: Rpc.ControlExtensions, receivedFrom: PeerId) {
peerExtensionSupportMap[receivedFrom] = ctrlExtensions
}

fun registerControlExtensionMessageSentToPeers(peerId: PeerId) {
outgoingControlExtensionsMsgPeers.add(peerId)
}

fun peerSupportedExtensions(peerId: PeerId) = peerExtensionSupportMap[peerId]

fun hasReceivedControlExtensionsFrom(peer: PeerId) =
peerExtensionSupportMap.contains(peer)

fun hasSentControlExtensionsTo(peer: PeerId) =
outgoingControlExtensionsMsgPeers.contains(peer)
}
85 changes: 69 additions & 16 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ open class GossipRouter(
private val acceptRequestsWhitelist = mutableMapOf<PeerHandler, AcceptRequestsWhitelistEntry>()
override val pendingRpcParts = PendingRpcPartsMap<GossipRpcPartsQueue> { DefaultGossipRpcPartsQueue(params) }

private val peerExtensionSupportMap = mutableMapOf<PeerId, Rpc.ControlExtensions>()
val gossipExtensionsState = GossipExtensionsState()

private fun setBackOff(peer: PeerHandler, topic: Topic) = setBackOff(peer, topic, params.pruneBackoff.toMillis())
private fun setBackOff(peer: PeerHandler, topic: Topic, delay: Long) {
Expand All @@ -159,13 +159,15 @@ open class GossipRouter(
fanout.values.forEach { it.remove(peer) }
acceptRequestsWhitelist -= peer
pendingRpcParts.popQueue(peer) // discard them
gossipExtensionsState.onPeerDisconnected(peer.peerId)
super.onPeerDisconnected(peer)
}

override fun onPeerActive(peer: PeerHandler) {
super.onPeerActive(peer)
eventBroadcaster.notifyConnected(peer.peerId, peer.getRemoteAddress())
heartbeatTask.hashCode() // force lazy initialization
sendControlExtensions(peer)
}

override fun notifyUnseenMessage(peer: PeerHandler, msg: PubsubMessage) {
Expand Down Expand Up @@ -398,34 +400,56 @@ open class GossipRouter(
) {
logger.trace("Received control extension {}", ctrlExtensions.toString())

if (peerExtensionSupportMap[receivedFrom.peerId] != null) {
// TODO Should downscore peers that send control extension multiple times? (https://github.com/libp2p/jvm-libp2p/issues/437)
if (gossipExtensionsState.hasReceivedControlExtensionsFrom(receivedFrom.peerId)) {
// TODO Should disconnect peers that send control extension multiple times (https://github.com/libp2p/jvm-libp2p/issues/437)
logger.trace(
"Received another control extension message from peer {}",
receivedFrom.peerId
)
return
} else {
peerExtensionSupportMap[receivedFrom.peerId] = ctrlExtensions
gossipExtensionsState.onControlExtensionsMessage(ctrlExtensions, receivedFrom.peerId)
}
}

override fun processExtensions(msg: Rpc.RPC, receivedFrom: PeerHandler) {
val peerSupportedExtensions = peerExtensionSupportMap[receivedFrom.peerId]
if (peerSupportedExtensions == null) {
val peerSupportedExtensions =
gossipExtensionsState.peerSupportedExtensions(receivedFrom.peerId)

// TODO Revisit this logic as part of adding feature flags (https://github.com/libp2p/jvm-libp2p/issues/441)

when {
msg.hasTestExtension() && checkPeerExtensionSupport(
peerSupportedExtensions,
Rpc.ControlExtensions::hasTestExtension
) ->
processTestExtensionMessage(msg.testExtension, receivedFrom)

msg.hasPartial() && checkPeerExtensionSupport(
peerSupportedExtensions,
Rpc.ControlExtensions::hasPartialMessages
) ->
processPartialMessageExtension(msg.partial, receivedFrom)
}
}

private fun checkPeerExtensionSupport(
peerSavedPreferences: Rpc.ControlExtensions?,
checkSupportFunction: (Rpc.ControlExtensions) -> Boolean
): Boolean {
if (peerSavedPreferences == null) {
return false
}

if (!checkSupportFunction.invoke(peerSavedPreferences)) {
logger.trace(
"Ignoring extension messages from peer {} - did it send an extension control message?",
receivedFrom.peerId
"Ignoring extension messages from peer {} - did it send an control extensions message?",
peerSavedPreferences
)
} else {
when {
peerSupportedExtensions.hasTestExtension() && msg.hasTestExtension() ->
processTestExtensionMessage(msg.testExtension, receivedFrom)

peerSupportedExtensions.hasPartialMessages() && msg.hasPartial() ->
processPartialMessageExtension(msg.partial, receivedFrom)
}
return false
}

return true
}

private fun processTestExtensionMessage(
Expand Down Expand Up @@ -578,6 +602,8 @@ open class GossipRouter(
fanout -= topic
lastPublished -= topic
}

activePeers.forEach { sendControlExtensions(it) }
}

override fun unsubscribe(topic: Topic) {
Expand Down Expand Up @@ -778,6 +804,33 @@ open class GossipRouter(
send(peer, iDontWant)
}

private fun sendControlExtensions(peer: PeerHandler) {
if (!this.protocol.supportsExtensions()) {
logger.trace(
"Protocol does not support extensions. Won't send control extensions message."
)
return
}

if (gossipExtensionsState.hasSentControlExtensionsTo(peer.peerId)) {
logger.trace(
"Already sent control extensions msg to peer {}. Won't send another one.",
peer.peerId
)
return
}

logger.trace("Sending control extensions message to peer {}", peer.peerId)

pendingRpcParts.getQueue(peer).addControlExtensions(
Rpc.ControlExtensions.newBuilder()
.setTestExtension(true)
.setPartialMessages(true)
.build()
)
gossipExtensionsState.registerControlExtensionMessageSentToPeers(peer.peerId)
}

data class AcceptRequestsWhitelistEntry(val whitelistedTill: Long, val messagesAccepted: Int = 0) {
fun incrementMessageCount() = AcceptRequestsWhitelistEntry(whitelistedTill, messagesAccepted + 1)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ interface GossipRpcPartsQueue : RpcPartsQueue {
* Gossip 1.1 variant
*/
fun addPrune(topic: Topic, backoffSeconds: Long, backoffPeers: List<PeerId>)

// TODO Need to check if we should handle when control extension and extension messages could be separated by split (https://github.com/libp2p/jvm-libp2p/issues/440)
fun addControlExtensions(ctrlMessage: Rpc.ControlExtensions)
}

/**
Expand Down Expand Up @@ -81,6 +84,12 @@ open class DefaultGossipRpcPartsQueue(
}
}

protected data class ControlExtensionPart(val ctrlExtension: Rpc.ControlExtensions) : AbstractPart {
override fun appendToBuilder(builder: Rpc.RPC.Builder) {
builder.controlBuilder.setExtensions(ctrlExtension)
}
}

override fun addIHave(messageId: MessageId, topic: Topic) {
addPart(IHavePart(messageId, topic))
}
Expand All @@ -101,6 +110,10 @@ open class DefaultGossipRpcPartsQueue(
addPart(PrunePart(topic, backoffSeconds, backoffPeers))
}

override fun addControlExtensions(ctrlMessage: Rpc.ControlExtensions) {
addPart(ControlExtensionPart(ctrlMessage))
}

override fun takeMerged(): List<Rpc.RPC> {
val ret = mutableListOf<Rpc.RPC>()
var partIdx = 0
Expand Down
Loading
Loading