Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ stopping you from creating a client in any other language (see
- [reset_peer](#reset_peer)
- [slicer](#slicer)
- [limit_data](#limit_data)
- [packet_loss](#packet_loss)
- [HTTP API](#http-api)
- [Proxy fields:](#proxy-fields)
- [Toxic fields:](#toxic-fields)
Expand Down Expand Up @@ -446,6 +447,15 @@ Closes connection when transmitted data exceeded limit.

- `bytes`: number of bytes it should transmit before connection is closed

#### packet_loss

Randomly drops chunks flowing through the proxy simulating
flaky Wi-Fi, mobile, or satellite network conditions.

Attributes:
- `loss_rate`: probability [0.0–1.0] that a chunk is dropped (default 0.1)
- `correlation`: extra drop probability when the previous chunk was dropped, modeling burst loss (default 0.0)

### HTTP API

All communication with the Toxiproxy daemon from the client happens through the
Expand Down
130 changes: 130 additions & 0 deletions toxics/packet_loss.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package toxics

// PacketLossToxic randomly drops StreamChunks passing through the proxy,
// simulating packet loss / flaky network conditions.
//
// Attributes:
// - loss_rate : float64 probability [0.0–1.0] that a chunk is dropped
// (default 0.1 -> 10 %)
// - correlation : float64 probability [0.0–1.0] that the *next* chunk is
// also dropped when the previous one was (Gilbert-
// Elliott burst model; default 0.0 -> no burstiness)
//
// How it fits toxiproxy's pipeline:
//
// Client -> [noop] -> [packet_loss] -> [noop] -> Upstream
// |
// dropped chunks
// are discarded
//
// Registration happens automatically via init().

import (
"math/rand"
)

// PacketLossToxicState holds per-connection mutable state so that the
// main toxic struct (shared across connections) stays read-only.
type PacketLossToxicState struct {
// wasDropped records whether the previous chunk was dropped; used for
// burst-correlation logic.
wasDropped bool
// rng is a per-connection source so concurrent connections don't
// contend on a shared global rand.
rng *rand.Rand
}

// PacketLossToxic is the toxic struct. Fields are JSON-tagged to match
// toxiproxy's HTTP API convention.
type PacketLossToxic struct {
// LossRate is the baseline probability that any individual chunk is
// dropped. Range [0.0, 1.0]. Default 0.1 (10 %).
LossRate float64 `json:"loss_rate"`

// Correlation is the extra probability that the *next* chunk is dropped
// when the current one was dropped, modelling burst packet loss (Gilbert-
// Elliott model simplified). Range [0.0, 1.0]. Default 0.0.
Correlation float64 `json:"correlation"`
}

// NewState satisfies the StatefulToxic interface. toxiproxy calls this once
// per new connection so every connection gets its own RNG and drop state.
func (t *PacketLossToxic) NewState() interface{} {
return &PacketLossToxicState{
rng: rand.New(rand.NewSource(rand.Int63())),
}
}

// Pipe satisfies the Toxic interface. It reads chunks from stub.Input,
// decides whether to forward or drop each one, and writes survivors to
// stub.Output. It exits when the input channel is closed or an interrupt
// arrives.
func (t *PacketLossToxic) Pipe(stub *ToxicStub) {
state := stub.State.(*PacketLossToxicState)

// Clamp configuration to valid ranges once, up front.
lossRate := clamp(t.LossRate, 0.0, 1.0)
correlation := clamp(t.Correlation, 0.0, 1.0)

for {
select {
case <-stub.Interrupt:
// toxiproxy is removing or reconfiguring this toxic; drain cleanly.
return

case chunk, ok := <-stub.Input:
if !ok {
// Upstream closed the connection.
return
}

if t.shouldDrop(state, lossRate, correlation) {
// Drop: discard the chunk entirely. The byte slice is simply
// not forwarded – no close, no RST – mimicking a lost IP packet.
state.wasDropped = true
continue
}

state.wasDropped = false

// Forward the chunk unmodified.
select {
case stub.Output <- chunk:
case <-stub.Interrupt:
return
}
}
}
}

// shouldDrop returns true when the current chunk must be discarded.
// It implements a simplified Gilbert-Elliott two-state model:
// - In the "good" state -> drop with probability lossRate
// - In the "bad" state -> drop with probability (lossRate + correlation),
// where "bad" means the previous chunk was also dropped.
func (t *PacketLossToxic) shouldDrop(
state *PacketLossToxicState,
lossRate, correlation float64,
) bool {
p := lossRate
if state.wasDropped {
p = min(1.0, lossRate+correlation)
}
return state.rng.Float64() < p
}

// init registers the toxic with toxiproxy automatically at startup.
func init() {
Register("packet_loss", new(PacketLossToxic))
}

// clamp constrains v to the range [lo, hi].
func clamp(v, lo, hi float64) float64 {
if v < lo {
return lo
}
if v > hi {
return hi
}
return v
}
137 changes: 137 additions & 0 deletions toxics/packet_loss_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package toxics_test

import (
"testing"
"time"

"github.com/Shopify/toxiproxy/v2/stream"
"github.com/Shopify/toxiproxy/v2/toxics"
)

// TestPacketLossToxicNoLoss verifies that with loss_rate=0 every chunk passes.
func TestPacketLossToxicNoLoss(t *testing.T) {
toxic := &toxics.PacketLossToxic{LossRate: 0.0, Correlation: 0.0}
runPipeTest(t, toxic, 1000, 0)
}

// TestPacketLossToxicFullLoss verifies that with loss_rate=1 no chunk passes.
func TestPacketLossToxicFullLoss(t *testing.T) {
toxic := &toxics.PacketLossToxic{LossRate: 1.0, Correlation: 0.0}
runPipeTest(t, toxic, 100, 100)
}

// TestPacketLossToxicApproximateRate verifies drop rate is within ±10% of target.
func TestPacketLossToxicApproximateRate(t *testing.T) {
const targetRate = 0.20
toxic := &toxics.PacketLossToxic{LossRate: targetRate, Correlation: 0.0}

const n = 1000
dropped, _ := runPipeTestWithStats(t, toxic, n)

empirical := float64(dropped) / float64(n)
tolerance := 0.10

if empirical < targetRate-tolerance || empirical > targetRate+tolerance {
t.Errorf("empirical drop rate %.2f outside [%.2f, %.2f]",
empirical, targetRate-tolerance, targetRate+tolerance)
}
}

// TestPacketLossToxicIndependentConnections verifies independent RNG per connection.
func TestPacketLossToxicIndependentConnections(t *testing.T) {
toxic := &toxics.PacketLossToxic{LossRate: 0.5}
s1 := toxic.NewState()
s2 := toxic.NewState()

if s1 == s2 {
t.Error("NewState must return different instances per connection")
}
}

// TestPacketLossToxicPipeDrains confirms Pipe exits cleanly on interrupt.
func TestPacketLossToxicPipeDrains(t *testing.T) {
toxic := &toxics.PacketLossToxic{LossRate: 0.0}

input := make(chan *stream.StreamChunk, 1)
output := make(chan *stream.StreamChunk, 1)
interrupt := make(chan struct{})

stub := &toxics.ToxicStub{
Input: input,
Output: output,
Interrupt: interrupt,
State: toxic.NewState(),
}

done := make(chan struct{})
go func() {
toxic.Pipe(stub)
close(done)
}()

close(interrupt)

select {
case <-done:
// OK
case <-time.After(time.Second):
t.Fatal("Pipe did not exit within 1s after interrupt")
}
}

// runPipeTest sends n chunks and asserts exactly expectDropped were dropped.
func runPipeTest(t *testing.T, toxic *toxics.PacketLossToxic, n int, expectDropped int) {
t.Helper()
dropped, passed := runPipeTestWithStats(t, toxic, n)
if dropped != expectDropped {
t.Errorf("expected %d dropped, got %d (passed: %d)", expectDropped, dropped, passed)
}
}

// runPipeTestWithStats returns (dropped, passed) counts.
// It drains output concurrently to avoid blocking the pipe.
func runPipeTestWithStats(t *testing.T, toxic *toxics.PacketLossToxic, n int) (int, int) {
t.Helper()

// Buffer input generously so the sender never blocks.
input := make(chan *stream.StreamChunk, n)
// Unbuffered output — drained by a goroutine below.
output := make(chan *stream.StreamChunk)
interrupt := make(chan struct{})

stub := &toxics.ToxicStub{
Input: input,
Output: output,
Interrupt: interrupt,
State: toxic.NewState(),
}

// Fill input before starting Pipe so timing doesn't matter.
for i := 0; i < n; i++ {
input <- &stream.StreamChunk{
Data: []byte{byte(i % 256)},
Timestamp: time.Now(),
}
}
close(input)

// Drain output concurrently so Pipe never blocks on send.
passed := 0
drainDone := make(chan struct{})
go func() {
for range output {
passed++
}
close(drainDone)
}()

// Run Pipe — exits when input is closed.
toxic.Pipe(stub)

// Pipe has returned, close output so the drain goroutine finishes.
close(output)
<-drainDone

dropped := n - passed
return dropped, passed
}
Loading