Skip to content

Commit 47b7532

Browse files
committed
Basic networks strategy that just runs the network itself
1 parent d5ac1a1 commit 47b7532

File tree

4 files changed

+205
-26
lines changed

4 files changed

+205
-26
lines changed

cmd/playground.go

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,15 +69,36 @@ func writeStrategyFlags() []cli.Flag {
6969
}
7070
}
7171

72+
// networkStrategyFlags returns the flags specific to the network strategy
73+
func networkStrategyFlags() []cli.Flag {
74+
return []cli.Flag{
75+
&cli.IntFlag{
76+
Name: "duration",
77+
Aliases: []string{"d"},
78+
Value: 60,
79+
Usage: "Duration to run the network test in seconds",
80+
},
81+
&cli.IntFlag{
82+
Name: "ping-timeout",
83+
Aliases: []string{"t"},
84+
Value: 5,
85+
Usage: "Timeout for each ping operation in seconds",
86+
},
87+
}
88+
}
89+
90+
// PlaygroundCommand returns a cli.Command for running playground environments
7291
func PlaygroundCommand() *cli.Command {
7392
return &cli.Command{
74-
Name: "playground",
75-
Usage: "Play with (f)db clients over simulated network",
93+
Name: "playground",
94+
Usage: "Run playground environments for testing and demos",
95+
Description: "Playground provides a simple environment for spawning multiple nodes and running test strategies",
7696
Subcommands: []*cli.Command{
7797
{
78-
Name: "network",
79-
Usage: "Run just the network without any specific test strategy",
80-
Flags: commonFlags(),
98+
Name: "network",
99+
Usage: "Run a network connectivity test",
100+
Description: "Tests the P2P network connectivity between all nodes",
101+
Flags: append(commonFlags(), networkStrategyFlags()...),
81102
Action: func(c *cli.Context) error {
82103
config := playground.Config{
83104
BasePort: DefaultBasePort,

playground/registry/strategies.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,7 @@ var AvailableStrategies = map[string]RegistryFn{
1414
"write": func(logger logger.Logger) strategies.Strategy {
1515
return strategies.NewWriteStrategy(logger, nil) // Nodes will be provided later
1616
},
17+
"network": func(logger logger.Logger) strategies.Strategy {
18+
return strategies.NewNetworkStrategy(logger, nil) // Nodes will be provided later
19+
},
1720
}

playground/strategies/network.go

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
package strategies
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
8+
"github.com/unpackdev/fdb/pkg/logger"
9+
"github.com/unpackdev/fdb/playground/suite"
10+
)
11+
12+
// NetworkStrategy tests the network connectivity between nodes
13+
type NetworkStrategy struct {
14+
logger logger.Logger
15+
nodes suite.TestNodes
16+
doneCh chan struct{}
17+
cancel context.CancelFunc
18+
duration time.Duration
19+
wg sync.WaitGroup
20+
pingTimeout time.Duration
21+
}
22+
23+
// NewNetworkStrategy creates a new network strategy
24+
func NewNetworkStrategy(logger logger.Logger, nodes suite.TestNodes) *NetworkStrategy {
25+
return &NetworkStrategy{
26+
logger: logger,
27+
nodes: nodes,
28+
doneCh: make(chan struct{}),
29+
duration: 60 * time.Second, // Run for 1 minute by default
30+
pingTimeout: 5 * time.Second,
31+
}
32+
}
33+
34+
// Info returns information about the network strategy
35+
func (s *NetworkStrategy) Info() Info {
36+
return Info{
37+
Name: "network",
38+
Description: "Tests network connectivity between nodes",
39+
DefaultArgs: map[string]any{
40+
"duration": 60, // seconds
41+
"ping_timeout": 5, // seconds
42+
},
43+
ArgMappings: []ArgMapping{
44+
{
45+
Flag: "duration",
46+
ParamKey: "duration",
47+
Description: "Duration to run the network test in seconds",
48+
DefaultValue: 60,
49+
},
50+
{
51+
Flag: "ping-timeout",
52+
ParamKey: "ping_timeout",
53+
Description: "Timeout for each ping operation in seconds",
54+
DefaultValue: 5,
55+
},
56+
},
57+
}
58+
}
59+
60+
// Start begins the network connectivity testing
61+
func (s *NetworkStrategy) Start(ctx context.Context) error {
62+
s.logger.Info("Starting network connectivity testing",
63+
"node_count", len(s.nodes),
64+
"duration", s.duration.String(),
65+
"ping_timeout", s.pingTimeout.String())
66+
67+
// Create a new context we can cancel when Stop is called
68+
ctx, s.cancel = context.WithCancel(ctx)
69+
70+
s.wg.Add(1)
71+
go s.runConnectivityTest(ctx)
72+
73+
// Set up the completion timer
74+
go func() {
75+
timer := time.NewTimer(s.duration)
76+
defer timer.Stop()
77+
78+
select {
79+
case <-ctx.Done():
80+
return
81+
case <-timer.C:
82+
s.logger.Info("Network test completed successfully")
83+
close(s.doneCh)
84+
if s.cancel != nil {
85+
s.cancel()
86+
}
87+
}
88+
}()
89+
90+
return nil
91+
}
92+
93+
// Stop halts the network strategy
94+
func (s *NetworkStrategy) Stop() error {
95+
if s.cancel != nil {
96+
s.cancel()
97+
}
98+
s.wg.Wait()
99+
return nil
100+
}
101+
102+
// CompletionCh returns a channel that is closed when the strategy completes
103+
func (s *NetworkStrategy) CompletionCh() <-chan struct{} {
104+
return s.doneCh
105+
}
106+
107+
// CreateFn returns a function that can create new instances of this strategy
108+
func (s *NetworkStrategy) CreateFn() StrategyFn {
109+
return func(logger logger.Logger, nodes suite.TestNodes, args map[string]any) (Strategy, error) {
110+
strategy := NewNetworkStrategy(logger, nodes)
111+
112+
// Apply configuration from args
113+
if durationSec, ok := args["duration"].(int); ok && durationSec > 0 {
114+
strategy.duration = time.Duration(durationSec) * time.Second
115+
}
116+
117+
if timeout, ok := args["ping_timeout"].(int); ok && timeout > 0 {
118+
strategy.pingTimeout = time.Duration(timeout) * time.Second
119+
}
120+
121+
return strategy, nil
122+
}
123+
}
124+
125+
// runConnectivityTest periodically checks connectivity between nodes
126+
func (s *NetworkStrategy) runConnectivityTest(ctx context.Context) {
127+
defer s.wg.Done()
128+
129+
ticker := time.NewTicker(10 * time.Second)
130+
defer ticker.Stop()
131+
132+
for {
133+
select {
134+
case <-ctx.Done():
135+
return
136+
case <-ticker.C:
137+
s.checkConnectivity()
138+
}
139+
}
140+
}
141+
142+
// checkConnectivity verifies that all nodes can see each other
143+
func (s *NetworkStrategy) checkConnectivity() {
144+
totalNodes := len(s.nodes)
145+
s.logger.Info("Checking network connectivity", "total_nodes", totalNodes)
146+
147+
for i, node := range s.nodes {
148+
peers := node.Node().Network().Host().Network().Peers()
149+
s.logger.Info("Node connectivity status",
150+
"node_index", i,
151+
"peer_id", node.PeerID().String(),
152+
"connected_peers", len(peers),
153+
"expected_peers", totalNodes-1) // Exclude self
154+
}
155+
}

playground/suite/suite.go

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ func (t *TestNode) WaitForPeersConnected(expectedPeerCount int, timeout time.Dur
168168
for {
169169
select {
170170
case <-timeoutCh:
171-
return fmt.Errorf("timeout waiting for validator %s to connect to peers", t.PeerID())
171+
return fmt.Errorf("timeout waiting for node %s to connect to peers", t.PeerID())
172172
case <-ticker.C:
173173
peers := t.node.Network().Host().Network().Peers()
174174
if len(peers) >= expectedPeerCount {
@@ -267,25 +267,6 @@ func InitializeNodes(
267267
BasePath: dir,
268268
Keys: []config.Key{}, // Keys are generated dynamically...
269269
},
270-
Observability: config.Observability{
271-
Metrics: config.MetricsConfig{
272-
Enable: false,
273-
Exporter: "prometheus",
274-
Endpoint: "0.0.0.0:9090",
275-
Headers: map[string]string{},
276-
ExportInterval: 15 * time.Second,
277-
SampleRate: 1.0,
278-
},
279-
Tracing: config.TracingConfig{
280-
Enable: false,
281-
Exporter: "otlp",
282-
Endpoint: "localhost:4317",
283-
Headers: map[string]string{},
284-
Sampler: "always_on",
285-
SamplingRate: 1.0,
286-
ExportInterval: 15 * time.Second,
287-
},
288-
},
289270
Transports: []config.Transport{
290271
{
291272
Type: types.TCPTransportType,
@@ -313,6 +294,25 @@ func InitializeNodes(
313294
TLS: nil,
314295
},
315296
},
297+
Observability: config.Observability{
298+
Metrics: config.MetricsConfig{
299+
Enable: true,
300+
Exporter: "prometheus",
301+
Endpoint: "localhost:4317",
302+
Headers: map[string]string{},
303+
ExportInterval: 15 * time.Second,
304+
SampleRate: 1.0,
305+
},
306+
Tracing: config.TracingConfig{
307+
Enable: true,
308+
Exporter: "otlp",
309+
Endpoint: "localhost:4317",
310+
Headers: map[string]string{},
311+
Sampler: "always_on",
312+
SamplingRate: 0.1,
313+
ExportInterval: 15 * time.Second,
314+
},
315+
},
316316
}
317317

318318
// Initialize the logger
@@ -435,7 +435,7 @@ func InitializeNodes(
435435

436436
for _, tNode := range nodes {
437437
// Minus one because own peer needs to be excluded
438-
wpcErr := tNode.WaitForPeersConnected(len(nodeRoles)-1, 10*time.Second)
438+
wpcErr := tNode.WaitForPeersConnected(len(nodeRoles)-1, 30*time.Second)
439439
if wpcErr != nil {
440440
return nil, fmt.Errorf("failure to establish mutual node connectivity: %w", wpcErr)
441441
}

0 commit comments

Comments
 (0)