Skip to content

Commit da72ffe

Browse files
committed
Bunch of changes and bump of mdbx
1 parent 6abae91 commit da72ffe

File tree

22 files changed

+461
-772
lines changed

22 files changed

+461
-772
lines changed

TODO.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,5 @@
5858
- Further optimize BatchWriter for higher throughput
5959
- Improve worker selection algorithm for P2P distribution
6060
- Enhance TCP streaming for large payloads
61+
62+

client/client.go

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"encoding/binary"
66
"errors"
77
"fmt"
8+
"github.com/google/uuid"
9+
"github.com/unpackdev/fdb/pkg/messages"
810
"github.com/unpackdev/fdb/pkg/types"
911
"sync"
1012
"time"
@@ -91,13 +93,10 @@ func (c *Client) Close() error {
9193
return nil
9294
}
9395

94-
// SendAndReceiveMessage sends a message to another node and waits for a
95-
// response. This implementation handles large payloads with chunking and
96-
// provides proper response handling with timeouts.
97-
// Note: Currently this method only works with TCPTransport and will return
98-
// an error for other transport types.
99-
func (c *Client) SendAndReceiveMessage(name string, data []byte, timeout time.Duration) ([]byte, error) {
100-
transport, err := c.GetTransport(name)
96+
// SendAndReceiveMessage sends a message and waits for a response with the specified timeout.
97+
func (c *Client) SendAndReceiveMessage(ctx context.Context, transportType string, data []byte, timeout time.Duration) ([]byte, error) {
98+
// Get the transport
99+
transport, err := c.GetTransport(transportType)
101100
if err != nil {
102101
return nil, err
103102
}
@@ -108,11 +107,23 @@ func (c *Client) SendAndReceiveMessage(name string, data []byte, timeout time.Du
108107
return nil, fmt.Errorf("SendAndReceiveMessage is only supported for TCP transport, got %T", transport)
109108
}
110109

111-
// Get the response message type - use the success status
112-
responseType := MessageType(1) // Using 1 as a default success value
110+
// Try to decode the message to extract the UUID
111+
// If it's not already an encoded message, we'll need to generate a new UUID
112+
var messageID uuid.UUID
113+
114+
if decodedMsg, err := messages.Decode(data); err == nil {
115+
// Data is already an encoded message, extract the ID
116+
messageID = decodedMsg.ID
117+
} else {
118+
// Generate a new UUID for this request
119+
messageID = uuid.New()
120+
// Note: we should ideally encode this ID into the data, but for now
121+
// we'll just use it for correlation. In a complete implementation,
122+
// you'd want to encode the data with this ID.
123+
}
113124

114-
// Register a response channel before sending the message
115-
responseCh := tcpTransport.RegisterResponseChannel(responseType)
125+
// Register a response channel using the message ID
126+
responseCh := tcpTransport.RegisterResponseChannel(messageID)
116127

117128
// Define the maximum size for non-chunked messages
118129
const maxChunkSize = 1024 * 1024 // 1MB
@@ -135,7 +146,7 @@ func (c *Client) SendAndReceiveMessage(name string, data []byte, timeout time.Du
135146
// Send the message
136147
if err := tcpTransport.Send(encodedMsg); err != nil {
137148
// Make sure to unregister the response channel on error
138-
tcpTransport.UnregisterResponseChannel(responseType)
149+
tcpTransport.UnregisterResponseChannel(messageID)
139150
return nil, fmt.Errorf("failed to send message: %w", err)
140151
}
141152

client/client_tcp_test.go

Lines changed: 0 additions & 124 deletions
This file was deleted.

client/default_handlers.go

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,19 @@ import (
44
"fmt"
55
"strings"
66

7-
"github.com/panjf2000/gnet/v2"
87
"github.com/unpackdev/fdb/pkg/packets"
98
"github.com/unpackdev/fdb/pkg/types"
109
"go.uber.org/zap"
1110
)
1211

12+
// Global transport instance for default handlers - will be set during registration
13+
var defaultTransport *TCPTransport
14+
1315
// RegisterDefaultHandlers registers default message type handlers for common error conditions
1416
// and system messages that the client should always be able to handle.
1517
func RegisterDefaultHandlers(transport *TCPTransport) {
1618
transport.logger.Info("Registering default message handlers for client")
19+
defaultTransport = transport
1720

1821
// Register handler for InvalidActionMessageType (69 decimal = 'E' ASCII)
1922
// This message type is sent by the server when it encounters issues with very large payloads
@@ -40,13 +43,12 @@ func RegisterDefaultHandlers(transport *TCPTransport) {
4043

4144
// handleInvalidAction processes InvalidActionMessageType (0x69) messages from the server
4245
// These typically indicate resource constraints when processing large payloads
43-
func handleInvalidAction(conn gnet.Conn, data []byte) error {
44-
// Try to parse the response data into a DBResponse object
45-
dbResponse, err := packets.DecodeDBResponse(data)
46+
func handleInvalidAction(data []byte) error {
47+
// Try to parse the response data into a MessageResponse object
48+
dbResponse, err := packets.DecodeMessageResponse(data)
4649
if err != nil {
47-
handler, ok := conn.Context().(*tcpEventHandler)
48-
if ok && handler != nil && handler.transport != nil && handler.transport.logger != nil {
49-
handler.transport.logger.Error("Failed to decode InvalidAction response",
50+
if defaultTransport != nil && defaultTransport.logger != nil {
51+
defaultTransport.logger.Error("Failed to decode InvalidAction response",
5052
zap.Error(err),
5153
zap.Int("data_length", len(data)))
5254
}
@@ -60,46 +62,44 @@ func handleInvalidAction(conn gnet.Conn, data []byte) error {
6062
}
6163

6264
// Log the error
63-
handler, ok := conn.Context().(*tcpEventHandler)
64-
if ok && handler != nil && handler.transport != nil && handler.transport.logger != nil {
65-
handler.transport.logger.Warn("Server reported invalid action or resource constraint",
65+
if defaultTransport != nil && defaultTransport.logger != nil {
66+
defaultTransport.logger.Warn("Server reported invalid action or resource constraint",
6667
zap.String("error", errorMessage),
6768
zap.Uint32("data_length", dbResponse.Length),
6869
zap.Uint8("status", uint8(dbResponse.Status)))
6970
}
7071

7172
// Since this is a special error type, we can notify any waiting response channels
7273
// by converting this to a regular error response
73-
errorResp := &packets.DBResponse{
74+
errorResp := &packets.MessageResponse{
7475
Status: types.HandlerStatusError,
7576
Length: uint32(len(errorMessage)),
7677
Data: []byte(errorMessage),
7778
}
7879

7980
// Forward this error to any success message handlers that might be waiting
80-
if handler != nil && handler.transport != nil && handler.transport.responseHandler != nil {
81+
if defaultTransport != nil && defaultTransport.responseHandler != nil {
8182
// Create a new message that response handlers can understand (success message type)
8283
successMsgType := MessageType(types.HandlerStatusSuccess.Byte())
8384

8485
// Encode the error response to be handled
8586
encodedResp := errorResp.Encode()
8687

8788
// Use the response handler's existing mechanism to handle this
88-
handler.transport.responseHandler.HandleResponse(successMsgType, encodedResp)
89+
defaultTransport.responseHandler.HandleResponse(successMsgType, encodedResp)
8990
}
9091

9192
return nil
9293
}
9394

9495
// handleErrorResponse processes HandlerStatusError (0) messages from the server
95-
// These are properly formatted protocol error messages using DBResponse structure
96-
func handleErrorResponse(conn gnet.Conn, data []byte) error {
97-
// Try to parse the response data into a DBResponse object
98-
dbResponse, err := packets.DecodeDBResponse(data)
96+
// These are properly formatted protocol error messages using MessageResponse structure
97+
func handleErrorResponse(data []byte) error {
98+
// Try to parse the response data into a MessageResponse object
99+
dbResponse, err := packets.DecodeMessageResponse(data)
99100
if err != nil {
100-
handler, ok := conn.Context().(*tcpEventHandler)
101-
if ok && handler != nil && handler.transport != nil && handler.transport.logger != nil {
102-
handler.transport.logger.Error("Failed to decode Error response",
101+
if defaultTransport != nil && defaultTransport.logger != nil {
102+
defaultTransport.logger.Error("Failed to decode Error response",
103103
zap.Error(err),
104104
zap.Int("data_length", len(data)))
105105
}
@@ -130,20 +130,19 @@ func handleErrorResponse(conn gnet.Conn, data []byte) error {
130130
}
131131

132132
// Log the error
133-
handler, ok := conn.Context().(*tcpEventHandler)
134-
if ok && handler != nil && handler.transport != nil && handler.transport.logger != nil {
135-
handler.transport.logger.Error("Received protocol-formatted error from server",
133+
if defaultTransport != nil && defaultTransport.logger != nil {
134+
defaultTransport.logger.Error("Received protocol-formatted error from server",
136135
zap.String("error_message", errorMessage),
137136
zap.Uint32("data_length", dbResponse.Length))
138137
}
139138

140139
// Forward this error to any success message handlers that might be waiting
141-
if handler != nil && handler.transport != nil && handler.transport.responseHandler != nil {
140+
if defaultTransport != nil && defaultTransport.responseHandler != nil {
142141
// Create a new message that response handlers can understand (success message type)
143142
successMsgType := MessageType(types.HandlerStatusSuccess.Byte())
144143

145144
// Just pass the original data - it's already properly formatted
146-
handler.transport.responseHandler.HandleResponse(successMsgType, data)
145+
defaultTransport.responseHandler.HandleResponse(successMsgType, data)
147146
}
148147

149148
return nil

0 commit comments

Comments
 (0)