
feat: Datar as DataRecorder, comma-separated RecorderType, remove DatarEnabled #692

Open: zhouzhuojie wants to merge 33 commits into main from zz/research-datar


@zhouzhuojie commented May 22, 2026

Motivation

A simple, zero-dependency solution for trivial aggregate analytics built directly into Flagr. No external pipeline, no Kafka consumer, no separate analytics stack needed.

Datar counts evaluation events by (flag_id, variant_id, segment_id, hour) in memory and periodically flushes them to a datar_hourly_events table using an additive UPSERT.

Key changes

Datar as a first-class DataRecorder. Datar now implements the DataRecorder interface (like Kafka/Kinesis/Pubsub) and is selected by listing "datar" in FLAGR_RECORDER_TYPE. The separate FLAGR_DATAR_ENABLED flag is removed.

Comma-separated RecorderType. FLAGR_RECORDER_TYPE=kafka,datar enables both. The field is now []string with envSeparator:",".

Master kill switch. FLAGR_RECORDER_ENABLED=false (default) disables all recorders including Datar. Set to true to activate whatever is listed in RecorderType.
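
How the master switch and the comma-separated list interact can be sketched with the standard library alone. `activeRecorderTypes` is a hypothetical helper, and the whitespace trimming is an assumption; the PR itself relies on the `envSeparator:","` struct tag rather than manual splitting.

```go
package main

import (
	"fmt"
	"strings"
)

// activeRecorderTypes is a hypothetical helper: it returns the recorder
// names that should be started, or nil when the master switch is off.
func activeRecorderTypes(enabled bool, rawTypes string) []string {
	if !enabled {
		return nil // kill switch: nothing runs, whatever the list says
	}
	var types []string
	for _, t := range strings.Split(rawTypes, ",") {
		if t = strings.TrimSpace(t); t != "" {
			types = append(types, t)
		}
	}
	return types
}

func main() {
	fmt.Println(activeRecorderTypes(true, "kafka,datar"))  // [kafka datar]
	fmt.Println(activeRecorderTypes(false, "kafka,datar")) // []
}
```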

Fan-out recorder. fanOutRecorder dispatches AsyncRecord to N recorders. Handles 0, 1, or N uniformly. noopRecorder removed.
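
A minimal sketch of the fan-out idea follows. The `recorder` interface, `evalResult` type, and `countingRecorder` are simplified placeholders for illustration and not the actual DataRecorder interface; the point is that an empty slice makes the fan-out behave as a no-op, which is why a separate noop type is not needed.

```go
package main

import "fmt"

// evalResult is a placeholder for the real evaluation result model.
type evalResult struct{ FlagID int64 }

// recorder is a simplified stand-in for the DataRecorder interface.
type recorder interface {
	AsyncRecord(r evalResult)
}

// fanOut forwards each record to every configured recorder.
type fanOut struct{ recorders []recorder }

func (f *fanOut) AsyncRecord(r evalResult) {
	for _, rec := range f.recorders {
		rec.AsyncRecord(r)
	}
}

// countingRecorder counts calls so the dispatch can be observed.
type countingRecorder struct{ seen int }

func (c *countingRecorder) AsyncRecord(evalResult) { c.seen++ }

func main() {
	a, b := &countingRecorder{}, &countingRecorder{}
	(&fanOut{}).AsyncRecord(evalResult{FlagID: 1}) // zero recorders: no-op
	(&fanOut{recorders: []recorder{a, b}}).AsyncRecord(evalResult{FlagID: 1})
	fmt.Println(a.seen, b.seen) // 1 1
}
```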

Cleaner eval path. logEvalResult calls GetDataRecorder().AsyncRecord(*r) — zero knowledge of which recorders exist.

Aggregation moved to Engine. HandleGetDatarFlagSummary no longer contains inline aggregation logic. Engine.QueryFlagSummaryBreakdown handles variant/segment/day bucketing and sorting — the handler just converts engine types to swagger models.

Consistent constructor pattern. All recorders use New<Type>Recorder(): NewDatarRecorder() matches NewKafkaRecorder() and the others.

Dead code removed. aggregateFlagSummary deleted. fanOutRecorder.NewDataRecordFrame stub removed.

Quick Start

export FLAGR_RECORDER_ENABLED=true
export FLAGR_RECORDER_TYPE=kafka,datar
export FLAGR_RECORDER_DATAR_FLUSH_INTERVAL=60s

Per-flag opt-in via dataRecordsEnabled: true on the flag.

Endpoints

GET /api/v1/datar/summary — all flags with aggregate totals over a time window (default 7 days)

GET /api/v1/datar/flags/{flagID}/summary — per-flag breakdown by variant, segment, and day

Config changes

| Old | New |
| --- | --- |
| FLAGR_DATAR_ENABLED=true | FLAGR_RECORDER_ENABLED=true + FLAGR_RECORDER_TYPE=kafka,datar |
| FLAGR_DATAR_FLUSH_INTERVAL=60s | FLAGR_RECORDER_DATAR_FLUSH_INTERVAL=60s |
| RecorderType string | RecorderType []string (comma-separated) |

Files

  • pkg/datar/engine.go: In-memory aggregator, transactional flush. QueryFlagSummaryBreakdown for pre-aggregated flag breakdown. aggregateFlagSummary removed.
  • pkg/entity/datar.go: HourlyEvent GORM model
  • pkg/handler/data_recorder.go: GetDataRecorder() with RecorderEnabled master switch, comma-separated RecorderType, fanOutRecorder, removed noopRecorder
  • pkg/handler/data_recorder_datar.go: NewDatarRecorder() constructor matching other recorder patterns
  • pkg/handler/datar.go: GetDatar() singleton, hasDatar() helper
  • pkg/handler/datar_handler.go: Two HTTP handlers. HandleGetDatarFlagSummary delegates aggregation to engine.
  • pkg/handler/eval.go: GetDataRecorder().AsyncRecord(*r), no special cases
  • pkg/config/env.go: RecorderType as []string, RecorderDatarFlushInterval moved to recorder block, removed DatarEnabled

Testing

  • 145+ Go tests passing across all packages
  • fanOutRecorder tested with 0, 1, N recorders
  • hasDatar tested with 6 table-driven subtests (nil, empty, with/without datar)
  • QueryFlagSummaryBreakdown tested at engine level (aggregation, nil engine)
  • Datar HTTP endpoints tested (summary, flag summary, pagination, disabled, query error)
  • Integration test validates real flow

@zhouzhuojie force-pushed the zz/research-datar branch 2 times, most recently from db82a6d to c3d645d on May 22, 2026 15:42
…shAggregates

The glebarez/sqlite pure-Go SQLite driver returns datetime values stored
by raw SQL functions (CURRENT_TIMESTAMP) as strings. When QuerySummary's
Raw().Scan() tries to scan MAX(updated_at) into *time.Time, the driver
returns a string that GORM cannot convert, causing a scan error.

Fix: pass time.Now() as a bound parameter instead of CURRENT_TIMESTAMP
in the UPSERT query, letting the SQL driver handle the type conversion
correctly.
…Go SQLite compatibility

The glebarez/sqlite driver (modernc.org/sqlite) returns all datetime
values as strings regardless of how they were stored - even time.Now()
bound parameters get serialized to text. GORM's Raw().Scan() cannot
convert these strings into *time.Time, causing a scan error.

Fix: change SummaryRow.LastEvaluatedAt from *time.Time to string, and
parse the RFC 3339 string back to time.Time in the handler before
converting to strfmt.DateTime for the API response.
…ggregation

The HandleGetDatarFlagSummary handler was making 3 separate raw-SQL
round-trips (QueryTraffic, QuerySegments, QueryTimeBuckets), each
filtering the same datar_hourly_events table with the same WHERE clause.

Replace with one GORM builder query (Table/Select/Joins/Where/Scan)
that fetches all matching raw rows, then aggregates into variant/
segment/day buckets in a single Go loop. This cuts DB round-trips from
3 to 1 for the /datar/flags/{flagID}/summary endpoint.

Changes:
- store.go: add RawEvent type + QueryFlagSummary using GORM query builder
- store.go: remove TrafficPoint, SegmentRow, TimeBucketRow types
- store.go: remove QueryTraffic, QuerySegments, QueryTimeBuckets methods
- store_test.go: replace 5 old tests with 2 QueryFlagSummary tests
- datar_handler.go: single QueryFlagSummary call + Go-level aggregation
… separately

SegmentDescription was identical for every row with the same segment_id
but was carried across hundreds of raw rows. Move it to a separate
SegmentDescriptions(ids) query that only fetches descriptions for the
distinct segment IDs that actually have data.
…atedAt

- Remove description from datarSegmentEntry swagger definition
- Remove SegmentDescriptions method from store (no longer needed)
- Simplify handler segment building: just SegmentID + EvalCount
- Regenerate swagger_gen models
…egment are sorted arrays with count

Changes:
- swagger: add datarVariantEntry model {variantID, count}
- swagger: trafficByVariant is now an array of datarVariantEntry (was map)
- swagger: rename evalCount → count in datarSegmentEntry
- handler: build variant array with sort (same as segments)
- handler: segIDs map keyed by int64 directly (no fmt.Sprintf)
- test: update assertions for array shape and count field
- integration: check for variantID, count instead of evalCount
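
Turning an int64-keyed map into the sorted array shape could look like this sketch. `variantEntry` mirrors the {variantID, count} shape named above, while the helper name and the ascending-ID sort order are assumptions, since the commit does not state the sort key.

```go
package main

import (
	"fmt"
	"sort"
)

// variantEntry mirrors the {variantID, count} response shape.
type variantEntry struct {
	VariantID int64 `json:"variantID"`
	Count     int64 `json:"count"`
}

// sortedVariantEntries converts a map keyed by variant ID into a slice
// with a stable order (ascending ID is an assumed choice).
func sortedVariantEntries(counts map[int64]int64) []variantEntry {
	entries := make([]variantEntry, 0, len(counts))
	for id, n := range counts {
		entries = append(entries, variantEntry{VariantID: id, Count: n})
	}
	sort.Slice(entries, func(i, j int) bool {
		return entries[i].VariantID < entries[j].VariantID
	})
	return entries
}

func main() {
	fmt.Println(sortedVariantEntries(map[int64]int64{11: 4, 10: 8})) // [{10 8} {11 4}]
}
```

An array gives clients a deterministic order, which a JSON object keyed by ID does not guarantee.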

- Remove start_test_datar wrapper (redundant wait-for-it, banner)
- step_13_test_datar now called directly from start_test like all steps
- Skip early with return 0 if service returns 503 (Datar not enabled)
- Only flagr_with_sqlite has Datar enabled, others gracefully skip
@zhouzhuojie zhouzhuojie changed the title feat: add Datar — in-memory aggregate analytics for Flagr feat: Datar — optional in-memory aggregate analytics for Flagr May 22, 2026
checkr/flagr:1.1.12 doesn't have the /datar route at all, returns 404
not 503. Changed the early-return check from [=503] to [!=200] to
handle any status indicating Datar is unavailable.

Add FLAGR_DATAR_ENABLED=true and FLAGR_DATAR_FLUSH_INTERVAL=500ms to
flagr_with_mysql, flagr_with_mysql8, flagr_with_postgres9, and
flagr_with_postgres13 so CI verifies Datar works across MySQL 5/8 and
PostgreSQL 9/13, not just SQLite.
…egates

The raw SQL UPSERT template used ON CONFLICT(...) DO UPDATE SET, which
is PostgreSQL/SQLite-only syntax. MySQL requires ON DUPLICATE KEY UPDATE.
Store now holds a dialect-aware flushQuery string instead of just the
value reference, so the full conflict clause differs per dialect.

Replace raw SQL loop with s.db.Clauses(clause.OnConflict{}).Create(&records),
which is a single batch INSERT with dialect-agnostic conflict resolution.
GORM handles ON CONFLICT(...) DO UPDATE SET (PG/SQLite) vs
ON DUPLICATE KEY UPDATE (MySQL) translation automatically.
Only the resolved-column reference prefix differs: EXCLUDED for
PG/SQLite, VALUES for MySQL.

Benefits:
- GORM manages column quoting and INSERT mapping
- Single batch query instead of N individual Exec calls
- Cleaner separation of dialect concerns

MySQL refers to the proposed insertion value via VALUES(col) (a function
call), not EXCLUDED.col (a pseudo-table reference). PG/SQLite use
EXCLUDED.col. Store the full dialect-specific expression strings in
initStore rather than trying to compose from a shared prefix.

Also use CURRENT_TIMESTAMP for updated_at on conflict — avoids needing
dialect-aware expression for that column.
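
The per-dialect expression choice might be sketched as below. The function name, the `eval_count` column, and the dialect strings are assumptions for illustration; only the VALUES(col) versus EXCLUDED.col distinction comes from the commit message.

```go
package main

import "fmt"

// additiveCountExpr returns the full assignment that adds the incoming
// count to the stored count on conflict. The column name eval_count is
// hypothetical.
func additiveCountExpr(dialect string) string {
	switch dialect {
	case "mysql":
		// MySQL: VALUES(col) is a function call for the proposed value.
		return "eval_count = eval_count + VALUES(eval_count)"
	default:
		// PostgreSQL and SQLite: EXCLUDED is a pseudo-table for the proposed row.
		return "eval_count = datar_hourly_events.eval_count + EXCLUDED.eval_count"
	}
}

func main() {
	for _, d := range []string{"mysql", "postgres", "sqlite"} {
		fmt.Printf("%-8s %s\n", d, additiveCountExpr(d))
	}
}
```

Storing the whole expression per dialect avoids composing it from a shared prefix, which is the failure mode described above.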

Before: 5 files in pkg/datar/ (models.go, aggregator.go, store.go,
aggregator_test.go, store_test.go) + Entity model + handler wrapper.

After: 2 files in pkg/datar/ (engine.go, engine_test.go). One Engine
struct owns everything — buffer, flush loop, DB queries all in one
self-contained type.

Changes:
- Merge FlushKey, Aggregator, Store into single Engine struct
- Move lifecycle (flush loop, shutdown) into Engine
- Remove .Store() indirection: handlers call Engine.QuerySummary() directly
- Record() takes (flagID, variantID, segmentID) — no swagger model dep
- All methods nil-safe: no caller-side nil checks needed
- 91.7% test coverage across 27 tests
- Remove 60 lines of wrapper/indirection code
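
The nil-safe method pattern mentioned in the list above can be illustrated with a small sketch. The `engine` type here is a simplified stand-in, not the Engine struct from this PR; it shows how a nil pointer receiver lets callers skip their own nil checks when Datar is disabled.

```go
package main

import "fmt"

// engine is a simplified stand-in used to show the nil-receiver pattern.
type engine struct{ counts map[int64]int64 }

// Record is a no-op on a nil engine, so callers need no nil check.
func (e *engine) Record(flagID int64) {
	if e == nil {
		return
	}
	e.counts[flagID]++
}

// Total returns 0 on a nil engine.
func (e *engine) Total(flagID int64) int64 {
	if e == nil {
		return 0
	}
	return e.counts[flagID]
}

func main() {
	var disabled *engine // e.g. Datar not listed in the recorder types
	disabled.Record(1)
	fmt.Println(disabled.Total(1)) // 0

	enabled := &engine{counts: map[int64]int64{}}
	enabled.Record(1)
	fmt.Println(enabled.Total(1)) // 1
}
```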

QuerySummary: Model(&entity.Flag{}) + Select + Joins + Group +
Order + Limit + Offset. GORM auto-filters deleted_at IS NULL
through the Flag entity's gorm.Model.

QueryFlagSummary: Model(&entity.HourlyEvent{}) instead of Table().

FlushAggregates was already using Clauses + Create (the GORM way).

Instead of LEFT JOIN datar_hourly_events directly on the flags table
(which scans every flag row even when most have zero traffic), first
aggregate events per flag in a subquery (touches only flags with traffic),
then LEFT JOIN the compact result to flags for COALESCE to 0.

This is more efficient when few flags carry real traffic.

- Replace closed-DB tests with DROP TABLE tests (SQLite recreates empty
  in-memory DB on reconnect, so closed-DB doesn't actually cause errors)
- Add TestFlush_DBError: covers flushAggregates error path in Shutdown
- Add TestFlushLoop_TickerFires: covers ticker path in flushLoop
- Remove stale duplicate test functions left by merge artifact

Uncovered remaining: MySQL dialect (needs MySQL), Record double-check
(race condition, ~30ns edge), flush error path, QueryFlagSummary error
path (coverage tool artifact; the error IS logged in test output)

- TestParseTimeRange: covers from/to non-nil branches (75%→100%)
- TestDatarEndpoints_QueryError: covers 500 paths when DB queries fail
- Remove dead DatarStoreExists helper (unused after consolidation)

QuerySummary previously LEFT JOIN'd the aggregated subquery onto the
full flags table, then COALESCE'd zeros — forcing a scan of every flag
even when most have no traffic. Now uses INNER JOIN, so only flags with
actual evaluation events appear. The subquery aggregates events first
(bounded by time window, compact result), then JOINs for flag metadata.

This changes the API contract: flags with zero traffic in the time
window are excluded from the summary response.
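
The contract change can be illustrated in memory, without SQL. The `flagMeta` and `summaryRow` types and the `Key` field are hypothetical; the sketch emulates the INNER JOIN by emitting a row only for flags that have an aggregated total in the window.

```go
package main

import "fmt"

// flagMeta and summaryRow are hypothetical shapes for this illustration.
type flagMeta struct {
	ID  int64
	Key string
}

type summaryRow struct {
	FlagID int64
	Key    string
	Total  int64
}

// summarize emulates the INNER JOIN: totals plays the role of the
// aggregated subquery, and flags without an entry are skipped.
func summarize(flags []flagMeta, totals map[int64]int64) []summaryRow {
	rows := make([]summaryRow, 0, len(totals))
	for _, f := range flags {
		total, ok := totals[f.ID]
		if !ok {
			continue // no events in the window: the flag is left out
		}
		rows = append(rows, summaryRow{f.ID, f.Key, total})
	}
	return rows
}

func main() {
	flags := []flagMeta{{1, "checkout"}, {2, "search"}, {3, "signup"}}
	totals := map[int64]int64{1: 5, 3: 2}
	fmt.Println(summarize(flags, totals)) // [{1 checkout 5} {3 signup 2}]
}
```

Under the earlier LEFT JOIN behavior, flag 2 would have appeared with a total of 0; clients that relied on seeing every flag need to handle its absence.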
…tion

- Response shapes: trafficByVariant is now []{variantID, count} not map
- Segment entries: removed description + evalCount renamed to count
- Summary: only flags with traffic appear (INNER JOIN, no COALESCE)
- Architecture: consolidated Engine type, not separate Aggregator/Store
- Added eval cache timing note
- File listing reflects engine.go, not old separate files
…rderType []string

- Remove aggregateFlagSummary dead code from engine
- Add datarRecorder adapter implementing DataRecorder
- Replace evalResultListeners with direct GetDataRecorder().AsyncRecord()
- Add fanOutRecorder for N-recorder support
- RecorderType is now []string with envSeparator comma support
- Remove DatarEnabled flag; list 'datar' in RecorderType instead
- Rename DatarFlushInterval → RecorderDatarFlushInterval
- Drop noopRecorder; empty fanOutRecorder handles null case
- Update all tests, docs, integration configs
…nfigs

- aggregateFlagSummary removed from engine (dead code)
- RecorderType changed to []string with envSeparator comma
- DatarEnabled config field removed
- DatarFlushInterval renamed to RecorderDatarFlushInterval
- logEvalResult simplified to single GetDataRecorder().AsyncRecord() call
- datarRecorder adapter implements DataRecorder interface
- fanOutRecorder handles 0-N recorders uniformly
- noopRecorder removed
- All docs, integration tests, and test stubs updated
@zhouzhuojie zhouzhuojie changed the title feat: Datar — optional in-memory aggregate analytics for Flagr feat: Datar as DataRecorder, comma-separated RecorderType, remove DatarEnabled May 23, 2026