Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,7 @@ abstract class AbstractRouter(
processControl(msg.control, peer)
}

// TODO we need to handle the existence of extension messages more generically (https://github.com/libp2p/jvm-libp2p/issues/441)

if (protocol.supportsExtensions() && (msg.hasTestExtension() || msg.hasPartial())) {
if (protocol.supportsExtensions()) {
processExtensions(msg, peer)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,17 @@ package io.libp2p.pubsub.gossip
import io.libp2p.core.PeerId
import pubsub.pb.Rpc

class GossipExtensionsState {
data class GossipExtensionsConfig(
val partialMessagesEnabled: Boolean = false,
val testExtensionEnabled: Boolean = false
)

class GossipExtensionsState(gossipExtensionsConfig: GossipExtensionsConfig? = null) {

val localExtensionSupport: Rpc.ControlExtensions = Rpc.ControlExtensions.newBuilder()
.setTestExtension(gossipExtensionsConfig?.testExtensionEnabled ?: false)
.setPartialMessages(gossipExtensionsConfig?.partialMessagesEnabled ?: false)
.build()

/*
Tracks the peers that we have already sent a control extensions message
Expand Down Expand Up @@ -35,4 +45,12 @@ class GossipExtensionsState {

fun hasSentControlExtensionsTo(peer: PeerId) =
outgoingControlExtensionsMsgPeers.contains(peer)

fun testExtensionsEnabled() = localExtensionSupport.testExtension
fun peerSupportsTestExtensions(peerId: PeerId) =
peerExtensionSupportMap[peerId]?.testExtension == true

fun partialMessagesEnabled() = localExtensionSupport.partialMessages
fun peerSupportsPartialMessages(peerId: PeerId) =
peerExtensionSupportMap[peerId]?.partialMessages == true
}
75 changes: 38 additions & 37 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ open class GossipRouter(
val name: String,
val mCache: MCache,
val score: GossipScore,
val gossipExtensionsConfig: GossipExtensionsConfig = GossipExtensionsConfig(),

subscriptionTopicSubscriptionFilter: TopicSubscriptionFilter,
protocol: PubsubProtocol,
Expand Down Expand Up @@ -132,7 +133,7 @@ open class GossipRouter(
private val acceptRequestsWhitelist = mutableMapOf<PeerHandler, AcceptRequestsWhitelistEntry>()
override val pendingRpcParts = PendingRpcPartsMap<GossipRpcPartsQueue> { DefaultGossipRpcPartsQueue(params) }

val gossipExtensionsState = GossipExtensionsState()
val gossipExtensionsState = GossipExtensionsState(gossipExtensionsConfig)

private fun setBackOff(peer: PeerHandler, topic: Topic) = setBackOff(peer, topic, params.pruneBackoff.toMillis())
private fun setBackOff(peer: PeerHandler, topic: Topic, delay: Long) {
Expand Down Expand Up @@ -413,43 +414,47 @@ open class GossipRouter(
}

override fun processExtensions(msg: Rpc.RPC, receivedFrom: PeerHandler) {
val peerSupportedExtensions =
gossipExtensionsState.peerSupportedExtensions(receivedFrom.peerId)
when {
msg.hasTestExtension() -> {
if (!gossipExtensionsState.testExtensionsEnabled()) {
logger.trace(
"Ignoring test extension message from peer {} - test extension disabled",
msg
)
return
}

// TODO Revisit this logic as part of adding feature flags (https://github.com/libp2p/jvm-libp2p/issues/441)
if (!gossipExtensionsState.peerSupportsTestExtensions(receivedFrom.peerId)) {
logger.trace(
"Ignoring test extension message from peer {} - did peer send ControlExtensions prior?",
msg
)
return
}

when {
msg.hasTestExtension() && checkPeerExtensionSupport(
peerSupportedExtensions,
Rpc.ControlExtensions::hasTestExtension
) ->
processTestExtensionMessage(msg.testExtension, receivedFrom)
}

msg.hasPartial() && checkPeerExtensionSupport(
peerSupportedExtensions,
Rpc.ControlExtensions::hasPartialMessages
) ->
processPartialMessageExtension(msg.partial, receivedFrom)
}
}
msg.hasPartial() -> {
if (!gossipExtensionsState.partialMessagesEnabled()) {
logger.trace(
"Ignoring partial messages message from peer {} - partial messages extension disabled",
msg
)
return
}

private fun checkPeerExtensionSupport(
peerSavedPreferences: Rpc.ControlExtensions?,
checkSupportFunction: (Rpc.ControlExtensions) -> Boolean
): Boolean {
if (peerSavedPreferences == null) {
return false
}
if (!gossipExtensionsState.peerSupportsPartialMessages(receivedFrom.peerId)) {
logger.trace(
"Ignoring partial messages message from peer {} - did peer send ControlExtensions prior?",
msg
)
return
}

if (!checkSupportFunction.invoke(peerSavedPreferences)) {
logger.trace(
"Ignoring extension messages from peer {} - did it send an control extensions message?",
peerSavedPreferences
)
return false
processPartialMessageExtension(msg.partial, receivedFrom)
}
}

return true
}

private fun processTestExtensionMessage(
Expand Down Expand Up @@ -822,12 +827,8 @@ open class GossipRouter(

logger.trace("Sending control extensions message to peer {}", peer.peerId)

pendingRpcParts.getQueue(peer).addControlExtensions(
Rpc.ControlExtensions.newBuilder()
.setTestExtension(true)
.setPartialMessages(true)
.build()
)
pendingRpcParts.getQueue(peer)
.addControlExtensions(gossipExtensionsState.localExtensionSupport)
gossipExtensionsState.registerControlExtensionMessageSentToPeers(peer.peerId)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ open class GossipRouterBuilder(
eventsSubscriber(gossipScore)
gossipScore
},
val gossipRouterEventListeners: MutableList<GossipRouterEventListener> = mutableListOf()
val gossipRouterEventListeners: MutableList<GossipRouterEventListener> = mutableListOf(),

var gossipExtensionsConfig: GossipExtensionsConfig = GossipExtensionsConfig()
) {

var seenCache: SeenCache<Optional<ValidationResult>> by lazyVar { TTLSeenCache(SimpleSeenCache(), params.seenTTL, currentTimeSuppluer) }
Expand All @@ -62,7 +64,8 @@ open class GossipRouterBuilder(
executor = scheduledAsyncExecutor,
messageFactory = messageFactory,
seenMessages = seenCache,
messageValidator = messageValidator
messageValidator = messageValidator,
gossipExtensionsConfig = gossipExtensionsConfig
)

router.eventBroadcaster.listeners += gossipRouterEventListeners
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ import io.libp2p.core.PeerId
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments
import org.junit.jupiter.params.provider.MethodSource
import pubsub.pb.Rpc
import java.util.stream.Stream

class GossipExtensionsStateTest {

Expand All @@ -15,7 +19,9 @@ class GossipExtensionsStateTest {

@BeforeEach
fun setup() {
extensionsState = GossipExtensionsState()
extensionsState = GossipExtensionsState(
gossipExtensionsConfig = GossipExtensionsConfig(testExtensionEnabled = true)
)
peer1 = PeerId.random()
peer2 = PeerId.random()
peer3 = PeerId.random()
Expand Down Expand Up @@ -193,6 +199,23 @@ class GossipExtensionsStateTest {
assertThat(extensionsState.hasReceivedControlExtensionsFrom(peer3)).isTrue()
}

@Test
fun `tracks different peer extension support for partial messages`() {
val withPartial = Rpc.ControlExtensions.newBuilder()
.setPartialMessages(true)
.build()

val withoutPartial = Rpc.ControlExtensions.newBuilder()
.setPartialMessages(false)
.build()

extensionsState.onControlExtensionsMessage(withPartial, peer1)
extensionsState.onControlExtensionsMessage(withoutPartial, peer2)

assertThat(extensionsState.peerSupportsPartialMessages(peer1)).isTrue()
assertThat(extensionsState.peerSupportsPartialMessages(peer2)).isFalse()
}

@Test
fun `tracks many peers simultaneously`() {
val peers = (1..10).map { PeerId.random() }
Expand Down Expand Up @@ -355,4 +378,129 @@ class GossipExtensionsStateTest {
assertThat(extensionsState.hasSentControlExtensionsTo(peer1)).isFalse()
assertThat(extensionsState.peerSupportedExtensions(peer1)).isNull()
}

@Test
fun `peerSupportsTestExtensions returns true when peer has extension`() {
val extension = Rpc.ControlExtensions.newBuilder()
.setTestExtension(true)
.build()

extensionsState.onControlExtensionsMessage(extension, peer1)

assertThat(extensionsState.peerSupportsTestExtensions(peer1)).isTrue()
}

@Test
fun `peerSupportsTestExtensions returns false when peer doesn't have extension`() {
val extension = Rpc.ControlExtensions.newBuilder()
.setTestExtension(false)
.setPartialMessages(true)
.build()

extensionsState.onControlExtensionsMessage(extension, peer1)

assertThat(extensionsState.peerSupportsTestExtensions(peer1)).isFalse()
}

@Test
fun `peerSupportsTestExtensions returns false for unknown peer`() {
assertThat(extensionsState.peerSupportsTestExtensions(peer1)).isFalse()
}

@Test
fun `peerSupportsPartialMessages returns true when peer has extension`() {
val extension = Rpc.ControlExtensions.newBuilder()
.setPartialMessages(true)
.build()

extensionsState.onControlExtensionsMessage(extension, peer1)

assertThat(extensionsState.peerSupportsPartialMessages(peer1)).isTrue()
}

@Test
fun `peerSupportsPartialMessages returns false when peer doesn't have extension`() {
val extension = Rpc.ControlExtensions.newBuilder()
.setPartialMessages(false)
.setTestExtension(true)
.build()

extensionsState.onControlExtensionsMessage(extension, peer1)

assertThat(extensionsState.peerSupportsPartialMessages(peer1)).isFalse()
}

@Test
fun `peerSupportsPartialMessages returns false for unknown peer`() {
assertThat(extensionsState.peerSupportsPartialMessages(peer1)).isFalse()
}

@Test
fun `default config has both extensions disabled`() {
val state = GossipExtensionsState()

assertThat(state.testExtensionsEnabled()).isFalse()
assertThat(state.partialMessagesEnabled()).isFalse()
}

@ParameterizedTest
@MethodSource("gossipExtensionConfigParams")
fun `config flags combinations for all extensions`(
description: String,
testExtensionsEnabled: Boolean,
partialMessagesEnabled: Boolean
) {
val config = GossipExtensionsConfig(
testExtensionEnabled = testExtensionsEnabled,
partialMessagesEnabled = partialMessagesEnabled
)

assertThat(config.testExtensionEnabled).isEqualTo(testExtensionsEnabled)
.withFailMessage("expected $description")
assertThat(config.partialMessagesEnabled).isEqualTo(partialMessagesEnabled)
.withFailMessage("expected $description")
}

companion object {
@JvmStatic
fun gossipExtensionConfigParams(): Stream<Arguments> {
return Stream.of(
Arguments.of("both extensions enabled", true, true),
Arguments.of("only test extensions enabled", false, true),
Arguments.of("only partial messages enabled", true, false),
Arguments.of("both extensions disabled", false, false)
)
}
}

@Test
fun `localExtensionSupport field reflects config`() {
val state = GossipExtensionsState(
GossipExtensionsConfig(
testExtensionEnabled = true,
partialMessagesEnabled = false
)
)

val localSupport = state.localExtensionSupport
assertThat(localSupport.testExtension).isTrue()
assertThat(localSupport.partialMessages).isFalse()
}

@Test
fun `peer extension support cleared on disconnect`() {
val extension = Rpc.ControlExtensions.newBuilder()
.setTestExtension(true)
.setPartialMessages(true)
.build()

extensionsState.onControlExtensionsMessage(extension, peer1)
assertThat(extensionsState.peerSupportsTestExtensions(peer1)).isTrue()
assertThat(extensionsState.peerSupportsPartialMessages(peer1)).isTrue()

extensionsState.onPeerDisconnected(peer1)

assertThat(extensionsState.peerSupportsTestExtensions(peer1)).isFalse()
assertThat(extensionsState.peerSupportsPartialMessages(peer1)).isFalse()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.libp2p.pubsub.gossip

import io.libp2p.pubsub.gossip.builders.GossipRouterBuilder
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test

class GossipRouterBuilderTest {

@Test
fun `builds GossipRouter with both extensions disabled by default`() {
val router = GossipRouterBuilder().build()

assertThat(router.gossipExtensionsState.testExtensionsEnabled()).isFalse()
assertThat(router.gossipExtensionsState.partialMessagesEnabled()).isFalse()
}

@Test
fun `localExtensionSupport reflects config in built router`() {
val config = GossipExtensionsConfig(
testExtensionEnabled = true,
partialMessagesEnabled = false
)
val router = GossipRouterBuilder(gossipExtensionsConfig = config).build()

val localSupport = router.gossipExtensionsState.localExtensionSupport
assertThat(localSupport.testExtension).isTrue()
assertThat(localSupport.partialMessages).isFalse()
}
}
Loading
Loading