Skip to content

Commit 62804bf

Browse files
feat(throughput): add NotReceivedP2PEventCount to ThroughoutMeasure
1 parent 5b7e232 commit 62804bf

File tree

1 file changed

+21
-13
lines changed
  • pkg/coordinator/tasks/tx_pool_throughput_analysis

1 file changed

+21
-13
lines changed

pkg/coordinator/tasks/tx_pool_throughput_analysis/task.go

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@ var (
2727
)
2828

2929
type ThroughoutMeasure struct {
30-
LoadTPS int `json:"load_tps"`
31-
ProcessedTPS int `json:"processed_tps"`
30+
LoadTPS int `json:"load_tps"`
31+
ProcessedTPS int `json:"processed_tps"`
32+
NotReceivedP2PEventCount int `json:"not_received_p2p_event_count"`
3233
}
3334

3435
type Task struct {
@@ -136,9 +137,11 @@ func (t *Task) Execute(ctx context.Context) error {
136137
t.logger.Infof("Iterating over the TPS range, starting TPS: %d, ending TPS: %d, increment TPS: %d",
137138
t.config.StartingTPS, t.config.EndingTPS, t.config.IncrementTPS)
138139

140+
missedP2PEventCount := 0
141+
139142
for sendingTps := t.config.StartingTPS; sendingTps <= t.config.EndingTPS; sendingTps += t.config.IncrementTPS {
140143
// measure the throughput with the current sendingTps
141-
processedTps, err := t.measureTpsWithLoad(loadTarget, sendingTps, t.config.DurationS, singleMeasureDeadline, percentile)
144+
processedTps, notReceivedP2PEventCount, err := t.measureTpsWithLoad(loadTarget, sendingTps, t.config.DurationS, singleMeasureDeadline, percentile)
142145
if err != nil {
143146
t.logger.Errorf("Error during throughput measurement with sendingTps=%d, duration=%d: %v", sendingTps, t.config.DurationS, err)
144147
t.ctx.SetResult(types.TaskResultFailure)
@@ -148,9 +151,12 @@ func (t *Task) Execute(ctx context.Context) error {
148151

149152
// add to throughoutMeasures
150153
throughoutMeasures = append(throughoutMeasures, ThroughoutMeasure{
151-
LoadTPS: sendingTps,
152-
ProcessedTPS: processedTps,
154+
LoadTPS: sendingTps,
155+
ProcessedTPS: processedTps,
156+
NotReceivedP2PEventCount: notReceivedP2PEventCount,
153157
})
158+
159+
missedP2PEventCount += notReceivedP2PEventCount
154160
}
155161

156162
t.logger.Infof("Finished measuring throughput, collected %d measures", len(throughoutMeasures))
@@ -160,7 +166,8 @@ func (t *Task) Execute(ctx context.Context) error {
160166
t.ctx.Outputs.SetVar("throughput_measures", throughoutMeasures) // log coordinated_omission_event_count and missed_p2p_event_count?
161167

162168
outputs := map[string]interface{}{
163-
"throughput_measures": throughoutMeasures,
169+
"throughput_measures": throughoutMeasures,
170+
"missed_p2p_event_count": missedP2PEventCount,
164171
}
165172

166173
outputsJSON, _ := json.Marshal(outputs)
@@ -173,7 +180,7 @@ func (t *Task) Execute(ctx context.Context) error {
173180
}
174181

175182
func (t *Task) measureTpsWithLoad(loadTarget *txloadtool.LoadTarget, sendingTps, durationS int,
176-
testDeadline time.Time, percentile float64) (int, error) {
183+
testDeadline time.Time, percentile float64) (processedTps, notReceivedP2PEventCount int, err error) {
177184
t.logger.Infof("Single measure of throughput, sending TPS: %d, duration: %d secs", sendingTps, durationS)
178185

179186
// Prepare to collect transaction latencies
@@ -185,7 +192,7 @@ func (t *Task) measureTpsWithLoad(loadTarget *txloadtool.LoadTarget, sendingTps,
185192
t.logger.Errorf("Error during transaction load execution: %v", execErr)
186193
t.ctx.SetResult(types.TaskResultFailure)
187194

188-
return 0, execErr
195+
return 0, 0, execErr
189196
}
190197

191198
// Collect the transactions and their latencies
@@ -194,12 +201,12 @@ func (t *Task) measureTpsWithLoad(loadTarget *txloadtool.LoadTarget, sendingTps,
194201
t.logger.Errorf("Error measuring transaction propagation latencies: %v", measureErr)
195202
t.ctx.SetResult(types.TaskResultFailure)
196203

197-
return 0, measureErr
204+
return 0, 0, measureErr
198205
}
199206

200207
// Check if the context was cancelled or other errors occurred
201208
if result.Failed {
202-
return 0, fmt.Errorf("error measuring transaction propagation latencies: load failed")
209+
return 0, 0, fmt.Errorf("error measuring transaction propagation latencies: load failed")
203210
}
204211

205212
// Send txes to other clients, for speeding up tx mining
@@ -226,19 +233,20 @@ func (t *Task) measureTpsWithLoad(loadTarget *txloadtool.LoadTarget, sendingTps,
226233
// Calculate the percentile of latencies using result.LatenciesMus
227234
// Not implemented yet
228235
notImpl := errors.New("percentile selection not implemented, use 0.99")
229-
return 0, notImpl
236+
return 0, 0, notImpl
230237
}
231238

232239
t.logger.Infof("Using 0.99 percentile for latency calculation")
233240

234241
t.logger.Infof("Last measure delay since start time: %s", result.LastMeasureDelay)
235242

236243
processedTpsF := float64(result.TotalTxs) / result.LastMeasureDelay.Seconds()
237-
processedTps := int(processedTpsF) // round
244+
processedTps = int(processedTpsF) // round
245+
notReceivedP2PEventCount = result.NotReceivedP2PEventCount
238246

239247
t.logger.Infof("Processed %d transactions in %.2fs, mean throughput: %.2f tx/s",
240248
result.TotalTxs, result.LastMeasureDelay.Seconds(), processedTpsF)
241249
t.logger.Infof("Sent %d transactions in %.2fs", result.TotalTxs, result.LastMeasureDelay.Seconds())
242250

243-
return processedTps, nil
251+
return processedTps, notReceivedP2PEventCount, nil
244252
}

0 commit comments

Comments
 (0)