Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@ package io.libp2p.pubsub.gossip
import io.libp2p.core.PeerId
import pubsub.pb.Rpc

enum class GossipExtension {
// Canonical extensions
PARTIAL_MESSAGES,

// Non-canonical extensions
TEST_EXTENSION
}

data class GossipExtensionsConfig(
val partialMessagesEnabled: Boolean = false,
val testExtensionEnabled: Boolean = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,11 +402,11 @@ open class GossipRouter(
logger.trace("Received control extension {}", ctrlExtensions.toString())

if (gossipExtensionsState.hasReceivedControlExtensionsFrom(receivedFrom.peerId)) {
// TODO Should disconnect peers that send control extension multiple times (https://github.com/libp2p/jvm-libp2p/issues/437)
logger.trace(
"Received another control extension message from peer {}",
receivedFrom.peerId
)
notifyRouterMisbehavior(receivedFrom, 10)
return
} else {
gossipExtensionsState.onControlExtensionsMessage(ctrlExtensions, receivedFrom.peerId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ open class GossipRouterBuilder(
var scheduledAsyncExecutor: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
ThreadFactoryBuilder().setDaemon(true).setNameFormat("GossipRouter-event-thread-%d").build()
),
var currentTimeSuppluer: CurrentTimeSupplier = { System.currentTimeMillis() },
var currentTimeSupplier: CurrentTimeSupplier = { System.currentTimeMillis() },
var random: Random = Random(),

var messageFactory: PubsubMessageFactory = { DefaultPubsubMessage(it) },
Expand All @@ -33,28 +33,32 @@ open class GossipRouterBuilder(
var subscriptionTopicSubscriptionFilter: TopicSubscriptionFilter = TopicSubscriptionFilter.AllowAllTopicSubscriptionFilter(),

var scoreFactory: GossipScoreFactory =
{ scoreParams1, scheduledAsyncRxecutor, currentTimeSuppluer1, eventsSubscriber ->
val gossipScore = DefaultGossipScore(scoreParams1, scheduledAsyncRxecutor, currentTimeSuppluer1)
{ scoreParams1, scheduledAsyncRxecutor, currentTimeSupplier1, eventsSubscriber ->
val gossipScore = DefaultGossipScore(scoreParams1, scheduledAsyncRxecutor, currentTimeSupplier1)
eventsSubscriber(gossipScore)
gossipScore
},
val gossipRouterEventListeners: MutableList<GossipRouterEventListener> = mutableListOf(),

var gossipExtensionsConfig: GossipExtensionsConfig = GossipExtensionsConfig()
val enabledGossipExtensions: List<GossipExtension> = mutableListOf(),
) {

var seenCache: SeenCache<Optional<ValidationResult>> by lazyVar { TTLSeenCache(SimpleSeenCache(), params.seenTTL, currentTimeSuppluer) }
var seenCache: SeenCache<Optional<ValidationResult>> by lazyVar { TTLSeenCache(SimpleSeenCache(), params.seenTTL, currentTimeSupplier) }
var mCache: MCache by lazyVar { MCache(params.gossipSize, params.gossipHistoryLength) }

private var disposed = false

fun enabledGossipExtensions(vararg gossipExtensions: GossipExtension): GossipRouterBuilder {
(enabledGossipExtensions as MutableList).addAll(gossipExtensions)
return this
}

protected open fun createGossipRouter(): GossipRouter {
val gossipScore = scoreFactory(scoreParams, scheduledAsyncExecutor, currentTimeSuppluer, { gossipRouterEventListeners += it })
val gossipScore = scoreFactory(scoreParams, scheduledAsyncExecutor, currentTimeSupplier, { gossipRouterEventListeners += it })

val router = GossipRouter(
params = params,
scoreParams = scoreParams,
currentTimeSupplier = currentTimeSuppluer,
currentTimeSupplier = currentTimeSupplier,
random = random,
name = name,
mCache = mCache,
Expand All @@ -65,7 +69,7 @@ open class GossipRouterBuilder(
messageFactory = messageFactory,
seenMessages = seenCache,
messageValidator = messageValidator,
gossipExtensionsConfig = gossipExtensionsConfig
gossipExtensionsConfig = buildGossipExtensionsConfig(),
)

router.eventBroadcaster.listeners += gossipRouterEventListeners
Expand All @@ -77,4 +81,11 @@ open class GossipRouterBuilder(
disposed = true
return createGossipRouter()
}

private fun buildGossipExtensionsConfig(): GossipExtensionsConfig {
return GossipExtensionsConfig(
partialMessagesEnabled = enabledGossipExtensions.contains(GossipExtension.PARTIAL_MESSAGES),
testExtensionEnabled = enabledGossipExtensions.contains(GossipExtension.TEST_EXTENSION)
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,30 @@ class GossipRouterBuilderTest {

@Test
fun `localExtensionSupport reflects config in built router`() {
val config = GossipExtensionsConfig(
testExtensionEnabled = true,
partialMessagesEnabled = false
)
val router = GossipRouterBuilder(gossipExtensionsConfig = config).build()
val router = GossipRouterBuilder()
// Enabling only test extensions
.enabledGossipExtensions(
GossipExtension.TEST_EXTENSION
)
.build()

val localSupport = router.gossipExtensionsState.localExtensionSupport
assertThat(localSupport.testExtension).isTrue()
assertThat(localSupport.partialMessages).isFalse()
}

@Test
fun `localExtensionSupport with all extensions enabled`() {
val router = GossipRouterBuilder()
// Enabling all extensions
.enabledGossipExtensions(
GossipExtension.TEST_EXTENSION,
GossipExtension.PARTIAL_MESSAGES,
)
.build()

val localSupport = router.gossipExtensionsState.localExtensionSupport
assertThat(localSupport.testExtension).isTrue()
assertThat(localSupport.partialMessages).isTrue()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,16 @@ abstract class GossipTestsBase {
val scoreParams: GossipScoreParams = GossipScoreParams(),
val mockRouterFactory: DeterministicFuzzRouterFactory = createMockFuzzRouterFactory(),
val protocol: PubsubProtocol = PubsubProtocol.Gossip_V_1_1,
val gossipExtensionsConfig: GossipExtensionsConfig = GossipExtensionsConfig(
testExtensionEnabled = true
)
val enabledGossipExtensions: List<GossipExtension> = listOf(GossipExtension.TEST_EXTENSION)

) {
val fuzz = DeterministicFuzz()
val gossipRouterBuilderFactory = {
GossipRouterBuilder(
protocol = protocol,
params = coreParams,
scoreParams = scoreParams,
gossipExtensionsConfig = gossipExtensionsConfig
enabledGossipExtensions = enabledGossipExtensions
)
}
val router1 = fuzz.createTestRouter(createGossipFuzzRouterFactory(gossipRouterBuilderFactory))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package io.libp2p.pubsub.gossip.extensions

import io.libp2p.pubsub.PubsubProtocol
import io.libp2p.pubsub.gossip.GossipExtensionsConfig
import io.libp2p.pubsub.gossip.GossipExtension
import io.libp2p.pubsub.gossip.GossipPeerScoreParams
import io.libp2p.pubsub.gossip.GossipScoreParams
import io.libp2p.pubsub.gossip.GossipTestsBase
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
Expand Down Expand Up @@ -117,9 +119,7 @@ class GossipExtensionsMessageHandlingTest : GossipTestsBase() {
fun `local peer ignores test extension messages when they are disabled in config`() {
val test = TwoRoutersTest(
protocol = PubsubProtocol.Gossip_V_1_3,
gossipExtensionsConfig = GossipExtensionsConfig(
testExtensionEnabled = false
)
enabledGossipExtensions = listOf()
)

test.mockRouter.sendToSingle(rpcMsgWithCtrlExtensionsAndTestExtension)
Expand All @@ -135,9 +135,9 @@ class GossipExtensionsMessageHandlingTest : GossipTestsBase() {
fun `control extension message contains all supported extensions flags`() {
val test = TwoRoutersTest(
protocol = PubsubProtocol.Gossip_V_1_3,
gossipExtensionsConfig = GossipExtensionsConfig(
testExtensionEnabled = true,
partialMessagesEnabled = true
enabledGossipExtensions = listOf(
GossipExtension.TEST_EXTENSION,
GossipExtension.PARTIAL_MESSAGES
)
)

Expand Down Expand Up @@ -193,6 +193,33 @@ class GossipExtensionsMessageHandlingTest : GossipTestsBase() {
assertThat(test.gossipRouter.gossipExtensionsState.hasSentControlExtensionsTo(test.router2.peerId)).isFalse()
}

@Test
fun `peer sending multiple control extension messages are downscored`() {
val test = TwoRoutersTest(
protocol = PubsubProtocol.Gossip_V_1_3,
enabledGossipExtensions = listOf(GossipExtension.PARTIAL_MESSAGES),
// Creating GossipScoreParams with behaviourPenaltyWeight (peer bad behavior affecting
// score). Here we are not interested if the weight is "correct". What we want to see if
// that a peer is penalized for sending more than one ControlExtensions message.
scoreParams = GossipScoreParams(
peerScoreParams = GossipPeerScoreParams(
behaviourPenaltyWeight = -1.0
)
)
)

val offendingPeer = test.gossipRouter.peers[0].peerId
val initialScore = test.gossipRouter.score.score(offendingPeer)

// first ControlExtensions message, no downscoring
test.mockRouter.sendToSingle(rpcMessageWithControlExtensions)
assertThat(test.gossipRouter.score.score(offendingPeer)).isEqualTo(initialScore)

// second ControlExtensions message, peer downscored
test.mockRouter.sendToSingle(rpcMessageWithControlExtensions)
assertThat(test.gossipRouter.score.score(offendingPeer)).isLessThan(initialScore)
}

companion object {
@JvmStatic
fun protocolVersionsWithExtensionSupport() = listOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class DeterministicFuzz {
{ executor, curTime, random ->
routerBuilderFactory().also {
it.scheduledAsyncExecutor = executor
it.currentTimeSuppluer = curTime
it.currentTimeSupplier = curTime
it.random = random
}.build()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class GossipSimNetwork(

protected fun createSimPeer(number: Int): GossipSimPeer {
val router = routerFactory(number).also {
it.currentTimeSuppluer = { timeController.time }
it.currentTimeSupplier = { timeController.time }
it.serializeMessagesToBytes = false
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class GossipSimPeer(
routerBuilder.also {
it.name = name
it.scheduledAsyncExecutor = simExecutor
it.currentTimeSuppluer = { currentTime() }
it.currentTimeSupplier = { currentTime() }
it.random = random
}.build()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ class SimGossipRouterBuilder : GossipRouterBuilder() {

override fun createGossipRouter(): GossipRouter {
val gossipScore =
scoreFactory(scoreParams, scheduledAsyncExecutor, currentTimeSuppluer) { gossipRouterEventListeners += it }
scoreFactory(scoreParams, scheduledAsyncExecutor, currentTimeSupplier) { gossipRouterEventListeners += it }

val router = SimGossipRouter(
params = params,
scoreParams = scoreParams,
currentTimeSupplier = currentTimeSuppluer,
currentTimeSupplier = currentTimeSupplier,
random = random,
name = name,
mCache = mCache,
Expand Down
Loading