Skip to content

Commit 8d82a57

Browse files
committed
Some cleanups and integration of the client into the write strategy
1 parent 47b7532 commit 8d82a57

7 files changed

Lines changed: 224 additions & 72 deletions

File tree

.go-version

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
go1.24.3

TODO.md

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
# FDB TODO List
2+
3+
## Core Production Features
4+
5+
### Record Versioning and Conflict Resolution
6+
7+
- Add version tracking to the `Record` structure:
8+
```go
9+
// Record represents a key-value pair in a record batch
10+
type Record struct {
11+
Key [32]byte // Fixed-size byte array for keys
12+
Value []byte // Value as byte slice
13+
Version uint64 // Monotonically increasing version number
14+
Priority uint8 // Priority level for processing
15+
Timestamp int64 // Creation/modification timestamp
16+
}
17+
```
18+
19+
- Implement conflict detection and resolution mechanisms:
20+
- Last-write-wins strategy based on version numbers
21+
- Optimistic concurrency control for client operations
22+
- Support for conditional writes based on expected versions
23+
24+
- Update serialization/deserialization methods to handle new fields
25+
26+
### Missing Production-Ready Features
27+
28+
1. **Consensus Protocol**
29+
- Implement strong consensus for write ordering
30+
- Consider Raft or PBFT for distributed agreement
31+
32+
2. **Transaction Management**
33+
- Implement ACID transaction support
34+
- Add distributed commit/rollback protocols
35+
- Support for multi-key atomic operations
36+
37+
3. **Failure Detection and Recovery**
38+
- Automated node failure detection
39+
- Efficient state transfer for recovering nodes
40+
- Automatic recovery procedures
41+
42+
4. **State Transfer Optimization**
43+
- Efficient incremental state transfer
44+
- Snapshot-based recovery for new/rejoining nodes
45+
46+
5. **Comprehensive Security**
47+
- Data encryption (at-rest and in-transit)
48+
- Enhanced authentication mechanisms
49+
- Audit logging for all operations
50+
51+
6. **Backup & Disaster Recovery**
52+
- Point-in-time recovery
53+
- Incremental backup support
54+
- Disaster recovery procedures
55+
56+
## Performance Optimizations
57+
58+
- Further optimize BatchWriter for higher throughput
59+
- Improve worker selection algorithm for P2P distribution
60+
- Enhance TCP streaming for large payloads

client/client.go

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/binary"
66
"errors"
77
"fmt"
8+
"github.com/unpackdev/fdb/pkg/types"
89
"sync"
910
"time"
1011
)
@@ -44,6 +45,17 @@ func (c *Client) GetTransport(name string) (Transport, error) {
4445
return transport, nil
4546
}
4647

48+
// GetTransportByType retrieves a registered transport by type
49+
func (c *Client) GetTransportByType(name types.TransportType) (Transport, error) {
50+
c.mu.RLock()
51+
defer c.mu.RUnlock()
52+
transport, exists := c.transports[name.String()]
53+
if !exists {
54+
return nil, fmt.Errorf("transport by type not found: %s", name)
55+
}
56+
return transport, nil
57+
}
58+
4759
// SendMessage sends a message using the specified transport
4860
func (c *Client) SendMessage(name string, data []byte) error {
4961
transport, err := c.GetTransport(name)
@@ -89,49 +101,49 @@ func (c *Client) SendAndReceiveMessage(name string, data []byte, timeout time.Du
89101
if err != nil {
90102
return nil, err
91103
}
92-
104+
93105
// Check if we're dealing with a TCPTransport which has advanced response handling
94106
tcpTransport, ok := transport.(*TCPTransport)
95107
if !ok {
96108
return nil, fmt.Errorf("SendAndReceiveMessage is only supported for TCP transport, got %T", transport)
97109
}
98-
110+
99111
// Get the response message type - use the success status
100112
responseType := MessageType(1) // Using 1 as a default success value
101-
113+
102114
// Register a response channel before sending the message
103115
responseCh := tcpTransport.RegisterResponseChannel(responseType)
104-
116+
105117
// Define the maximum size for non-chunked messages
106118
const maxChunkSize = 1024 * 1024 // 1MB
107-
119+
108120
// For large payloads, use chunking protocol to ensure reliable transmission
109121
encodedMsg := data
110122
if len(data) > maxChunkSize {
111123
// Prepare the chunked message with a 4-byte length prefix
112124
// The server expects: [total_size(4 bytes)][payload...]
113125
lengthPrefix := make([]byte, 4)
114126
binary.LittleEndian.PutUint32(lengthPrefix, uint32(len(data)))
115-
127+
116128
// Prepend the length prefix to the message
117129
chunkedMsg := append(lengthPrefix, data...)
118-
130+
119131
// Replace the original message with the chunked version
120132
encodedMsg = chunkedMsg
121133
}
122-
134+
123135
// Send the message
124136
if err := tcpTransport.Send(encodedMsg); err != nil {
125137
// Make sure to unregister the response channel on error
126138
tcpTransport.UnregisterResponseChannel(responseType)
127139
return nil, fmt.Errorf("failed to send message: %w", err)
128140
}
129-
141+
130142
// Wait for the response with the provided timeout
131143
response, err := tcpTransport.WaitForResponseWithTimeout(responseCh, timeout)
132144
if err != nil {
133145
return nil, fmt.Errorf("error waiting for response: %w", err)
134146
}
135-
147+
136148
return response, nil
137149
}

config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ rpc:
128128
pprof:
129129
- name: fdb
130130
enabled: true
131-
addr: "0.0.0.0:6060"
131+
addr: "0.0.0.0:6065"
132132

133133
# ======================================================================================
134134
# OBSERVABILITY CONFIGURATION (Metrics and Tracing)

playground/strategies/write.go

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
"time"
88

99
"github.com/unpackdev/fdb/pkg/logger"
10+
"github.com/unpackdev/fdb/pkg/messages"
11+
"github.com/unpackdev/fdb/pkg/types"
1012
"github.com/unpackdev/fdb/playground/suite"
1113
)
1214

@@ -109,12 +111,14 @@ func WithTotalOperations(ops int) WriteStrategyOption {
109111

110112
// Start begins executing the write strategy
111113
func (s *WriteStrategy) Start(ctx context.Context) error {
112-
s.logger.Info("Starting write strategy",
114+
s.logger.Info(
115+
"Starting write strategy",
113116
"workers", s.workersCount,
114117
"data_size_kb", s.dataSizeKB,
115118
"ops_per_sec", s.opsPerSec,
116119
"total_ops", s.totalOps,
117-
"target_node", s.targetNode.PeerID().String())
120+
"target_node", s.targetNode.PeerID().String(),
121+
)
118122

119123
ctx, s.cancel = context.WithCancel(ctx)
120124
s.startTime = time.Now()
@@ -209,20 +213,53 @@ func (s *WriteStrategy) runWorker(ctx context.Context, id, ops int, delay time.D
209213

210214
// performWriteOperation executes a single write operation
211215
func (s *WriteStrategy) performWriteOperation(ctx context.Context, workerID, opID int) error {
212-
key := fmt.Sprintf("%s%d-%d", s.keyPrefix, workerID, opID)
216+
//key := fmt.Sprintf("%s%d-%d", s.keyPrefix, workerID, opID)
213217

214218
data, err := suite.GenerateTestDataKB(s.dataSizeKB, false)
215219
if err != nil {
216220
return fmt.Errorf("failed to generate test data: %w", err)
217221
}
218222

219-
// TODO: Implement actual write operation using the client
220-
// This is a placeholder for the actual implementation that would use the
221-
// optimized BatchWriter component with its 2048 batch size and 100ms flush interval
223+
key, err := messages.GenerateRandomKey()
224+
if err != nil {
225+
return fmt.Errorf("failed to generate random key: %w", err)
226+
}
222227

223-
s.logger.Debug("Write operation",
228+
// Create a write message with the key and data
229+
writeMsg := &messages.Message{
230+
Handler: types.WriteHandlerType,
231+
Key: key,
232+
Data: data,
233+
}
234+
235+
// Encode the message
236+
encodedWriteMsg, err := writeMsg.Encode()
237+
if err != nil {
238+
return fmt.Errorf("failed to encode write message: %w", err)
239+
}
240+
241+
// Send the message to the target node using the client
242+
// This leverages the optimized BatchWriter component with 2048 batch size and 100ms flush interval
243+
// that's already configured in the underlying implementation
244+
response, err := s.targetNode.SendAndReceiveMessage(
245+
ctx,
246+
s.targetNode,
247+
types.TCPTransportType,
248+
types.WriteHandlerType,
249+
encodedWriteMsg,
250+
5*time.Second,
251+
)
252+
if err != nil {
253+
return fmt.Errorf("failed to write record to target node: %w", err)
254+
}
255+
256+
s.logger.Debug(
257+
"Write operation successful",
224258
"key", key,
225-
"data_size", len(data))
259+
"data_size", len(data),
260+
"worker_id", workerID,
261+
"response", response,
262+
)
226263

227264
return nil
228265
}

0 commit comments

Comments
 (0)