Skip to content

Commit a99eccb

Browse files
committed
Cleaning up a bit more
1 parent e98b70e commit a99eccb

File tree

5 files changed

+126
-153
lines changed

5 files changed

+126
-153
lines changed

fdb.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -250,10 +250,12 @@ func (fdb *FDB) Start(ctx context.Context, transports ...types.TransportType) er
250250
return errors.Wrap(gErr, "failure to start fdb database")
251251
}
252252

253-
select {
254-
case <-ctx.Done():
255-
return ctx.Err()
253+
<-ctx.Done()
254+
if err := ctx.Err(); err != nil && !errors.Is(err, context.Canceled) {
255+
return err
256256
}
257+
258+
return nil
257259
}
258260

259261
func (fdb *FDB) Stop(transports ...types.TransportType) error {

node/db_write_handler.go

Lines changed: 3 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package node
22

33
import (
4-
"fmt"
5-
64
"github.com/unpackdev/fdb/db"
75
"github.com/unpackdev/fdb/logger"
86
"github.com/unpackdev/fdb/observability"
@@ -40,31 +38,21 @@ func (wh *DbWriteHandler) ForceFlush() {
4038

4139
// Handle processes the incoming message using the TCPWriteHandler
4240
func (wh *DbWriteHandler) Handle(conn transports.Connection, frame []byte) {
43-
fmt.Println("AM I HERE FOR FUCKS SAKES?!")
44-
45-
// Log packet reception
46-
wh.logger.Debug("Received write packet",
47-
zap.String("remote_addr", conn.RemoteAddr()),
48-
zap.Int("packet_size", len(frame)))
4941
// Check if first byte is our special marker 0xF0
50-
forceFlush := false
5142
offset := 0
5243
if len(frame) > 0 && frame[0] == 0xF0 {
53-
forceFlush = true
5444
offset = 1 // Skip the marker byte
55-
wh.logger.Debug("Detected force flush marker",
56-
zap.Int("offset", offset))
5745
}
5846

5947
// Adjust minimum length check based on whether we have a marker
6048
minLen := 34 // 1 byte for action, 32 bytes for key, and at least 1 byte for value
61-
if forceFlush {
49+
if offset > 0 {
6250
minLen = 35 // Extra byte for the marker
6351
}
6452

6553
if len(frame) < minLen {
66-
wh.logger.Debug("Invalid message length",
67-
zap.Int("length", len(frame)),
54+
wh.logger.Error("Invalid message length",
55+
zap.Int("length", len(frame)),
6856
zap.Int("expected", minLen))
6957
conn.Send([]byte{0x01}) // Error code
7058
return
@@ -77,13 +65,6 @@ func (wh *DbWriteHandler) Handle(conn transports.Connection, frame []byte) {
7765
// The remaining part is the value (from byte offset+33 onwards)
7866
value := frame[offset+33+3:]
7967

80-
// Log key and value details
81-
wh.logger.Debug("Processing write request",
82-
zap.Int("offset", offset),
83-
zap.Binary("key_prefix", key[:8]), // Log first 8 bytes of key for identification
84-
zap.Int("value_length", len(value)),
85-
zap.String("value_bytes", string(value)))
86-
8768
// Buffer the write request with the key as [32]byte
8869
err := wh.writer.BufferWrite(key, value)
8970
if err != nil {
@@ -94,26 +75,15 @@ func (wh *DbWriteHandler) Handle(conn transports.Connection, frame []byte) {
9475
return
9576
}
9677

97-
// Force flush to ensure data is immediately persisted
98-
// This is important for reliable testing and client expectations
99-
wh.logger.Debug("Forcing batch writer flush")
100-
wh.ForceFlush()
101-
wh.logger.Debug("Batch writer flush completed")
102-
10378
// Distribute the record to other nodes if a distributor is available
10479
if wh.distributor != nil {
105-
wh.logger.Debug("Starting P2P distribution",
106-
zap.Binary("key_prefix", key[:8]))
10780
// Use high priority for writes coming from direct client requests
10881
go func() {
10982
if err := wh.distributor.DistributeRecord(key, value, PriorityHigh, TargetAll); err != nil {
11083
wh.logger.Error("Failed to distribute record",
11184
zap.Error(err),
11285
zap.Binary("key", key[:]),
11386
)
114-
} else {
115-
wh.logger.Debug("Successfully queued record for distribution",
116-
zap.Binary("key_prefix", key[:8]))
11787
}
11888
}()
11989
}

node/p2p_distributor.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,7 @@ func (d *P2PDistributor) processPendingBatches(queue chan *RecordBatch) {
229229
drainCount := 0
230230
maxDrain := 100 // Safety limit
231231

232+
drainLoop:
232233
for recordCount < d.batchSize && drainCount < maxDrain {
233234
select {
234235
case batch := <-queue:
@@ -280,8 +281,8 @@ func (d *P2PDistributor) processPendingBatches(queue chan *RecordBatch) {
280281

281282
drainCount++
282283
default:
283-
// Queue empty
284-
break
284+
// Queue empty, break out of the outer loop
285+
break drainLoop
285286
}
286287
}
287288

tests/p2p_distributon_test.go

Lines changed: 114 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func TestP2PDistribution(t *testing.T) {
4343
require.NoError(t, err, "Failed to shutdown test nodes")
4444
}()
4545

46-
// Send a test message and wait for response
46+
// Verify the cluster is up and running
4747
testMessage := []byte("test message payload")
4848
resp, err := nodes.GetBootstrapNode().SendAndReceiveMessage(
4949
nodes.GetBootstrapNode(),
@@ -54,120 +54,120 @@ func TestP2PDistribution(t *testing.T) {
5454
)
5555
require.NoError(t, err, "Failed to send message or receive response")
5656
require.NotNil(t, resp, "Response should not be nil")
57-
58-
// The write handler responds with a 1-byte response: 0x00 for success or 0x01 for error
59-
// See transports/db_write_handler.go for details
6057
require.Equal(t, 1, len(resp), "Response should be exactly 1 byte")
6158
require.Equal(t, byte(0x00), resp[0], "Response should be 0x00 (success)")
6259

63-
// Test P2P distributor
64-
t.Run("P2P Distribution", func(t *testing.T) {
65-
// Create a test record with a fixed key for consistency
66-
// Generate a random 32-byte key
67-
key, err := messages.GenerateRandomKey()
68-
require.NoError(t, err, "Failed to generate random key")
69-
70-
value := []byte("test record value payload")
71-
72-
t.Logf("Using test key (hex): %s", hex.EncodeToString(key[:]))
73-
74-
// Get the bootstrap node and a regular node
75-
bootstrapNode := nodes.GetBootstrapNode()
76-
regularNode := nodes.GetNodeByIndex(1) // Get the second node
77-
require.NotNil(t, regularNode, "Failed to get the second node")
78-
79-
// Create a message with the fixed key
80-
t.Log("Writing record to bootstrap node database")
81-
82-
// Create a message with our specific key (not random)
83-
writeMsg := &messages.Message{
84-
Handler: types.WriteHandlerType,
85-
Key: key,
86-
Data: value,
87-
}
88-
89-
// Encode the message
90-
encodedWriteMsg, err := writeMsg.Encode()
91-
require.NoError(t, err, "Failed to encode write message")
92-
93-
// Write the record to the bootstrap node
94-
writeResp, err := bootstrapNode.SendAndReceiveMessage(
95-
bootstrapNode,
96-
types.TCPTransportType,
97-
types.WriteHandlerType,
98-
encodedWriteMsg,
99-
5*time.Second,
100-
)
101-
require.NoError(t, err, "Failed to write record to bootstrap node")
102-
require.Equal(t, byte(0x00), writeResp[0], "Write operation should succeed with 0x00")
103-
104-
// The DbWriteHandler automatically distributes records after writing
105-
// We're explicitly calling a BatchWriter.Flush() in the handler
106-
107-
// Give some time for distribution to occur
108-
t.Log("Waiting for record distribution...")
109-
time.Sleep(1 * time.Second)
110-
111-
// Verify on bootstrap node first (should be immediate)
112-
t.Log("Verifying record exists on bootstrap node")
113-
114-
// Create a read message with the SAME key
115-
t.Logf("Reading with the exact same key (hex): %s", hex.EncodeToString(key[:]))
116-
117-
// Create a read message with our specific key (not random)
118-
readMsg := &messages.Message{
119-
Handler: types.ReadHandlerType,
120-
Key: key, // Same key we used for writing
121-
Data: nil, // No data needed for read requests
122-
}
123-
124-
// Encode the message
125-
encodedReadMsg, err := readMsg.Encode()
126-
require.NoError(t, err, "Failed to encode read message")
127-
128-
// Read from the bootstrap node
129-
bootstrapResp, err := bootstrapNode.SendAndReceiveMessage(
130-
bootstrapNode,
131-
types.TCPTransportType,
132-
types.ReadHandlerType,
133-
encodedReadMsg,
134-
5*time.Second,
135-
)
136-
require.NoError(t, err, "Failed to read record from bootstrap node")
137-
require.NotNil(t, bootstrapResp, "Bootstrap node response should not be nil")
138-
require.True(t, len(bootstrapResp) > 1, "Bootstrap node response should have at least a status byte and value")
139-
140-
// Response format is now: [1 byte status][value bytes]
141-
// For successful reads, status byte should be 0x00
142-
require.Equal(t, byte(0x00), bootstrapResp[0], "Read response status byte should be 0x00")
143-
144-
// Extract the actual payload (skip the status byte)
145-
bootstrapValue := bootstrapResp[1:]
146-
require.Equal(t, value, bootstrapValue, "Record value mismatch on bootstrap node")
147-
148-
// Now verify on the regular node (should get there via P2P distribution)
149-
t.Log("Verifying record exists on regular node")
150-
151-
// Read from the regular node using the same message
152-
regularResp, err := regularNode.SendAndReceiveMessage(
153-
regularNode,
154-
types.TCPTransportType,
155-
types.ReadHandlerType,
156-
encodedReadMsg,
157-
5*time.Second,
158-
)
159-
require.NoError(t, err, "Failed to read record from regular node")
160-
require.NotNil(t, regularResp, "Regular node response should not be nil")
161-
require.True(t, len(regularResp) > 1, "Regular node response should have at least a status byte and value")
162-
163-
// Response format is now: [1 byte status][value bytes]
164-
// For successful reads, status byte should be 0x00
165-
require.Equal(t, byte(0x00), regularResp[0], "Read response status byte should be 0x00")
166-
167-
// Extract the actual payload (skip the status byte)
168-
regularValue := regularResp[1:]
169-
require.Equal(t, value, regularValue, "Record value mismatch on regular node")
170-
171-
t.Log("P2P distribution test successful!")
172-
})
60+
// Get all nodes we'll use for testing
61+
bootstrapNode := nodes.GetBootstrapNode()
62+
regularNode := nodes.GetNodeByIndex(1)
63+
require.NotNil(t, regularNode, "Failed to get second node")
64+
65+
// Define table-driven test cases
66+
testCases := []struct {
67+
name string
68+
value []byte
69+
waitTime time.Duration // Time to wait for P2P distribution
70+
}{
71+
{
72+
name: "Basic string value",
73+
value: []byte("test record value payload"),
74+
waitTime: 100 * time.Millisecond,
75+
},
76+
{
77+
name: "JSON data",
78+
value: []byte(`{"id":"12345","name":"test","data":[1,2,3,4,5]}`),
79+
waitTime: 100 * time.Millisecond,
80+
},
81+
// This is broken, 4096 bytes is currently processing fine, need to get it working by buffering...
82+
// {
83+
// name: "Large binary data (100KB)",
84+
// value: bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 25 * 1024), // ~100KB of data
85+
// waitTime: 3 * time.Second, // Extra time for larger payload
86+
// },
87+
}
88+
89+
// Run all test cases
90+
for _, tc := range testCases {
91+
t.Run(tc.name, func(t *testing.T) {
92+
// Generate a unique key for this test case
93+
key, err := messages.GenerateRandomKey()
94+
require.NoError(t, err, "Failed to generate random key")
95+
t.Logf("Using test key (hex): %s", hex.EncodeToString(key[:]))
96+
97+
// 1. WRITE PHASE: Write record to bootstrap node
98+
t.Log("Writing record to bootstrap node database")
99+
writeMsg := &messages.Message{
100+
Handler: types.WriteHandlerType,
101+
Key: key,
102+
Data: tc.value,
103+
}
104+
105+
encodedWriteMsg, err := writeMsg.Encode()
106+
require.NoError(t, err, "Failed to encode write message")
107+
108+
writeResp, err := bootstrapNode.SendAndReceiveMessage(
109+
bootstrapNode,
110+
types.TCPTransportType,
111+
types.WriteHandlerType,
112+
encodedWriteMsg,
113+
5*time.Second,
114+
)
115+
require.NoError(t, err, "Failed to write record to bootstrap node")
116+
require.Equal(t, byte(0x00), writeResp[0], "Write operation should succeed with 0x00")
117+
118+
// Wait for P2P distribution to occur
119+
// Batch writter flushes immediately if there's > bufferSize messages or every 100ms it flushes
120+
// so let's await for a little bit of the time (100ms is enough)
121+
t.Logf("Waiting %v for record distribution...", tc.waitTime)
122+
time.Sleep(tc.waitTime)
123+
124+
// 2. READ PHASE: Create read message for verification
125+
readMsg := &messages.Message{
126+
Handler: types.ReadHandlerType,
127+
Key: key,
128+
Data: nil,
129+
}
130+
encodedReadMsg, err := readMsg.Encode()
131+
require.NoError(t, err, "Failed to encode read message")
132+
133+
// 2a. Verify on bootstrap node (should be immediate)
134+
t.Log("Verifying record exists on bootstrap node")
135+
bootstrapResp, err := bootstrapNode.SendAndReceiveMessage(
136+
bootstrapNode,
137+
types.TCPTransportType,
138+
types.ReadHandlerType,
139+
encodedReadMsg,
140+
5*time.Second,
141+
)
142+
require.NoError(t, err, "Failed to read record from bootstrap node")
143+
require.NotNil(t, bootstrapResp, "Bootstrap node response should not be nil")
144+
require.True(t, len(bootstrapResp) > 1, "Bootstrap node response should have at least a status byte and value")
145+
require.Equal(t, byte(0x00), bootstrapResp[0], "Read response status byte should be 0x00")
146+
147+
// Extract the actual payload (skip the status byte)
148+
bootstrapValue := bootstrapResp[1:]
149+
require.Equal(t, tc.value, bootstrapValue, "Record value mismatch on bootstrap node")
150+
151+
// 2b. Verify on regular node (should get there via P2P distribution)
152+
t.Log("Verifying record exists on regular node")
153+
regularResp, err := regularNode.SendAndReceiveMessage(
154+
regularNode,
155+
types.TCPTransportType,
156+
types.ReadHandlerType,
157+
encodedReadMsg,
158+
5*time.Second,
159+
)
160+
require.NoError(t, err, "Failed to read record from regular node")
161+
require.NotNil(t, regularResp, "Regular node response should not be nil")
162+
require.True(t, len(regularResp) > 1, "Regular node response should have at least a status byte and value")
163+
require.Equal(t, byte(0x00), regularResp[0], "Read response status byte should be 0x00")
164+
165+
// Extract the actual payload (skip the status byte)
166+
regularValue := regularResp[1:]
167+
require.Equal(t, tc.value, regularValue, "Record value mismatch on regular node")
168+
169+
t.Logf("%s: P2P distribution successful!", tc.name)
170+
})
171+
}
172+
173173
}

transports/tcp/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ func (s *Server) OnTraffic(c gnet.Conn) (action gnet.Action) {
292292
return gnet.None
293293
}
294294

295-
s.logger.Debug("ON TRAFFIC REACHED...", "handler", handlerType, "data", data)
295+
//s.logger.Debug("ON TRAFFIC REACHED...", "handler", handlerType, "data", data)
296296

297297
// Retrieve the handler
298298
s.mu.RLock()

0 commit comments

Comments
 (0)