Skip to content

feat: per-resource subscription identity + Match hook#1185

Merged
liuxinyanglxy merged 1 commit into
mainfrom
feat/event-subscription-id
Jun 11, 2026
Merged

feat: per-resource subscription identity + Match hook#1185
liuxinyanglxy merged 1 commit into
mainfrom
feat/event-subscription-id

Conversation

@liuxinyanglxy

@liuxinyanglxy liuxinyanglxy commented May 30, 2026

Copy link
Copy Markdown
Collaborator

Summary

Event consumers currently dedup and gate subscribe/cleanup at EventKey granularity, so one EventKey cannot fan out into independent per-resource subscriptions, and cleanup (unsubscribe) failures are swallowed. This PR adds the framework plumbing for resource-scoped subscription identity — a Match filter hook, a NormalizeParams hook, and SubscriptionKey-based dedup — and surfaces cleanup errors. It is groundwork only: no EventKey wires these hooks yet (the mail exerciser that drove the design is not included here).

Changes

  • Add ParamDef.SubscriptionKey, KeyDefinition.NormalizeParams, KeyDefinition.Match; change PreConsume cleanup from func() to func() error in internal/event/types.go
  • Add ComputeSubscriptionID (sha256 + base64url over SubscriptionKey params) in internal/event/consume/fingerprint.go
  • Thread SubscriptionID through the wire — Hello, PreShutdownCheck, ConsumerInfo — in internal/event/protocol/messages.go
  • Key Hub dedup by SubscriptionID (EventKey fallback for legacy), add SubCount, keep EventKeyCount as cross-subscription aggregate in internal/event/bus/{hub,conn,bus}.go
  • Wire NormalizeParams + ComputeSubscriptionID into the consume run, run a synchronous Match filter before Process, surface cleanup errors with an idempotency note in internal/event/consume/{consume,handshake,loop,shutdown}.go
  • Adapt minutes/vc/whiteboard PreConsume to the new func() error cleanup signature in events/{minutes,vc,whiteboard}/preconsume.go
  • Render SUB-KEY / SUB columns in cmd/event/{schema,status}.go
  • Backward compatible: empty SubscriptionID falls back to EventKey, so old-consumer/new-daemon and new-consumer/old-daemon both degrade to today's single-dimension behavior

Test Plan

  • go build ./..., go vet ./..., gofmt all clean (go 1.23, rebased on latest main)
  • unit tests pass for all changed packages: go test ./internal/event/... ./cmd/event/... ./events/... — new hooks covered by consume/bus/protocol tests using wire/transport doubles
  • sandbox E2E / skill-eval: N/A — framework plumbing only; no EventKey wires the hooks, so there is no event consume behavior to exercise end-to-end
  • acceptance-reviewer: N/A — no user-facing behavior added; the SUB/SUB-KEY column rendering is covered by cmd/event unit tests
  • manual verification: rebuilt lark-cli; event schema im.message.receive_v1 and event status run cleanly (event command surface intact post-change)

Related Issues

N/A

@coderabbitai

coderabbitai Bot commented May 30, 2026

Copy link
Copy Markdown

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

This PR adds comprehensive subscription identity support to the event bus system, enabling stable per-subscription tracking across protocol, bus, consumer, and CLI layers. Protocol messages gain SubscriptionID fields; the Hub refactors from event-key counting to subscription-scoped tracking; consumers compute stable subscription fingerprints from SubscriptionKey parameters before handshake; and CLI commands display subscription identity in schema and status tables.

Changes

Event subscription identity support

Layer / File(s) Summary
Protocol messages and type contracts
internal/event/protocol/messages.go, internal/event/protocol/messages_test.go, internal/event/protocol/codec_test.go, internal/event/types.go
Hello, PreShutdownCheck, and ConsumerInfo protocol messages add optional SubscriptionID fields; NewHello and NewPreShutdownCheck constructors accept and populate subscription ID. ParamDef gains SubscriptionKey flag to mark identity-affecting parameters. KeyDefinition adds NormalizeParams and Match hooks; PreConsume cleanup now returns func() error for error propagation.
Bus connection subscription tracking
internal/event/bus/conn.go, internal/event/bus/conn_test.go, internal/event/bus/bus_shutdown_test.go
Conn stores per-connection subID field with SubscriptionID() method that falls back to eventKey when empty. NewConn constructor accepts subID parameter. PreShutdownCheck message construction uses c.SubscriptionID() for authoritative scope. Tests verify fallback and explicit subscription behavior.
Hub subscription identity coordination
internal/event/bus/hub.go, internal/event/bus/hub_test.go, internal/event/bus/handle_hello_test.go, internal/event/bus/hub_publish_race_test.go, internal/event/bus/hub_observability_test.go
Subscriber interface adds SubscriptionID() method. Hub replaces keyCounts with subCounts keyed by subscription ID; RegisterAndIsFirst, UnregisterAndIsLast, and cleanup-lock methods (AcquireCleanupLock, ReleaseCleanupLock) now coordinate per-subscription scope. EventKeyCount aggregates across subscriptions; new SubCount() returns per-subscription totals. Consumers() populates SubscriptionID in returned entries. Tests validate isolation, counting semantics, and registration behavior.
Hello handshake and subscription registration
internal/event/bus/bus.go
handleHello derives subscription ID from incoming message with fallback to event key, passes it to NewConn. On disconnect, cleanup lock is released using SubscriptionID() instead of bare event key.
Client-side subscription identity computation
internal/event/consume/fingerprint.go, internal/event/consume/fingerprint_test.go, internal/event/consume/consume.go, internal/event/consume/consume_test.go
New ComputeSubscriptionID() derives stable per-subscription fingerprint: filters parameters marked SubscriptionKey=true, sorts by name for order-independence, hashes with SHA-256, base64url-encodes to 16-character suffix. Returns key:fingerprint format or bare key when no subscription params exist. Run() normalizes parameters before computing subscription ID; captures cleanup function with error-handling semantics.
Handshake with subscription scope
internal/event/consume/handshake.go, internal/event/consume/handshake_test.go
doHello() accepts computed subscriptionID and passes it to protocol.NewHello() for server transmission in initial handshake.
Consume loop with event filtering and subscription scope
internal/event/consume/loop.go, internal/event/consume/loop_test.go
consumeLoop() accepts subscriptionID and passes it to checkLastForKey() for subscription-scoped "last subscriber" check. processAndOutput() constructs RawEvent upfront, applies optional Match filter as synchronous early gate before Process and sink write; rejects events on filter mismatch without downstream processing.
Shutdown with subscription scope
internal/event/consume/shutdown.go, internal/event/consume/shutdown_test.go
checkLastForKey() accepts subscriptionID and passes it to protocol.NewPreShutdownCheck() for subscription-scoped "last subscriber" detection. Tests verify on-the-wire subscription ID transmission.
Event type cleanup callback updates
events/minutes/preconsume.go, events/vc/preconsume.go
subscriptionPreConsume cleanup functions now return error type, allowing unsubscribe failures to be wrapped and propagated to the framework instead of silently discarded.
CLI schema display
cmd/event/schema.go, cmd/event/schema_test.go
Schema command adds SUB-KEY column indicating which parameters are marked SubscriptionKey=true. JSON output includes subscription_key field with boolean value.
CLI status display
cmd/event/status.go, cmd/event/format_helpers_test.go
Status command adds SUB column showing consumer subscription identity. Renders dash (-) for empty/legacy consumers or when subscription ID equals event key; shows suffix after : delimiter if present for compact display. Tests verify rendering for various subscription states.

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related PRs

  • larksuite/cli#654: Prior PR established the basic event consume/bus infrastructure; this PR extends it with subscription-scope identity, stable fingerprinting, and multi-subscriber coordination.

Suggested labels

size/XL, enhancement

Suggested reviewers

  • evandance
  • kongenpei
  • liangshuo-1

Poem

🐰 A rabbit hops through subscription lands,
Where each mailbox now holds its own stands.
Fingerprints stable, parameters bound—
Multi-subscribers dance 'round, safe and sound!
Protocol whispers subscription IDs clear,
The event bus purrs: "Welcome here, dear!"

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 23.36% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly summarizes the main changes: adding per-resource subscription identity and a Match hook to the event framework.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
Description check ✅ Passed The pull request provides a comprehensive description matching the template structure with Summary, Changes, Test Plan, and Related Issues sections clearly delineated.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/event-subscription-id

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions github-actions Bot added domain/mail PR touches the mail domain size/L Large or sensitive change across domains or core paths labels May 30, 2026
@github-actions

github-actions Bot commented May 30, 2026

Copy link
Copy Markdown

🚀 PR Preview Install Guide

🧰 CLI update

npm i -g https://pkg.pr.new/larksuite/cli/@larksuite/cli@a3101badd793bdfa49d24f61c971cb0167a19088

🧩 Skill update

npx skills add larksuite/cli#feat/event-subscription-id -y -g

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/event/consume/consume.go (1)

61-76: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Apply opts.Timeout before NormalizeParams.

NormalizeParams is allowed to make OAPI calls, but the timeout context is created afterwards. That means --timeout no longer bounds startup alias-resolution/canonicalization and the command can block longer than requested before the handshake even starts.

Suggested fix
-	// Normalize params (resolve aliases like "me" -> real email) before fingerprint
-	// compute, PreConsume, Match, Process. Must happen BEFORE doHello so the
-	// SubscriptionID we send to bus reflects canonical values.
-	if keyDef.NormalizeParams != nil {
-		if err := keyDef.NormalizeParams(ctx, opts.Runtime, opts.Params); err != nil {
-			return fmt.Errorf("normalize params for %s: %w", opts.EventKey, err)
-		}
-	}
-
-	// Compute subscription identity from normalized params + SubscriptionKey flags.
-	subscriptionID := ComputeSubscriptionID(keyDef, opts.Params)
-
 	if opts.Timeout > 0 {
 		var cancel context.CancelFunc
 		ctx, cancel = context.WithTimeout(ctx, opts.Timeout)
 		defer cancel()
 	}
+
+	// Normalize params (resolve aliases like "me" -> real email) before fingerprint
+	// compute, PreConsume, Match, Process. Must happen BEFORE doHello so the
+	// SubscriptionID we send to bus reflects canonical values.
+	if keyDef.NormalizeParams != nil {
+		if err := keyDef.NormalizeParams(ctx, opts.Runtime, opts.Params); err != nil {
+			return fmt.Errorf("normalize params for %s: %w", opts.EventKey, err)
+		}
+	}
+
+	// Compute subscription identity from normalized params + SubscriptionKey flags.
+	subscriptionID := ComputeSubscriptionID(keyDef, opts.Params)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/event/consume/consume.go` around lines 61 - 76, The timeout context
is created after NormalizeParams, so alias-resolution can bypass --timeout; fix
by creating/applying the timeout before calling keyDef.NormalizeParams (use
opts.Timeout to wrap ctx via context.WithTimeout and defer cancel immediately),
then call keyDef.NormalizeParams(ctx, opts.Runtime, opts.Params), and only after
successful normalization compute subscriptionID with ComputeSubscriptionID;
ensure all subsequent calls (including doHello/PreConsume/Match/Process) use the
timed ctx.
🧹 Nitpick comments (1)
internal/event/protocol/messages_test.go (1)

20-30: ⚡ Quick win

Assert the new subscriptionID constructor args are preserved.

These call sites were updated for the new parameter, but the test still only verifies Type/EventKey. A constructor regression that drops SubscriptionID would pass unnoticed.

Diff
-	if got := NewHello(1, "k", []string{"t"}, "v1", ""); got.Type != MsgTypeHello {
+	if got := NewHello(1, "k", []string{"t"}, "v1", "k:sub"); got.Type != MsgTypeHello || got.SubscriptionID != "k:sub" {
 		t.Errorf("NewHello.Type = %q, want %q", got.Type, MsgTypeHello)
 	}
@@
-	if got := NewPreShutdownCheck("k", ""); got.Type != MsgTypePreShutdownCheck || got.EventKey != "k" {
+	if got := NewPreShutdownCheck("k", "k:sub"); got.Type != MsgTypePreShutdownCheck || got.EventKey != "k" || got.SubscriptionID != "k:sub" {
 		t.Errorf("NewPreShutdownCheck mismatch: %+v", got)
 	}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/event/protocol/messages_test.go` around lines 20 - 30, The test must
assert that the new subscriptionID constructor argument is stored on the
returned messages; update the NewHello, NewHelloAck, NewEvent and
NewPreShutdownCheck assertions to also verify the SubscriptionID (or equivalent
field name) equals the value passed in (e.g., the last argument in each
constructor call) so a regression that drops SubscriptionID will fail; locate
the checks around NewHello, NewHelloAck, NewEvent and NewPreShutdownCheck in
messages_test.go and add corresponding SubscriptionID equality assertions.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@events/mail/match.go`:
- Around line 31-34: The Match implementation currently only fails open when
json.Unmarshal(raw.Payload, &env) errors, but returns false if the JSON is valid
yet env.Event.MailAddress is missing/empty; change Match so that after
unmarshalling it also treats a missing/empty env.Event.MailAddress as a
fail-open case (i.e., return true when env.Event.MailAddress == ""), otherwise
continue to return env.Event.MailAddress == target; reference symbols: Match,
raw.Payload, json.Unmarshal, env.Event.MailAddress, target.

In `@internal/event/bus/conn.go`:
- Around line 150-156: The code currently builds scope from the incoming message
(m.SubscriptionID or m.EventKey); instead, use the connection's authoritative
subscription identity via Conn.SubscriptionID() when computing scope so
PreShutdownCheck uses the stored scope. Replace the scope assignment to use
c.SubscriptionID() (fall back to m.EventKey only if c.SubscriptionID() is
empty), and keep the existing checkLastForKey(c.checkLastForKey(scope)) logic
unchanged so per-subscription cleanup is evaluated against the connection's
subscription.

In `@internal/event/consume/consume_test.go`:
- Around line 47-49: The test reconstructs the fmt.Errorf wrapper itself instead
of exercising the real production wrapping in Run(), so change the test to call
the actual code path that produces the wrapped error (invoke Run() with a keyDef
that triggers NormalizeParams to fail) and assert that the returned error string
contains the expected prefix "normalize params for <EventKey>:"; alternatively,
extract the wrapping into a helper function (e.g., wrapNormalizeError(eventKey
string, err error)) and update the test to call that helper directly to verify
the exact wrapper format; locate references to Run(), NormalizeParams and
keyDef.Key in consume_test.go to implement the change.
- Around line 103-107: The test currently builds `got` using the same format
string as `want`, making it impossible to detect regressions; instead call the
real production formatting/path that emits the cleanup warning (the
cleanup/unsubscribe code path or the formatter function used in production) to
produce `got` rather than using fmt.Sprintf; keep `want` as the expected literal
and assert that the output from the actual cleanup warning routine (the
function/method that logs or formats the cleanup warning in the
unsubscribe/cleanup implementation) equals `want`.

In `@internal/event/consume/loop_test.go`:
- Around line 259-286: TestProcessAndOutput_Match_RunsBeforeProcess currently
only checks counts; change the test to assert order by recording call sequence
when keyDef.Match and keyDef.Process run (e.g. append "match" / "process" to a
slice or set a matchDone flag), and then after processAndOutput return assert
that the first entry is "match" (or that matchDone was true when Process
executed). Locate the handlers on the keyDef object (Match and Process) and
update them to record the sequence or assert inside Process that the match-side
marker is already set so the test fails if Process ran before Match.

In `@skills/lark-event/SKILL.md`:
- Around line 155-163: The doc incorrectly uses “reference-counted” to describe
server behavior; update the "Cleanup error reporting" section to remove the
phrase "reference-counted" and reword the sentence that currently reads
“Subscriptions are reference-counted implicitly by Lark's server (one record per
`(app, user, event_type)`)" to clearly state the server stores a single
idempotent subscription record per (app, user, event_type) that is overwritten
on subscribe, not reference-counted, and keep the guidance that manual
unsubscribe is not a safe recovery action because a stray unsubscribe can remove
that single record used by other consumers.

---

Outside diff comments:
In `@internal/event/consume/consume.go`:
- Around line 61-76: The timeout context is created after NormalizeParams, so
alias-resolution can bypass --timeout; fix by creating/applying the timeout
before calling keyDef.NormalizeParams (use opts.Timeout to wrap ctx via
context.WithTimeout and defer cancel immediately), then call
keyDef.NormalizeParams(ctx, opts.Runtime, opts.Params), and only after
successful normalization compute subscriptionID with ComputeSubscriptionID;
ensure all subsequent calls (including doHello/PreConsume/Match/Process) use the
timed ctx.

---

Nitpick comments:
In `@internal/event/protocol/messages_test.go`:
- Around line 20-30: The test must assert that the new subscriptionID
constructor argument is stored on the returned messages; update the NewHello,
NewHelloAck, NewEvent and NewPreShutdownCheck assertions to also verify the
SubscriptionID (or equivalent field name) equals the value passed in (e.g., the
last argument in each constructor call) so a regression that drops
SubscriptionID will fail; locate the checks around NewHello, NewHelloAck,
NewEvent and NewPreShutdownCheck in messages_test.go and add corresponding
SubscriptionID equality assertions.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 0118b5c2-be99-4a98-a187-00e3b8e2bfd1

📥 Commits

Reviewing files that changed from the base of the PR and between b1ecf2d and 95f5b80.

📒 Files selected for processing (44)
  • cmd/event/format_helpers_test.go
  • cmd/event/schema.go
  • cmd/event/schema_test.go
  • cmd/event/status.go
  • events/mail/match.go
  • events/mail/match_test.go
  • events/mail/normalize.go
  • events/mail/normalize_test.go
  • events/mail/payload.go
  • events/mail/process.go
  • events/mail/process_test.go
  • events/mail/register.go
  • events/mail/register_test.go
  • events/minutes/preconsume.go
  • events/register.go
  • events/vc/preconsume.go
  • internal/event/bus/bus.go
  • internal/event/bus/bus_shutdown_test.go
  • internal/event/bus/conn.go
  • internal/event/bus/conn_test.go
  • internal/event/bus/handle_hello_test.go
  • internal/event/bus/hub.go
  • internal/event/bus/hub_observability_test.go
  • internal/event/bus/hub_publish_race_test.go
  • internal/event/bus/hub_test.go
  • internal/event/consume/consume.go
  • internal/event/consume/consume_test.go
  • internal/event/consume/fingerprint.go
  • internal/event/consume/fingerprint_test.go
  • internal/event/consume/handshake.go
  • internal/event/consume/handshake_test.go
  • internal/event/consume/loop.go
  • internal/event/consume/loop_test.go
  • internal/event/consume/shutdown.go
  • internal/event/consume/shutdown_test.go
  • internal/event/protocol/codec_test.go
  • internal/event/protocol/messages.go
  • internal/event/protocol/messages_test.go
  • internal/event/types.go
  • skills/lark-event/SKILL.md
  • skills/lark-event/references/lark-event-mail.md
  • skills/lark-mail/SKILL.md
  • skills/lark-mail/references/lark-mail-triage.md
  • skills/lark-mail/references/lark-mail-watch.md
💤 Files with no reviewable changes (1)
  • skills/lark-mail/references/lark-mail-watch.md

Comment thread events/mail/match.go Outdated
Comment thread internal/event/bus/conn.go Outdated
Comment thread internal/event/consume/consume_test.go Outdated
Comment thread internal/event/consume/consume_test.go Outdated
Comment thread internal/event/consume/loop_test.go
Comment thread skills/lark-event/SKILL.md Outdated
@codecov

codecov Bot commented May 30, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 80.00000% with 26 lines in your changes missing coverage. Please review.
✅ Project coverage is 72.82%. Comparing base (154ecdb) to head (a3101ba).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
internal/event/consume/consume.go 19.04% 16 Missing and 1 partial ⚠️
events/minutes/preconsume.go 66.66% 1 Missing and 1 partial ⚠️
events/vc/preconsume.go 66.66% 1 Missing and 1 partial ⚠️
events/whiteboard/preconsume.go 66.66% 1 Missing and 1 partial ⚠️
internal/event/bus/bus.go 71.42% 1 Missing and 1 partial ⚠️
internal/event/bus/conn.go 88.88% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1185      +/-   ##
==========================================
+ Coverage   72.75%   72.82%   +0.06%     
==========================================
  Files         730      731       +1     
  Lines       69030    69092      +62     
==========================================
+ Hits        50224    50316      +92     
+ Misses      15034    14999      -35     
- Partials     3772     3777       +5     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/event/consume/consume_test.go (1)

78-110: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Server goroutine error branches hang the test instead of failing fast.

On READ_ERR/DECODE_ERR/WRONG_TYPE the goroutine sends to done but never writes the ack and never closes b. The main goroutine is blocked in doHello waiting for the HelloAck, so it never reaches t.Fatalf/t.Errorf and never runs the deferred b.Close() — the test stalls until the package test timeout rather than reporting the failure. Close b in the failure branches so doHello returns promptly.

🐛 Proposed fix to fail fast on error branches
 		line, err := protocol.ReadFrame(br)
 		if err != nil {
 			done <- "READ_ERR:" + err.Error()
+			b.Close()
 			return
 		}
 		msg, err := protocol.Decode(bytes.TrimRight(line, "\n"))
 		if err != nil {
 			done <- "DECODE_ERR:" + err.Error()
+			b.Close()
 			return
 		}
 		if hello, ok := msg.(*protocol.Hello); ok {
 			done <- hello.SubscriptionID
 			// send ack so client can return
 			ack := protocol.NewHelloAck("v1", true)
 			_ = protocol.EncodeWithDeadline(b, ack, protocol.WriteTimeout)
 		} else {
 			done <- "WRONG_TYPE"
+			b.Close()
 		}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/event/consume/consume_test.go` around lines 78 - 110, The server
goroutine currently sends error markers to done on
READ_ERR/DECODE_ERR/WRONG_TYPE but never closes b, so doHello blocks waiting for
the HelloAck; update the error branches inside the goroutine (the blocks after
protocol.ReadFrame, protocol.Decode and the else for type check) to call
b.Close() after sending the error string to done so the client side (doHello)
unblocks and the test fails fast; reference the goroutine, the done channel, b,
protocol.ReadFrame/protocol.Decode, and doHello/HelloAck to locate where to add
the b.Close() calls.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Outside diff comments:
In `@internal/event/consume/consume_test.go`:
- Around line 78-110: The server goroutine currently sends error markers to done
on READ_ERR/DECODE_ERR/WRONG_TYPE but never closes b, so doHello blocks waiting
for the HelloAck; update the error branches inside the goroutine (the blocks
after protocol.ReadFrame, protocol.Decode and the else for type check) to call
b.Close() after sending the error string to done so the client side (doHello)
unblocks and the test fails fast; reference the goroutine, the done channel, b,
protocol.ReadFrame/protocol.Decode, and doHello/HelloAck to locate where to add
the b.Close() calls.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 7e096f61-8c93-4a8a-83ed-69bd91490b34

📥 Commits

Reviewing files that changed from the base of the PR and between a42f734 and 654f656.

📒 Files selected for processing (5)
  • events/mail/match.go
  • internal/event/bus/conn.go
  • internal/event/consume/consume_test.go
  • internal/event/consume/loop_test.go
  • skills/lark-event/SKILL.md
🚧 Files skipped from review as they are similar to previous changes (4)
  • events/mail/match.go
  • internal/event/bus/conn.go
  • skills/lark-event/SKILL.md
  • internal/event/consume/loop_test.go

@liuxinyanglxy liuxinyanglxy force-pushed the feat/event-subscription-id branch from 7e44b8b to 0dfab92 Compare June 9, 2026 08:40
@github-actions github-actions Bot removed the domain/mail PR touches the mail domain label Jun 9, 2026
@liuxinyanglxy liuxinyanglxy changed the title feat(events): per-resource subscription identity + Match hook + mail feat: per-resource subscription identity + Match hook Jun 9, 2026
@liuxinyanglxy liuxinyanglxy force-pushed the feat/event-subscription-id branch 5 times, most recently from 084b0cc to f24496c Compare June 11, 2026 06:18
Framework support for resource-scoped event subscriptions, so one
EventKey can fan out into independent per-resource subscription scopes:

- KeyDefinition gains SubscriptionKey / NormalizeParams / Match hooks
- ComputeSubscriptionID derives a dedup identity from (EventKey, sub-key
  params); plumbed through bus Hub, consume loop, and the
  Hello / PreShutdownCheck / ConsumerInfo protocol messages
- add a synchronous Match filter stage before Process
- change PreConsume cleanup to func() error and surface cleanup
  (unsubscribe) failures as WARN with an idempotency note
- adapt minutes/vc/whiteboard PreConsume to the new cleanup signature
- render SubscriptionID / SubscriptionKey in event status & schema output

No domain wires these hooks yet; covered by unit tests using bus/protocol
doubles. (Mail, the original exerciser, is intentionally not included.)

Change-Id: Ifc743f1aa0bc4dff0c8a1e35da24883694fe7699
@liuxinyanglxy liuxinyanglxy force-pushed the feat/event-subscription-id branch from f24496c to a3101ba Compare June 11, 2026 06:58
@liuxinyanglxy liuxinyanglxy merged commit 3f77ede into main Jun 11, 2026
36 of 40 checks passed
@liuxinyanglxy liuxinyanglxy deleted the feat/event-subscription-id branch June 11, 2026 08:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

feature size/L Large or sensitive change across domains or core paths

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants