feat: per-resource subscription identity + Match hook#1185
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughThis PR adds comprehensive subscription identity support to the event bus system, enabling stable per-subscription tracking across protocol, bus, consumer, and CLI layers. Protocol messages gain ChangesEvent subscription identity support
🎯 4 (Complex) | ⏱️ ~75 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
🚀 PR Preview Install Guide🧰 CLI updatenpm i -g https://pkg.pr.new/larksuite/cli/@larksuite/cli@a3101badd793bdfa49d24f61c971cb0167a19088🧩 Skill updatenpx skills add larksuite/cli#feat/event-subscription-id -y -g |
There was a problem hiding this comment.
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/event/consume/consume.go (1)
61-76:⚠️ Potential issue | 🟠 Major | ⚡ Quick winApply
opts.TimeoutbeforeNormalizeParams.
NormalizeParamsis allowed to make OAPI calls, but the timeout context is created afterwards. That means--timeoutno longer bounds startup alias-resolution/canonicalization and the command can block longer than requested before the handshake even starts.Suggested fix
- // Normalize params (resolve aliases like "me" -> real email) before fingerprint - // compute, PreConsume, Match, Process. Must happen BEFORE doHello so the - // SubscriptionID we send to bus reflects canonical values. - if keyDef.NormalizeParams != nil { - if err := keyDef.NormalizeParams(ctx, opts.Runtime, opts.Params); err != nil { - return fmt.Errorf("normalize params for %s: %w", opts.EventKey, err) - } - } - - // Compute subscription identity from normalized params + SubscriptionKey flags. - subscriptionID := ComputeSubscriptionID(keyDef, opts.Params) - if opts.Timeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, opts.Timeout) defer cancel() } + + // Normalize params (resolve aliases like "me" -> real email) before fingerprint + // compute, PreConsume, Match, Process. Must happen BEFORE doHello so the + // SubscriptionID we send to bus reflects canonical values. + if keyDef.NormalizeParams != nil { + if err := keyDef.NormalizeParams(ctx, opts.Runtime, opts.Params); err != nil { + return fmt.Errorf("normalize params for %s: %w", opts.EventKey, err) + } + } + + // Compute subscription identity from normalized params + SubscriptionKey flags. + subscriptionID := ComputeSubscriptionID(keyDef, opts.Params)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/event/consume/consume.go` around lines 61 - 76, The timeout context is created after NormalizeParams, so alias-resolution can bypass --timeout; fix by creating/applying the timeout before calling keyDef.NormalizeParams (use opts.Timeout to wrap ctx via context.WithTimeout and defer cancel immediately), then call keyDef.NormalizeParams(ctx, opts.Runtime, opts.Params), and only after successful normalization compute subscriptionID with ComputeSubscriptionID; ensure all subsequent calls (including doHello/PreConsume/Match/Process) use the timed ctx.
🧹 Nitpick comments (1)
internal/event/protocol/messages_test.go (1)
20-30: ⚡ Quick winAssert the new
subscriptionIDconstructor args are preserved.These call sites were updated for the new parameter, but the test still only verifies
Type/EventKey. A constructor regression that dropsSubscriptionIDwould pass unnoticed.Diff
- if got := NewHello(1, "k", []string{"t"}, "v1", ""); got.Type != MsgTypeHello { + if got := NewHello(1, "k", []string{"t"}, "v1", "k:sub"); got.Type != MsgTypeHello || got.SubscriptionID != "k:sub" { t.Errorf("NewHello.Type = %q, want %q", got.Type, MsgTypeHello) } @@ - if got := NewPreShutdownCheck("k", ""); got.Type != MsgTypePreShutdownCheck || got.EventKey != "k" { + if got := NewPreShutdownCheck("k", "k:sub"); got.Type != MsgTypePreShutdownCheck || got.EventKey != "k" || got.SubscriptionID != "k:sub" { t.Errorf("NewPreShutdownCheck mismatch: %+v", got) }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/event/protocol/messages_test.go` around lines 20 - 30, The test must assert that the new subscriptionID constructor argument is stored on the returned messages; update the NewHello, NewHelloAck, NewEvent and NewPreShutdownCheck assertions to also verify the SubscriptionID (or equivalent field name) equals the value passed in (e.g., the last argument in each constructor call) so a regression that drops SubscriptionID will fail; locate the checks around NewHello, NewHelloAck, NewEvent and NewPreShutdownCheck in messages_test.go and add corresponding SubscriptionID equality assertions.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@events/mail/match.go`:
- Around line 31-34: The Match implementation currently only fails open when
json.Unmarshal(raw.Payload, &env) errors, but returns false if the JSON is valid
yet env.Event.MailAddress is missing/empty; change Match so that after
unmarshalling it also treats a missing/empty env.Event.MailAddress as a
fail-open case (i.e., return true when env.Event.MailAddress == ""), otherwise
continue to return env.Event.MailAddress == target; reference symbols: Match,
raw.Payload, json.Unmarshal, env.Event.MailAddress, target.
In `@internal/event/bus/conn.go`:
- Around line 150-156: The code currently builds scope from the incoming message
(m.SubscriptionID or m.EventKey); instead, use the connection's authoritative
subscription identity via Conn.SubscriptionID() when computing scope so
PreShutdownCheck uses the stored scope. Replace the scope assignment to use
c.SubscriptionID() (fall back to m.EventKey only if c.SubscriptionID() is
empty), and keep the existing checkLastForKey(c.checkLastForKey(scope)) logic
unchanged so per-subscription cleanup is evaluated against the connection's
subscription.
In `@internal/event/consume/consume_test.go`:
- Around line 47-49: The test reconstructs the fmt.Errorf wrapper itself instead
of exercising the real production wrapping in Run(), so change the test to call
the actual code path that produces the wrapped error (invoke Run() with a keyDef
that triggers NormalizeParams to fail) and assert that the returned error string
contains the expected prefix "normalize params for <EventKey>:"; alternatively,
extract the wrapping into a helper function (e.g., wrapNormalizeError(eventKey
string, err error)) and update the test to call that helper directly to verify
the exact wrapper format; locate references to Run(), NormalizeParams and
keyDef.Key in consume_test.go to implement the change.
- Around line 103-107: The test currently builds `got` using the same format
string as `want`, making it impossible to detect regressions; instead call the
real production formatting/path that emits the cleanup warning (the
cleanup/unsubscribe code path or the formatter function used in production) to
produce `got` rather than using fmt.Sprintf; keep `want` as the expected literal
and assert that the output from the actual cleanup warning routine (the
function/method that logs or formats the cleanup warning in the
unsubscribe/cleanup implementation) equals `want`.
In `@internal/event/consume/loop_test.go`:
- Around line 259-286: TestProcessAndOutput_Match_RunsBeforeProcess currently
only checks counts; change the test to assert order by recording call sequence
when keyDef.Match and keyDef.Process run (e.g. append "match" / "process" to a
slice or set a matchDone flag), and then after processAndOutput return assert
that the first entry is "match" (or that matchDone was true when Process
executed). Locate the handlers on the keyDef object (Match and Process) and
update them to record the sequence or assert inside Process that the match-side
marker is already set so the test fails if Process ran before Match.
In `@skills/lark-event/SKILL.md`:
- Around line 155-163: The doc incorrectly uses “reference-counted” to describe
server behavior; update the "Cleanup error reporting" section to remove the
phrase "reference-counted" and reword the sentence that currently reads
“Subscriptions are reference-counted implicitly by Lark's server (one record per
`(app, user, event_type)`)" to clearly state the server stores a single
idempotent subscription record per (app, user, event_type) that is overwritten
on subscribe, not reference-counted, and keep the guidance that manual
unsubscribe is not a safe recovery action because a stray unsubscribe can remove
that single record used by other consumers.
---
Outside diff comments:
In `@internal/event/consume/consume.go`:
- Around line 61-76: The timeout context is created after NormalizeParams, so
alias-resolution can bypass --timeout; fix by creating/applying the timeout
before calling keyDef.NormalizeParams (use opts.Timeout to wrap ctx via
context.WithTimeout and defer cancel immediately), then call
keyDef.NormalizeParams(ctx, opts.Runtime, opts.Params), and only after
successful normalization compute subscriptionID with ComputeSubscriptionID;
ensure all subsequent calls (including doHello/PreConsume/Match/Process) use the
timed ctx.
---
Nitpick comments:
In `@internal/event/protocol/messages_test.go`:
- Around line 20-30: The test must assert that the new subscriptionID
constructor argument is stored on the returned messages; update the NewHello,
NewHelloAck, NewEvent and NewPreShutdownCheck assertions to also verify the
SubscriptionID (or equivalent field name) equals the value passed in (e.g., the
last argument in each constructor call) so a regression that drops
SubscriptionID will fail; locate the checks around NewHello, NewHelloAck,
NewEvent and NewPreShutdownCheck in messages_test.go and add corresponding
SubscriptionID equality assertions.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 0118b5c2-be99-4a98-a187-00e3b8e2bfd1
📒 Files selected for processing (44)
cmd/event/format_helpers_test.gocmd/event/schema.gocmd/event/schema_test.gocmd/event/status.goevents/mail/match.goevents/mail/match_test.goevents/mail/normalize.goevents/mail/normalize_test.goevents/mail/payload.goevents/mail/process.goevents/mail/process_test.goevents/mail/register.goevents/mail/register_test.goevents/minutes/preconsume.goevents/register.goevents/vc/preconsume.gointernal/event/bus/bus.gointernal/event/bus/bus_shutdown_test.gointernal/event/bus/conn.gointernal/event/bus/conn_test.gointernal/event/bus/handle_hello_test.gointernal/event/bus/hub.gointernal/event/bus/hub_observability_test.gointernal/event/bus/hub_publish_race_test.gointernal/event/bus/hub_test.gointernal/event/consume/consume.gointernal/event/consume/consume_test.gointernal/event/consume/fingerprint.gointernal/event/consume/fingerprint_test.gointernal/event/consume/handshake.gointernal/event/consume/handshake_test.gointernal/event/consume/loop.gointernal/event/consume/loop_test.gointernal/event/consume/shutdown.gointernal/event/consume/shutdown_test.gointernal/event/protocol/codec_test.gointernal/event/protocol/messages.gointernal/event/protocol/messages_test.gointernal/event/types.goskills/lark-event/SKILL.mdskills/lark-event/references/lark-event-mail.mdskills/lark-mail/SKILL.mdskills/lark-mail/references/lark-mail-triage.mdskills/lark-mail/references/lark-mail-watch.md
💤 Files with no reviewable changes (1)
- skills/lark-mail/references/lark-mail-watch.md
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #1185 +/- ##
==========================================
+ Coverage 72.75% 72.82% +0.06%
==========================================
Files 730 731 +1
Lines 69030 69092 +62
==========================================
+ Hits 50224 50316 +92
+ Misses 15034 14999 -35
- Partials 3772 3777 +5 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/event/consume/consume_test.go (1)
78-110:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winServer goroutine error branches hang the test instead of failing fast.
On
READ_ERR/DECODE_ERR/WRONG_TYPEthe goroutine sends todonebut never writes the ack and never closesb. The main goroutine is blocked indoHellowaiting for theHelloAck, so it never reachest.Fatalf/t.Errorfand never runs the deferredb.Close()— the test stalls until the package test timeout rather than reporting the failure. Closebin the failure branches sodoHelloreturns promptly.🐛 Proposed fix to fail fast on error branches
line, err := protocol.ReadFrame(br) if err != nil { done <- "READ_ERR:" + err.Error() + b.Close() return } msg, err := protocol.Decode(bytes.TrimRight(line, "\n")) if err != nil { done <- "DECODE_ERR:" + err.Error() + b.Close() return } if hello, ok := msg.(*protocol.Hello); ok { done <- hello.SubscriptionID // send ack so client can return ack := protocol.NewHelloAck("v1", true) _ = protocol.EncodeWithDeadline(b, ack, protocol.WriteTimeout) } else { done <- "WRONG_TYPE" + b.Close() }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/event/consume/consume_test.go` around lines 78 - 110, The server goroutine currently sends error markers to done on READ_ERR/DECODE_ERR/WRONG_TYPE but never closes b, so doHello blocks waiting for the HelloAck; update the error branches inside the goroutine (the blocks after protocol.ReadFrame, protocol.Decode and the else for type check) to call b.Close() after sending the error string to done so the client side (doHello) unblocks and the test fails fast; reference the goroutine, the done channel, b, protocol.ReadFrame/protocol.Decode, and doHello/HelloAck to locate where to add the b.Close() calls.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@internal/event/consume/consume_test.go`:
- Around line 78-110: The server goroutine currently sends error markers to done
on READ_ERR/DECODE_ERR/WRONG_TYPE but never closes b, so doHello blocks waiting
for the HelloAck; update the error branches inside the goroutine (the blocks
after protocol.ReadFrame, protocol.Decode and the else for type check) to call
b.Close() after sending the error string to done so the client side (doHello)
unblocks and the test fails fast; reference the goroutine, the done channel, b,
protocol.ReadFrame/protocol.Decode, and doHello/HelloAck to locate where to add
the b.Close() calls.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 7e096f61-8c93-4a8a-83ed-69bd91490b34
📒 Files selected for processing (5)
events/mail/match.gointernal/event/bus/conn.gointernal/event/consume/consume_test.gointernal/event/consume/loop_test.goskills/lark-event/SKILL.md
🚧 Files skipped from review as they are similar to previous changes (4)
- events/mail/match.go
- internal/event/bus/conn.go
- skills/lark-event/SKILL.md
- internal/event/consume/loop_test.go
7e44b8b to
0dfab92
Compare
084b0cc to
f24496c
Compare
Framework support for resource-scoped event subscriptions, so one EventKey can fan out into independent per-resource subscription scopes: - KeyDefinition gains SubscriptionKey / NormalizeParams / Match hooks - ComputeSubscriptionID derives a dedup identity from (EventKey, sub-key params); plumbed through bus Hub, consume loop, and the Hello / PreShutdownCheck / ConsumerInfo protocol messages - add a synchronous Match filter stage before Process - change PreConsume cleanup to func() error and surface cleanup (unsubscribe) failures as WARN with an idempotency note - adapt minutes/vc/whiteboard PreConsume to the new cleanup signature - render SubscriptionID / SubscriptionKey in event status & schema output No domain wires these hooks yet; covered by unit tests using bus/protocol doubles. (Mail, the original exerciser, is intentionally not included.) Change-Id: Ifc743f1aa0bc4dff0c8a1e35da24883694fe7699
f24496c to
a3101ba
Compare
Summary
Event consumers currently dedup and gate subscribe/cleanup at
EventKeygranularity, so one EventKey cannot fan out into independent per-resource subscriptions, and cleanup (unsubscribe) failures are swallowed. This PR adds the framework plumbing for resource-scoped subscription identity — aMatchfilter hook, aNormalizeParamshook, andSubscriptionKey-based dedup — and surfaces cleanup errors. It is groundwork only: no EventKey wires these hooks yet (the mail exerciser that drove the design is not included here).Changes
ParamDef.SubscriptionKey,KeyDefinition.NormalizeParams,KeyDefinition.Match; changePreConsumecleanup fromfunc()tofunc() errorininternal/event/types.goComputeSubscriptionID(sha256 + base64url over SubscriptionKey params) ininternal/event/consume/fingerprint.goSubscriptionIDthrough the wire —Hello,PreShutdownCheck,ConsumerInfo— ininternal/event/protocol/messages.goSubscriptionID(EventKey fallback for legacy), addSubCount, keepEventKeyCountas cross-subscription aggregate ininternal/event/bus/{hub,conn,bus}.goNormalizeParams+ComputeSubscriptionIDinto the consume run, run a synchronousMatchfilter beforeProcess, surface cleanup errors with an idempotency note ininternal/event/consume/{consume,handshake,loop,shutdown}.gominutes/vc/whiteboardPreConsumeto the newfunc() errorcleanup signature inevents/{minutes,vc,whiteboard}/preconsume.goSUB-KEY/SUBcolumns incmd/event/{schema,status}.goSubscriptionIDfalls back to EventKey, so old-consumer/new-daemon and new-consumer/old-daemon both degrade to today's single-dimension behaviorTest Plan
go build ./...,go vet ./...,gofmtall clean (go 1.23, rebased on latest main)go test ./internal/event/... ./cmd/event/... ./events/...— new hooks covered byconsume/bus/protocoltests using wire/transport doublesevent consumebehavior to exercise end-to-endSUB/SUB-KEYcolumn rendering is covered bycmd/eventunit testslark-cli;event schema im.message.receive_v1andevent statusrun cleanly (event command surface intact post-change)Related Issues
N/A