Skip to content

Commit daf57dc

Browse files
committed
improving internal structure and responsibilities of various components
1 parent e24c9ba commit daf57dc

File tree

9 files changed

+376
-347
lines changed

9 files changed

+376
-347
lines changed

cmd/root.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ import (
1313
"github.com/spf13/cobra"
1414
"github.com/symonk/vessel/internal/collector"
1515
"github.com/symonk/vessel/internal/config"
16-
"github.com/symonk/vessel/internal/requester"
16+
"github.com/symonk/vessel/internal/coordinator"
17+
"github.com/symonk/vessel/internal/stats"
1718
"github.com/symonk/vessel/internal/validation"
1819
)
1920

@@ -88,7 +89,8 @@ var rootCmd = &cobra.Command{
8889
}
8990

9091
// build the single request template to clone later.
91-
req, err := requester.GenerateTemplateRequest(cfg)
92+
// TODO: should not be the responsibility of a 'coordinator'.
93+
req, err := coordinator.GenerateTemplateRequest(cfg)
9294
if err != nil {
9395
return fmt.Errorf("unable to create request: %v", err)
9496
}
@@ -147,22 +149,25 @@ var rootCmd = &cobra.Command{
147149
}
148150
req.Header.Set(userAgentHeader, cfg.UserAgent)
149151

150-
collector := collector.New(out, cfg)
152+
resultsChan := make(chan *stats.Stats, cfg.Concurrency)
153+
collector := collector.New(resultsChan, out, cfg)
151154

152-
// command ctx already has the signalling capabilities.
153-
// if duration is specified, wrap the ctx with that dead line
154-
// to cause Go() to exit and Wait() to unblock.
155+
// Enable signal handling to abort when requested (gracefully)
156+
// finish in flight requests and summarise work that was completed
157+
// prior.
155158
parent := cmd.Context()
156159
ctx, cancel := signal.NotifyContext(parent, os.Interrupt, syscall.SIGTERM)
157160
defer cancel()
158161

159-
requester := requester.New(
162+
coordinator := coordinator.New(
160163
ctx,
164+
resultsChan,
161165
cfg,
162166
collector,
163167
req,
164168
)
165-
requester.Wait()
169+
coordinator.Wait()
170+
close(resultsChan)
166171
collector.Summarise()
167172
return nil
168173
},

internal/collector/collector.go

Lines changed: 46 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,14 @@ import (
55
"fmt"
66
"html/template"
77
"io"
8-
"net/http"
98
"runtime"
10-
"sync"
11-
"sync/atomic"
129
"time"
1310

1411
"github.com/HdrHistogram/hdrhistogram-go"
1512
"github.com/symonk/vessel/internal/config"
16-
"github.com/symonk/vessel/internal/trace"
13+
"github.com/symonk/vessel/internal/stats"
1714
)
1815

19-
// Recorder is the interface for something which records metrics from
20-
// a HTTP Request->Response interactions.
21-
type Recorder interface {
22-
Record(response *http.Response, latency time.Duration, receivedBytes int64, sentBytes int64, err error)
23-
}
24-
2516
// Summariser is the interface for something which can display summary
2617
// information.
2718
type Summariser interface {
@@ -30,7 +21,6 @@ type Summariser interface {
3021

3122
type ResultCollector interface {
3223
Summariser
33-
Recorder
3424
}
3525

3626
// EventCollector collects execution data during the lifecycle of
@@ -55,76 +45,67 @@ type EventCollector struct {
5545
collectionRegistered time.Time
5646
rawErrors error
5747
errGrouper *ErrorGrouper
58-
mu sync.Mutex
5948
latency hdrhistogram.Histogram
60-
bytesReceived atomic.Int64
61-
bytesSent atomic.Int64
49+
bytesReceived int64
50+
bytesSent int64
6251
waitingDns time.Duration
6352
waitingTls time.Duration
6453
waitingConnect time.Duration
65-
newConnections atomic.Int64
54+
newConnections int64
6655
waitingGetConn time.Duration
56+
ingress chan *stats.Stats
6757
}
6858

69-
func New(writer io.Writer, cfg *config.Config) *EventCollector {
70-
return &EventCollector{
59+
func New(ingress chan *stats.Stats, writer io.Writer, cfg *config.Config) *EventCollector {
60+
e := &EventCollector{
7161
counter: NewStatusCodeCounter(),
7262
cfg: cfg,
7363
writer: writer,
7464
collectionRegistered: time.Now(),
7565
latency: *hdrhistogram.New(1, 60000, 3),
7666
rawErrors: nil,
7767
errGrouper: NewErrGrouper(),
68+
ingress: ingress,
7869
}
70+
go e.listen()
71+
return e
7972
}
8073

81-
// Record captures information about the completed request.
82-
// It keeps mutex locking to a minimum where possible and favours
83-
// CPU atomic operations where possible.
84-
func (e *EventCollector) Record(response *http.Response, latency time.Duration, bytesReceived int64, bytesSent int64, err error) {
85-
// It is possible response is nil in error cases.
86-
// Keep a reference to the error, we will categorise them later
87-
// based on the different types.
88-
if err != nil {
89-
e.mu.Lock()
90-
defer e.mu.Unlock()
91-
e.rawErrors = errors.Join(e.rawErrors, err)
92-
e.errGrouper.Record(err)
93-
// TODO: Error grouping for smarter summarising.
94-
// TODO: Implement a way to 'classify' the errors into appropriate groups.
95-
return
96-
}
97-
98-
// Pull out 'trace' data from the requests to paint a better picture in the summary
99-
// of how time was spent from a granular point of view.
100-
v := response.Request.Context().Value(trace.TraceDataKey)
101-
t, ok := v.(*trace.Trace)
102-
if ok {
103-
e.mu.Lock()
104-
e.waitingDns += t.DnsDone
105-
e.waitingTls += t.TlsDone
106-
e.waitingConnect += t.ConnectDone
107-
e.waitingGetConn += t.GotConnection
108-
e.mu.Unlock()
74+
// listen waits for stats from the worker pool before incremental internal
75+
// values in preparation for summary generation later.
76+
//
77+
// For now this is a single listener, but eventually the channel can be fanned
78+
// out for reads and merged back into a single result chan for efficiency.
79+
func (e *EventCollector) listen() {
80+
fmt.Println("listening for stats")
81+
for stat := range e.ingress {
82+
err := stat.Err
83+
if err != nil {
84+
e.rawErrors = errors.Join(e.rawErrors, err)
85+
e.errGrouper.Record(err)
86+
}
87+
e.waitingDns += stat.TimeOnDns
88+
e.waitingTls += stat.TimeOnTls
89+
e.waitingConnect += stat.TimeOnConnect
90+
e.waitingGetConn += stat.TimeOnConn
91+
92+
// We have a semi-successful response (in that sense that no error was returned)
93+
// Capture the histogram data for the latency of the response.
94+
e.latency.RecordValue(stat.Latency.Milliseconds())
95+
e.counter.Increment(stat.StatusCode)
96+
97+
// Track the byte size of the initial request aswell as content type of
98+
// the response from the server. The collector is not responsible for
99+
// reading the response, this should be handled elsewhere to ensure safety
100+
// of reading responses and avoiding attempting multiple reads etc.
101+
e.bytesReceived += stat.BytesReceived
102+
e.bytesSent += stat.BytesSent
103+
104+
// Keep track of keep-alives etc, useful for detecting if there is an issue
105+
// with your server, or our client.
106+
e.newConnections += stat.ReusedConn
109107
}
110108

111-
// We have a semi-successful response (in that sense that no error was returned)
112-
// Capture the histogram data for the latency of the response.
113-
e.latency.RecordValue(latency.Milliseconds())
114-
e.counter.Increment(response.StatusCode)
115-
116-
// Track the byte size of the initial request aswell as content type of
117-
// the response from the server. The collector is not responsible for
118-
// reading the response, this should be handled elsewhere to ensure safety
119-
// of reading responses and avoiding attempting multiple reads etc.
120-
e.bytesReceived.Add(bytesReceived)
121-
e.bytesSent.Add(bytesSent)
122-
123-
// Keep track of keep-alives etc, useful for detecting if there is an issue
124-
// with your server, or our client.
125-
if !t.ReusedConnection {
126-
e.newConnections.Add(1)
127-
}
128109
}
129110

130111
// Summarise calculates the final summary prior to exiting.
@@ -179,11 +160,11 @@ Waiting: {{.Waiting}}
179160
`
180161
// TODO: Smarter use of different terms, if the test was < 1MB transferred for example
181162
// fallback to bytesReceived/sec etc etc.
182-
bytesReceived := e.bytesReceived.Load()
163+
bytesReceived := e.bytesReceived
183164
receivedBytesPerSecond := (bytesReceived / int64(seconds))
184165
receivedMegabytes := float64(receivedBytesPerSecond) / 1_000_000
185166

186-
bytesSent := e.bytesSent.Load()
167+
bytesSent := e.bytesSent
187168
sentBytesPerSecond := (bytesSent / int64(seconds))
188169
sentMegabytes := float64(sentBytesPerSecond) / 1_000_000
189170

@@ -225,7 +206,7 @@ Waiting: {{.Waiting}}
225206
Workers: e.cfg.Concurrency,
226207
Version: e.cfg.Version,
227208
Waiting: waiting,
228-
OpenedConnections: e.newConnections.Load(),
209+
OpenedConnections: e.newConnections,
229210
MaxProcs: runtime.GOMAXPROCS(0),
230211
BytesTotal: fmt.Sprintf("%.2FMB", bytesTotal),
231212
}
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package coordinator
2+
3+
import (
4+
"context"
5+
"crypto/tls"
6+
"net/http"
7+
"sync"
8+
"time"
9+
10+
"github.com/symonk/vessel/internal/collector"
11+
"github.com/symonk/vessel/internal/config"
12+
"github.com/symonk/vessel/internal/stats"
13+
"github.com/symonk/vessel/internal/worker"
14+
)
15+
16+
// Coordinator sends HTTP requests to a server (typically at scale) and
// can be signalled to wait until all requests have finalized through
// Wait()
type Coordinator interface {
	// Wait blocks until all outstanding work has completed.
	Wait()
}

// RequestCoordinator takes a request and fans out many instances
// of that request until either the maximum count is reached
// or the duration has been surpassed.
type RequestCoordinator struct {
	ctx       context.Context           // Parent cancelled on signal
	collector collector.ResultCollector // Sink for collected results
	out       chan<- *stats.Stats       // Per-request stats flow out to the collector here
	cfg       *config.Config            // User supplied runtime configuration
	client    *http.Client              // Single client shared by every worker
	template  *http.Request             // The request each dispatch is based on
	workerCh  chan *http.Request        // Queue feeding the worker pool
	wg        sync.WaitGroup            // Tracks worker shutdown so Wait() can block
}
36+
37+
// New instantiates a new instance of RequestCoordinator and returns
38+
// the ptr to it.
39+
func New(ctx context.Context, out chan<- *stats.Stats, cfg *config.Config, collector collector.ResultCollector, template *http.Request) *RequestCoordinator {
40+
maxWorkers := max(1, cfg.Concurrency)
41+
r := &RequestCoordinator{
42+
ctx: ctx,
43+
collector: collector,
44+
cfg: cfg,
45+
out: out,
46+
client: &http.Client{
47+
Timeout: cfg.Timeout,
48+
Transport: NewRateLimitingTransport(
49+
cfg.MaxRPS,
50+
&RateLimitingTransport{
51+
// TODO: Overhaul this.
52+
Next: &http.Transport{
53+
Proxy: http.ProxyFromEnvironment,
54+
ForceAttemptHTTP2: true,
55+
MaxConnsPerHost: cfg.MaxConnections,
56+
IdleConnTimeout: 90 * time.Second,
57+
TLSHandshakeTimeout: 10 * time.Second,
58+
ExpectContinueTimeout: 1 * time.Second,
59+
TLSClientConfig: &tls.Config{
60+
// Skip server verification checks. Enables testing against
61+
// self signed/expired certs, wrong domain or untrusted.
62+
InsecureSkipVerify: cfg.Insecure,
63+
},
64+
},
65+
}),
66+
},
67+
template: template,
68+
workerCh: make(chan *http.Request, maxWorkers),
69+
}
70+
r.wg.Add(maxWorkers)
71+
go r.spawn(maxWorkers)
72+
return r
73+
}
74+
75+
// Wait waits until all requests are finished and all workers
76+
// have cleanly shutdown.
77+
func (h *RequestCoordinator) Wait() {
78+
h.wg.Wait()
79+
}
80+
81+
// spawn fans out workers in the pool upto the configured
82+
// concurrency.
83+
func (h *RequestCoordinator) spawn(count int) {
84+
for range count {
85+
w := worker.New(h.client, h.workerCh, h.out, &h.wg, h.ctx, h.cfg)
86+
go w.Accept()
87+
}
88+
89+
// Asynchronously load requests into the queue.
90+
// Depending on -d or -a (duration || amount) the strategy
91+
// for loading requests onto the queues differs.
92+
var seen int64
93+
var tick <-chan time.Time
94+
if dur := h.cfg.Duration; dur > 0 {
95+
ticker := time.NewTicker(dur)
96+
defer ticker.Stop()
97+
tick = ticker.C
98+
}
99+
100+
defer func() {
101+
close(h.workerCh)
102+
}()
103+
for {
104+
select {
105+
case <-tick:
106+
// if a duration was set, we have reached it.
107+
// gracefully exit.
108+
// nil channel otherwise (never selects/blocks)
109+
return
110+
case <-h.ctx.Done():
111+
// A signal was received, cause a graceful exit
112+
return
113+
default:
114+
// keep track of seen requests and keep providing requests
115+
// to workers as fast as possible.
116+
if tick == nil && seen == h.cfg.Amount {
117+
return
118+
}
119+
seen++
120+
h.workerCh <- h.template
121+
}
122+
}
123+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package requester
1+
package coordinator
22

33
import (
44
"net/http"
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package requester
1+
package coordinator
22

33
import (
44
"net/http"

0 commit comments

Comments
 (0)