Skip to content

Commit e0e6fd5

Browse files
committed
Add circuit breaker + format
1 parent c111f96 commit e0e6fd5

File tree

4 files changed

+924
-77
lines changed

4 files changed

+924
-77
lines changed

telemetry/DESIGN.md

Lines changed: 130 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ func (m *clientManager) releaseClient(host string) error {
379379

380380
### 3.3 circuitBreaker
381381

382-
**Purpose**: Implement circuit breaker pattern to protect against failing telemetry endpoint.
382+
**Purpose**: Implement circuit breaker pattern to protect against failing telemetry endpoint using a sliding window and failure rate percentage algorithm (matching JDBC's Resilience4j implementation).
383383

384384
**Location**: `telemetry/circuitbreaker.go`
385385

@@ -388,11 +388,28 @@ func (m *clientManager) releaseClient(host string) error {
388388
- **Not just rate limiting**: Protects against 5xx errors, timeouts, network failures
389389
- **Resource efficiency**: Prevents wasting resources on a failing endpoint
390390
- **Auto-recovery**: Automatically detects when endpoint becomes healthy again
391+
- **JDBC alignment**: Uses sliding window with failure rate percentage, matching JDBC driver behavior exactly
392+
393+
#### Algorithm: Sliding Window with Failure Rate
394+
The circuit breaker tracks recent calls in a **sliding window** (ring buffer) and calculates the **failure rate percentage**:
395+
- Tracks the last N calls (default: 30)
396+
- Opens circuit when failure rate >= threshold (default: 50%)
397+
- Requires minimum calls before evaluation (default: 20)
398+
- Uses percentage-based evaluation instead of consecutive failures
399+
400+
**Example**: With 30 calls in window, if 15 or more fail (50%), circuit opens. This is more robust than consecutive-failure counting as it considers overall reliability.
391401

392402
#### States
393403
1. **Closed**: Normal operation, requests pass through
394-
2. **Open**: After threshold failures, all requests rejected immediately (drop events)
395-
3. **Half-Open**: After timeout, allows test requests to check if endpoint recovered
404+
2. **Open**: After failure rate exceeds threshold, all requests rejected immediately (drop events)
405+
3. **Half-Open**: After wait duration, allows test requests to check if endpoint recovered
406+
407+
#### Configuration (matching JDBC defaults)
408+
- **failureRateThreshold**: 50% - Opens circuit if failure rate >= 50%
409+
- **minimumNumberOfCalls**: 20 - Minimum calls before evaluating failure rate
410+
- **slidingWindowSize**: 30 - Track last 30 calls in sliding window
411+
- **waitDurationInOpenState**: 30s - Wait before transitioning to half-open
412+
- **permittedCallsInHalfOpen**: 3 - Test with 3 successful calls before closing
396413

397414
#### Interface
398415

@@ -416,32 +433,53 @@ const (
416433
stateHalfOpen
417434
)
418435

436+
// callResult represents the result of a call (success or failure).
437+
type callResult bool
438+
439+
const (
440+
callSuccess callResult = true
441+
callFailure callResult = false
442+
)
443+
419444
// circuitBreaker implements the circuit breaker pattern.
445+
// It protects against failing telemetry endpoints by tracking failures
446+
// using a sliding window and failure rate percentage.
420447
type circuitBreaker struct {
421448
mu sync.RWMutex
422449

423450
state atomic.Int32 // circuitState
424-
failureCount int
425-
successCount int
426-
lastFailTime time.Time
427451
lastStateTime time.Time
428452

453+
// Sliding window for tracking calls
454+
window []callResult
455+
windowIndex int
456+
windowFilled bool
457+
totalCalls int
458+
failureCount int
459+
460+
// Half-open state tracking
461+
halfOpenSuccesses int
462+
429463
config circuitBreakerConfig
430464
}
431465

432466
// circuitBreakerConfig holds circuit breaker configuration.
433467
type circuitBreakerConfig struct {
434-
failureThreshold int // Open after N failures
435-
successThreshold int // Close after N successes in half-open
436-
timeout time.Duration // Try again after timeout
468+
failureRateThreshold int // Open if failure rate >= this percentage (0-100)
469+
minimumNumberOfCalls int // Minimum calls before evaluating failure rate
470+
slidingWindowSize int // Number of recent calls to track
471+
waitDurationInOpenState time.Duration // Wait before transitioning to half-open
472+
permittedCallsInHalfOpen int // Number of test calls in half-open state
437473
}
438474

439-
// defaultCircuitBreakerConfig returns default configuration.
475+
// defaultCircuitBreakerConfig returns default configuration matching JDBC.
440476
func defaultCircuitBreakerConfig() circuitBreakerConfig {
441477
return circuitBreakerConfig{
442-
failureThreshold: 5,
443-
successThreshold: 2,
444-
timeout: 1 * time.Minute,
478+
failureRateThreshold: 50, // 50% failure rate
479+
minimumNumberOfCalls: 20, // Minimum sample size
480+
slidingWindowSize: 30, // Keep recent 30 calls
481+
waitDurationInOpenState: 30 * time.Second,
482+
permittedCallsInHalfOpen: 3, // Test with 3 calls
445483
}
446484
}
447485

@@ -450,6 +488,7 @@ func newCircuitBreaker(cfg circuitBreakerConfig) *circuitBreaker {
450488
cb := &circuitBreaker{
451489
config: cfg,
452490
lastStateTime: time.Now(),
491+
window: make([]callResult, cfg.slidingWindowSize),
453492
}
454493
cb.state.Store(int32(stateClosed))
455494
return cb
@@ -464,9 +503,9 @@ func (cb *circuitBreaker) execute(ctx context.Context, fn func() error) error {
464503

465504
switch state {
466505
case stateOpen:
467-
// Check if timeout has passed
506+
// Check if wait duration has passed
468507
cb.mu.RLock()
469-
shouldRetry := time.Since(cb.lastStateTime) > cb.config.timeout
508+
shouldRetry := time.Since(cb.lastStateTime) > cb.config.waitDurationInOpenState
470509
cb.mu.RUnlock()
471510

472511
if shouldRetry {
@@ -491,48 +530,96 @@ func (cb *circuitBreaker) tryExecute(ctx context.Context, fn func() error) error
491530
err := fn()
492531

493532
if err != nil {
494-
cb.recordFailure()
533+
cb.recordCall(callFailure)
495534
return err
496535
}
497536

498-
cb.recordSuccess()
537+
cb.recordCall(callSuccess)
499538
return nil
500539
}
501540

502-
// recordFailure records a failure and potentially opens the circuit.
503-
func (cb *circuitBreaker) recordFailure() {
541+
// recordCall records a call result in the sliding window and evaluates state transitions.
542+
func (cb *circuitBreaker) recordCall(result callResult) {
504543
cb.mu.Lock()
505544
defer cb.mu.Unlock()
506545

507-
cb.failureCount++
508-
cb.successCount = 0
509-
cb.lastFailTime = time.Now()
510-
511546
state := circuitState(cb.state.Load())
512547

548+
// Handle half-open state specially
513549
if state == stateHalfOpen {
514-
// Failure in half-open immediately opens circuit
515-
cb.setStateUnlocked(stateOpen)
516-
} else if cb.failureCount >= cb.config.failureThreshold {
517-
cb.setStateUnlocked(stateOpen)
550+
if result == callFailure {
551+
// Any failure in half-open immediately reopens circuit
552+
cb.resetWindowUnlocked()
553+
cb.setStateUnlocked(stateOpen)
554+
return
555+
}
556+
557+
cb.halfOpenSuccesses++
558+
if cb.halfOpenSuccesses >= cb.config.permittedCallsInHalfOpen {
559+
// Enough successes to close circuit
560+
cb.resetWindowUnlocked()
561+
cb.setStateUnlocked(stateClosed)
562+
}
563+
return
564+
}
565+
566+
// Record in sliding window
567+
// Remove old value from count if window is full
568+
if cb.windowFilled && cb.window[cb.windowIndex] == callFailure {
569+
cb.failureCount--
570+
}
571+
572+
// Add new value
573+
cb.window[cb.windowIndex] = result
574+
if result == callFailure {
575+
cb.failureCount++
576+
}
577+
578+
// Move to next position
579+
cb.windowIndex = (cb.windowIndex + 1) % cb.config.slidingWindowSize
580+
if cb.windowIndex == 0 {
581+
cb.windowFilled = true
582+
}
583+
584+
cb.totalCalls++
585+
586+
// Evaluate if we should open the circuit
587+
if state == stateClosed {
588+
cb.evaluateStateUnlocked()
518589
}
519590
}
520591

521-
// recordSuccess records a success and potentially closes the circuit.
522-
func (cb *circuitBreaker) recordSuccess() {
523-
cb.mu.Lock()
524-
defer cb.mu.Unlock()
592+
// evaluateStateUnlocked checks if the circuit should open based on failure rate.
593+
// Caller must hold cb.mu lock.
594+
func (cb *circuitBreaker) evaluateStateUnlocked() {
595+
// Need minimum number of calls before evaluating
596+
windowSize := cb.totalCalls
597+
if cb.windowFilled {
598+
windowSize = cb.config.slidingWindowSize
599+
}
525600

526-
cb.failureCount = 0
527-
cb.successCount++
601+
if windowSize < cb.config.minimumNumberOfCalls {
602+
return
603+
}
528604

529-
state := circuitState(cb.state.Load())
605+
// Calculate failure rate
606+
failureRate := (cb.failureCount * 100) / windowSize
530607

531-
if state == stateHalfOpen && cb.successCount >= cb.config.successThreshold {
532-
cb.setStateUnlocked(stateClosed)
608+
if failureRate >= cb.config.failureRateThreshold {
609+
cb.setStateUnlocked(stateOpen)
533610
}
534611
}
535612

613+
// resetWindowUnlocked clears the sliding window.
614+
// Caller must hold cb.mu lock.
615+
func (cb *circuitBreaker) resetWindowUnlocked() {
616+
cb.windowIndex = 0
617+
cb.windowFilled = false
618+
cb.totalCalls = 0
619+
cb.failureCount = 0
620+
cb.halfOpenSuccesses = 0
621+
}
622+
536623
// setState transitions to a new state.
537624
func (cb *circuitBreaker) setState(newState circuitState) {
538625
cb.mu.Lock()
@@ -541,6 +628,7 @@ func (cb *circuitBreaker) setState(newState circuitState) {
541628
}
542629

543630
// setStateUnlocked transitions to a new state without locking.
631+
// Caller must hold cb.mu lock.
544632
func (cb *circuitBreaker) setStateUnlocked(newState circuitState) {
545633
oldState := circuitState(cb.state.Load())
546634
if oldState == newState {
@@ -549,14 +637,13 @@ func (cb *circuitBreaker) setStateUnlocked(newState circuitState) {
549637

550638
cb.state.Store(int32(newState))
551639
cb.lastStateTime = time.Now()
552-
cb.failureCount = 0
553-
cb.successCount = 0
554640

555641
// Log state transition at DEBUG level
556642
// logger.Debug().Msgf("circuit breaker: %v -> %v", oldState, newState)
557643
}
558644

559645
// circuitBreakerManager manages circuit breakers per host.
646+
// Each host gets its own circuit breaker to provide isolation.
560647
type circuitBreakerManager struct {
561648
mu sync.RWMutex
562649
breakers map[string]*circuitBreaker
@@ -578,6 +665,7 @@ func getCircuitBreakerManager() *circuitBreakerManager {
578665
}
579666

580667
// getCircuitBreaker gets or creates a circuit breaker for the host.
668+
// Thread-safe for concurrent access.
581669
func (m *circuitBreakerManager) getCircuitBreaker(host string) *circuitBreaker {
582670
m.mu.RLock()
583671
cb, exists := m.breakers[host]
@@ -1888,38 +1976,7 @@ func BenchmarkInterceptor_Disabled(b *testing.B) {
18881976

18891977
---
18901978

1891-
## 11. Partial Launch Strategy
1892-
1893-
### Launch Phases
1894-
1895-
**Phase 1: Internal Testing**
1896-
- Server flag: `enabled=false`
1897-
- Internal users use: `forceEnableTelemetry=true` via connection params
1898-
- Validate: implementation, performance, privacy
1899-
1900-
**Phase 2: Beta Opt-In**
1901-
- Server flag: `enabled=false`
1902-
- Beta users use: `enableTelemetry=true` via connection params
1903-
- Collect feedback, monitor data quality
1904-
1905-
**Phase 3: Server-Controlled Launch**
1906-
- Server flag: `enabled=true`
1907-
- Telemetry enabled for all users (who haven't opted out)
1908-
- Users can opt-out: `enableTelemetry=false`
1909-
1910-
**Phase 4 (Future): Gradual Rollout**
1911-
- Add rollout percentage support in feature flag API
1912-
- Enable for subset of users based on workspace ID
1913-
- Implement after Phase 1-3 are validated
1914-
1915-
### Rollback
1916-
- **Emergency**: Set server flag `enabled=false`
1917-
- **Effect**: Disables telemetry for all users (except `forceEnableTelemetry=true`)
1918-
- **Time**: ~15 minutes (feature flag cache TTL)
1919-
1920-
---
1921-
1922-
## 12. Implementation Checklist
1979+
## 11. Implementation Checklist
19231980

19241981
**Strategy**: Build infrastructure bottom-up: Circuit Breaker → Export (POST to endpoint) → Opt-In Configuration → Collection & Aggregation → Driver Integration. This allows unit testing each layer before adding metric collection.
19251982

@@ -2106,16 +2163,16 @@ func BenchmarkInterceptor_Disabled(b *testing.B) {
21062163

21072164
---
21082165

2109-
## 13. References
2166+
## 12. References
21102167

2111-
### 13.1 Go Standards
2168+
### 12.1 Go Standards
21122169
- [context package](https://pkg.go.dev/context)
21132170
- [database/sql/driver](https://pkg.go.dev/database/sql/driver)
21142171
- [net/http](https://pkg.go.dev/net/http)
21152172
- [sync package](https://pkg.go.dev/sync)
21162173
- [Effective Go](https://go.dev/doc/effective_go)
21172174

2118-
### 13.2 Existing Code References
2175+
### 12.2 Existing Code References
21192176

21202177
**Databricks SQL Go Driver**:
21212178
- `connection.go`: Connection management

0 commit comments

Comments
 (0)