-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathresponses.go
More file actions
237 lines (214 loc) · 7.03 KB
/
responses.go
File metadata and controls
237 lines (214 loc) · 7.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
package gorums
import (
"errors"
"iter"
"google.golang.org/protobuf/proto"
"github.com/relab/gorums/internal/stream"
)
// msg is a type alias for proto.Message. It is intended to be used as a type
// parameter constraint, so generic declarations in this package can write
// [T msg] instead of spelling out proto.Message.
type msg = proto.Message
// NodeResponse is a type alias for stream.NodeResponse, which makes the type
// available under this package's name. The code in this file reads and
// writes its NodeID, Value and Err fields.
type NodeResponse[T any] = stream.NodeResponse[T]
// mapToCallResponse turns a channel-level NodeResponse[*stream.Message] into
// the typed NodeResponse[Resp] that call types work with.
//
// The node ID is always carried over. The outcome is one of:
//   - channelResp.Err is non-nil: that error is carried over and Value is left
//     at its zero value; the payload is not inspected.
//   - unmarshalResponse reports an error: that error is returned in Err.
//   - the unmarshaled message is not a Resp: Err is ErrTypeMismatch.
//   - otherwise: Value holds the unmarshaled message and Err is nil.
func mapToCallResponse[Resp msg](channelResp NodeResponse[*stream.Message]) NodeResponse[Resp] {
	out := NodeResponse[Resp]{NodeID: channelResp.NodeID, Err: channelResp.Err}
	if channelResp.Err != nil {
		// The node already failed; there is no payload worth decoding.
		return out
	}

	decoded, err := unmarshalResponse(channelResp.Value)
	if err != nil {
		out.Err = err
		return out
	}

	typed, ok := decoded.(Resp)
	if !ok {
		out.Err = ErrTypeMismatch
		return out
	}
	out.Value = typed
	return out
}
// -------------------------------------------------------------------------
// Iterator Helpers
// -------------------------------------------------------------------------
// ResponseSeq is an iterator that yields NodeResponse[T] values from a quorum call.
// It is a defined type over iter.Seq so that helper methods such as
// IgnoreErrors, Filter, CollectN and CollectAll can be attached to it.
type ResponseSeq[T msg] iter.Seq[NodeResponse[T]]
// IgnoreErrors wraps seq in an iterator that drops every response whose Err
// is non-nil and passes the remaining responses through unchanged and in
// order. Iteration of seq stops as soon as the consumer stops.
//
// Example:
//
//	responses := QuorumCall(ctx, Request_builder{Num: uint64(42)}.Build())
//	var sum int32
//	for resp := range responses.IgnoreErrors() {
//		// resp.Err is nil here
//		sum += resp.Value.GetValue()
//	}
func (seq ResponseSeq[Resp]) IgnoreErrors() ResponseSeq[Resp] {
	return func(yield func(NodeResponse[Resp]) bool) {
		for r := range seq {
			if r.Err != nil {
				// Failed response: drop it and keep reading.
				continue
			}
			if !yield(r) {
				return
			}
		}
	}
}
// Filter wraps seq in an iterator that yields only the responses for which
// keep returns true, in their original order. keep is called once per
// response read from seq, and iteration of seq stops as soon as the consumer
// stops. This is useful for verifying or narrowing responses before further
// processing.
//
// Example:
//
//	responses := QuorumCall(ctx, req)
//	// Keep only the responses from one specific node
//	for resp := range responses.Filter(func(r NodeResponse[Resp]) bool {
//		return r.NodeID == 1
//	}) {
//		// process resp
//	}
func (seq ResponseSeq[Resp]) Filter(keep func(NodeResponse[Resp]) bool) ResponseSeq[Resp] {
	return func(yield func(NodeResponse[Resp]) bool) {
		for r := range seq {
			// Rejected responses are skipped; an accepted response the
			// consumer refuses ends the iteration.
			if keep(r) && !yield(r) {
				return
			}
		}
	}
}
// CollectN collects up to n responses from the iterator into a map keyed by
// node ID. It returns as soon as the map holds n entries or the iterator is
// exhausted, whichever comes first.
//
// Responses are stored regardless of their Err field. Only the Value is
// kept, so a failed response appears in the map with the zero value of Resp.
// Chain IgnoreErrors first to collect successful responses only. Responses
// that share a node ID overwrite each other and count as a single entry.
//
// If n is zero or negative, CollectN returns an empty, non-nil map without
// reading from the iterator.
//
// Example:
//
//	responses := QuorumCall(ctx, req)
//	// Collect the first 2 responses (including errors)
//	replies := responses.CollectN(2)
//	// or collect 2 successful responses
//	replies = responses.IgnoreErrors().CollectN(2)
func (seq ResponseSeq[Resp]) CollectN(n int) map[uint32]Resp {
	if n <= 0 {
		// Nothing was asked for. Entering the loop would wait for one
		// response and store it before the size check could stop us.
		return map[uint32]Resp{}
	}
	replies := make(map[uint32]Resp, n)
	for result := range seq {
		replies[result.NodeID] = result.Value
		if len(replies) >= n {
			break
		}
	}
	return replies
}
// CollectAll drains the iterator and returns every response in a map keyed
// by node ID.
//
// Responses are stored regardless of their Err field. Only the Value is
// kept, so a failed response appears in the map with the zero value of Resp.
// Chain IgnoreErrors first to collect successful responses only. A later
// response from the same node ID replaces the earlier one.
//
// Example:
//
//	responses := QuorumCall(ctx, req)
//	// Collect all responses (including errors)
//	replies := responses.CollectAll()
//	// or collect all successful responses
//	replies = responses.IgnoreErrors().CollectAll()
func (seq ResponseSeq[Resp]) CollectAll() map[uint32]Resp {
	byNode := map[uint32]Resp{}
	for r := range seq {
		byNode[r.NodeID] = r.Value
	}
	return byNode
}
// -------------------------------------------------------------------------
// Response Methods
// -------------------------------------------------------------------------
// Responses provides access to quorum call responses and terminal methods.
// It is returned by quorum call functions and allows fluent-style API usage:
//
//	resp, err := ReadQuorumCall(ctx, req).Majority()
//	// or
//	resp, err := ReadQuorumCall(ctx, req).First()
//	// or
//	replies := ReadQuorumCall(ctx, req).IgnoreErrors().CollectAll()
//
// Type parameter:
//   - Resp: The response message type
type Responses[Resp msg] struct {
// ResponseSeq is embedded, so its iterator helpers (IgnoreErrors, Filter,
// CollectN, CollectAll) can be called directly on a Responses value.
ResponseSeq[Resp]
// size is the value reported by Size; NewResponses fills it from ctx.Size().
size int
sendNow func() // sendNow triggers immediate sending of requests
}
// NewResponses builds the Responses value for the quorum call described by ctx.
//
// The returned value iterates over ctx.responseSeq, reports ctx.Size() as its
// size, and carries a sendNow hook that runs ctx.send through ctx.sendOnce.
// Constructing the value does not invoke that hook.
func NewResponses[Req, Resp msg](ctx *ClientCtx[Req, Resp]) *Responses[Resp] {
	r := new(Responses[Resp])
	r.ResponseSeq = ctx.responseSeq
	r.size = ctx.Size()
	r.sendNow = func() {
		ctx.sendOnce.Do(ctx.send)
	}
	return r
}
// Size returns the number of nodes in the configuration, as recorded when the
// Responses value was created. Majority and All derive their thresholds from
// the same value.
func (r *Responses[Resp]) Size() int {
return r.size
}
// Seq returns the underlying response iterator that yields node responses as they arrive.
// It returns a single-use iterator. Users can use this to implement custom aggregation logic.
//
// Seq itself only hands back the stored iterator; it does not send anything.
// NOTE(review): requests are presumably sent lazily when the iterator is first
// ranged over — confirm against the iterator installed by NewResponses.
//
// The iterator will:
//   - Yield responses as they arrive from nodes
//   - Continue until the context is canceled or all expected responses have been received
//   - Allow early termination by breaking from the range loop
//
// Example usage:
//
//	for result := range ReadQuorumCall(ctx, req).Seq() {
//		if result.Err != nil {
//			// Handle node error
//			continue
//		}
//		// Process result.Value
//	}
func (r *Responses[Resp]) Seq() ResponseSeq[Resp] {
return r.ResponseSeq
}
// -------------------------------------------------------------------------
// Terminal Methods (Aggregators)
// -------------------------------------------------------------------------
// First returns the first successful response received from any node.
// This is useful for read-any patterns where any single response is sufficient.
// It is shorthand for Threshold(1), so when the iterator ends without a
// successful response, the error produced by Threshold is returned.
func (r *Responses[Resp]) First() (Resp, error) {
return r.Threshold(1)
}
// Majority returns the first successful response once a simple majority of
// the configuration has responded successfully. For n nodes the quorum size
// is ⌊n/2⌋+1, which equals ⌈(n+1)/2⌉. It is shorthand for Threshold with that
// quorum size.
func (r *Responses[Resp]) Majority() (Resp, error) {
	return r.Threshold(r.size/2 + 1)
}
// All returns the first response once as many successful responses as there
// are nodes in the configuration have been received. It is shorthand for
// Threshold(r.size).
// If any node fails, it returns an error.
// NOTE(review): the failure case relies on the iterator yielding at most one
// response per node — confirm against the iterator implementation.
func (r *Responses[Resp]) All() (Resp, error) {
return r.Threshold(r.size)
}
// Threshold waits for threshold successful responses and then returns the
// first successful response that was received, with a nil error.
//
// Responses are classified as follows:
//   - Err == nil: a successful response; it counts toward the threshold.
//   - Err matches ErrSkipNode: the node was skipped. It is neither a success
//     nor a failure, so it is not counted and not reported.
//   - any other Err: a failure; it is recorded with its node ID.
//
// If the iterator ends before the threshold is reached, Threshold returns a
// QuorumCallError whose cause is ErrIncomplete and which carries the recorded
// node failures. In that case resp still holds the first successful response
// if there was one, and the zero value otherwise.
func (r *Responses[Resp]) Threshold(threshold int) (resp Resp, err error) {
	var (
		count int
		errs  []nodeError
	)
	for result := range r.ResponseSeq {
		if result.Err != nil {
			// A skipped node carries no reply. Counting it as a success
			// would let its zero Value be returned with a nil error.
			if !errors.Is(result.Err, ErrSkipNode) {
				errs = append(errs, nodeError{nodeID: result.NodeID, cause: result.Err})
			}
			continue
		}
		if count == 0 {
			// Remember the first successful reply; that is what callers get.
			resp = result.Value
		}
		count++
		// Check if we have reached the threshold
		if count >= threshold {
			return resp, nil
		}
	}
	return resp, QuorumCallError{cause: ErrIncomplete, errors: errs}
}