feat: Datar as DataRecorder, comma-separated RecorderType, remove DatarEnabled#692
Open
zhouzhuojie wants to merge 33 commits into
Open
feat: Datar as DataRecorder, comma-separated RecorderType, remove DatarEnabled#692zhouzhuojie wants to merge 33 commits into
zhouzhuojie wants to merge 33 commits into
Conversation
db82a6d to
c3d645d
Compare
c3d645d to
74839a2
Compare
…shAggregates The glebarez/sqlite pure-Go SQLite driver returns datetime values stored by raw SQL functions (CURRENT_TIMESTAMP) as strings. When QuerySummary's Raw().Scan() tries to scan MAX(updated_at) into *time.Time, the driver returns a string that GORM cannot convert, causing a scan error. Fix: pass time.Now() as a bound parameter instead of CURRENT_TIMESTAMP in the UPSERT query, letting the SQL driver handle the type conversion correctly.
29c8de0 to
e093c00
Compare
…Go SQLite compatibility The glebarez/sqlite driver (modernc.org/sqlite) returns all datetime values as strings regardless of how they were stored - even time.Now() bound parameters get serialized to text. GORM's Raw().Scan() cannot convert these strings into *time.Time, causing a scan error. Fix: change SummaryRow.LastEvaluatedAt from *time.Time to string, and parse the RFC 3339 string back to time.Time in the handler before converting to strfmt.DateTime for the API response.
…ggregation
The HandleGetDatarFlagSummary handler was making 3 separate raw-SQL
round-trips (QueryTraffic, QuerySegments, QueryTimeBuckets), each
filtering the same datar_hourly_events table with the same WHERE clause.
Replace with one GORM builder query (Table/Select/Joins/Where/Scan)
that fetches all matching raw rows, then aggregates into variant/
segment/day buckets in a single Go loop. This cuts DB round-trips from
3 to 1 for the /datar/flags/{flagID}/summary endpoint.
Changes:
- store.go: add RawEvent type + QueryFlagSummary using GORM query builder
- store.go: remove TrafficPoint, SegmentRow, TimeBucketRow types
- store.go: remove QueryTraffic, QuerySegments, QueryTimeBuckets methods
- store_test.go: replace 5 old tests with 2 QueryFlagSummary tests
- datar_handler.go: single QueryFlagSummary call + Go-level aggregation
… separately SegmentDescription was identical for every row with the same segment_id but was carried across hundreds of raw rows. Move it to a separate SegmentDescriptions(ids) query that only fetches descriptions for the distinct segment IDs that actually have data.
…atedAt - Remove description from datarSegmentEntry swagger definition - Remove SegmentDescriptions method from store (no longer needed) - Simplify handler segment building: just SegmentID + EvalCount - Regenerate swagger_gen models
…egment are sorted arrays with count
Changes:
- swagger: add datarVariantEntry model {variantID, count}
- swagger: trafficByVariant is now an array of datarVariantEntry (was map)
- swagger: rename evalCount → count in datarSegmentEntry
- handler: build variant array with sort (same as segments)
- handler: segIDs map keyed by int64 directly (no fmt.Sprintf)
- test: update assertions for array shape and count field
- integration: check for variantID, count instead of evalCount
- Remove start_test_datar wrapper (redundant wait-for-it, banner) - step_13_test_datar now called directly from start_test like all steps - Skip early with return 0 if service returns 503 (Datar not enabled) - Only flagr_with_sqlite has Datar enabled, others gracefully skip
39a630d to
5f22416
Compare
checkr/flagr:1.1.12 doesn't have the /datar route at all, returns 404 not 503. Changed the early-return check from [=503] to [!=200] to handle any status indicating Datar is unavailable.
Add FLAGR_DATAR_ENABLED=true and FLAGR_DATAR_FLUSH_INTERVAL=500ms to flagr_with_mysql, flagr_with_mysql8, flagr_with_postgres9, and flagr_with_postgres13 so CI verifies Datar works across MySQL 5/8 and PostgreSQL 9/13, not just SQLite.
…egates The raw SQL UPSERT template used ON CONFLICT(...) DO UPDATE SET, which is PostgreSQL/SQLite-only syntax. MySQL requires ON DUPLICATE KEY UPDATE. Store now holds a dialect-aware flushQuery string instead of just the value reference, so the full conflict clause differs per dialect.
Replace raw SQL loop with s.db.Clauses(clause.OnConflict{}).Create(&records),
which is a single batch INSERT with dialect-agnostic conflict resolution.
GORM handles ON CONFLICT(...) DO UPDATE SET (PG/SQLite) vs
ON DUPLICATE KEY UPDATE (MySQL) translation automatically.
Only the resolved-column reference prefix differs: EXCLUDED for
PG/SQLite, VALUES for MySQL.
Benefits:
- GORM manages column quoting and INSERT mapping
- Single batch query instead of N individual Exec calls
- Cleaner separation of dialect concerns
MySQL refers to the proposed insertion value via VALUES(col) (a function call), not EXCLUDED.col (a pseudo-table reference). PG/SQLite use EXCLUDED.col. Store the full dialect-specific expression strings in initStore rather than trying to compose from a shared prefix. Also use CURRENT_TIMESTAMP for updated_at on conflict — avoids needing dialect-aware expression for that column.
Before: 5 files in pkg/datar/ (models.go, aggregator.go, store.go, aggregator_test.go, store_test.go) + Entity model + handler wrapper. After: 2 files in pkg/datar/ (engine.go, engine_test.go). One Engine struct owns everything — buffer, flush loop, DB queries all in one self-contained type. Changes: - Merge FlushKey, Aggregator, Store into single Engine struct - Move lifecycle (flush loop, shutdown) into Engine - Remove .Store() indirection: handlers call Engine.QuerySummary() directly - Record() takes (flagID, variantID, segmentID) — no swagger model dep - All methods nil-safe: no caller-side nil checks needed - 91.7% test coverage across 27 tests - Remove 60 lines of wrapper/indirection code
QuerySummary: Model(&entity.Flag{}) + Select + Joins + Group +
Order + Limit + Offset. GORM auto-filters deleted_at IS NULL
through the Flag entity's gorm.Model.
QueryFlagSummary: Model(&entity.HourlyEvent{}) instead of Table().
FlushAggregates was already using Clauses + Create (the GORM way).
Instead of LEFT JOIN datar_hourly_events directly on the flags table (which scans every flag row even when most have zero traffic), first aggregate events per flag in a subquery (touches only flags with traffic), then LEFT JOIN the compact result to flags for COALESCE to 0. This is more efficient when few flags carry real traffic.
- Replace closed-DB tests with DROP TABLE tests (SQLite recreates empty in-memory DB on reconnect, so closed-DB doesn't actually cause errors) - Add TestFlush_DBError: covers flushAggregates error path in Shutdown - Add TestFlushLoop_TickerFires: covers ticker path in flushLoop - Remove stale duplicate test functions left by merge artifact Uncovered remaining: MySQL dialect (needs MySQL), Record double-check (race condition, ~30ns edge), flush error path, QueryFlagSummary error path (coverage tool artifact — error IS logged in test output)
- TestParseTimeRange: covers from/to non-nil branches (75%→100%) - TestDatarEndpoints_QueryError: covers 500 paths when DB queries fail - Remove dead DatarStoreExists helper (unused after consolidation)
QuerySummary previously LEFT JOIN'd the aggregated subquery onto the full flags table, then COALESCE'd zeros — forcing a scan of every flag even when most have no traffic. Now uses INNER JOIN, so only flags with actual evaluation events appear. The subquery aggregates events first (bounded by time window, compact result), then JOINs for flag metadata. This changes the API contract: flags with zero traffic in the time window are excluded from the summary response.
…tion
- Response shapes: trafficByVariant is now []{variantID, count} not map
- Segment entries: removed description + evalCount renamed to count
- Summary: only flags with traffic appear (INNER JOIN, no COALESCE)
- Architecture: consolidated Engine type, not separate Aggregator/Store
- Added eval cache timing note
- File listing reflects engine.go, not old separate files
…rderType []string - Remove aggregateFlagSummary dead code from engine - Add datarRecorder adapter implementing DataRecorder - Replace evalResultListeners with direct GetDataRecorder().AsyncRecord() - Add fanOutRecorder for N-recorder support - RecorderType is now []string with envSeparator comma support - Remove DatarEnabled flag; list 'datar' in RecorderType instead - Rename DatarFlushInterval → RecorderDatarFlushInterval - Drop noopRecorder; empty fanOutRecorder handles null case - Update all tests, docs, integration configs
…nfigs - aggregateFlagSummary removed from engine (dead code) - RecorderType changed to []string with envSeparator comma - DatarEnabled config field removed - DatarFlushInterval renamed to RecorderDatarFlushInterval - logEvalResult simplified to single GetDataRecorder().AsyncRecord() call - datarRecorder adapter implements DataRecorder interface - fanOutRecorder handles 0-N recorders uniformly - noopRecorder removed - All docs, integration tests, and test stubs updated
…ka/Kinesis/Pubsub pattern
… fix GetDatar comment, clean up
…Recorder, engine aggregation
…tar(), clean up spacing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
A simple, zero-dependency solution for trivial aggregate analytics built directly into Flagr. No external pipeline, no Kafka consumer, no separate analytics stack needed.
Counts evaluation events by
(flag_id, variant_id, segment_id, hour)in-memory and periodically flushes to adatar_hourly_eventstable using additive UPSERT.Key changes
Datar as a first-class
DataRecorder. Datar now implements theDataRecorderinterface (like Kafka/Kinesis/Pubsub) and is selected by listing"datar"inFLAGR_RECORDER_TYPE. The separateFLAGR_DATAR_ENABLEDflag is removed.Comma-separated
RecorderType.FLAGR_RECORDER_TYPE=kafka,datarenables both. The field is now[]stringwithenvSeparator:",".Master kill switch.
FLAGR_RECORDER_ENABLED=false(default) disables all recorders including Datar. Set totrueto activate whatever is listed inRecorderType.Fan-out recorder.
fanOutRecorderdispatchesAsyncRecordto N recorders. Handles 0, 1, or N uniformly.noopRecorderremoved.Cleaner eval path.
logEvalResultcallsGetDataRecorder().AsyncRecord(*r)— zero knowledge of which recorders exist.Aggregation moved to Engine.
HandleGetDatarFlagSummaryno longer contains inline aggregation logic.Engine.QueryFlagSummaryBreakdownhandles variant/segment/day bucketing and sorting — the handler just converts engine types to swagger models.Consistent constructor pattern. All recorders use
New<Type>Recorder()—NewDatarRecorder()matchesNewKafkaRecorder()etc.Dead code removed.
aggregateFlagSummarydeleted.fanOutRecorder.NewDataRecordFramestub removed.Quick Start
Per-flag opt-in via
dataRecordsEnabled: trueon the flag.Endpoints
GET /api/v1/datar/summary— all flags with aggregate totals over a time window (default 7 days)GET /api/v1/datar/flags/{flagID}/summary— per-flag breakdown by variant, segment, and dayConfig changes
FLAGR_DATAR_ENABLED=trueFLAGR_RECORDER_ENABLED=true+FLAGR_RECORDER_TYPE=kafka,datarFLAGR_DATAR_FLUSH_INTERVAL=60sFLAGR_RECORDER_DATAR_FLUSH_INTERVAL=60sRecorderType stringRecorderType []string(comma-separated)Files
pkg/datar/engine.go— In-memory aggregator, transactional flush.QueryFlagSummaryBreakdownfor pre-aggregated flag breakdown.aggregateFlagSummaryremoved.pkg/entity/datar.go—HourlyEventGORM modelpkg/handler/data_recorder.go—GetDataRecorder()withRecorderEnabledmaster switch, comma-separatedRecorderType,fanOutRecorder, removednoopRecorderpkg/handler/data_recorder_datar.go—NewDatarRecorder()constructor matching other recorder patternspkg/handler/datar.go—GetDatar()singleton,hasDatar()helperpkg/handler/datar_handler.go— Two HTTP handlers.HandleGetDatarFlagSummarydelegates aggregation to engine.pkg/handler/eval.go—GetDataRecorder().AsyncRecord(*r)— no special casespkg/config/env.go—RecorderTypeas[]string,RecorderDatarFlushIntervalmoved to recorder block, removedDatarEnabledTesting
fanOutRecordertested with 0, 1, N recordershasDatartested with 6 subtables (nil, empty, with/without datar)QueryFlagSummaryBreakdowntested at engine level (aggregation, nil engine)