feat(events): per-resource subscription identity + Match hook + mail#1185
feat(events): per-resource subscription identity + Match hook + mail#1185liuxinyanglxy wants to merge 24 commits into
Conversation
… cleanup to func() error Change-Id: I23ba07c089b2c0eb3e42451d8118849a1c5f7a9d
…dedup Change-Id: Iaeb09c15c754c9e4ca8ce9784623f82f55fe308b
…nsumerInfo Add SubscriptionID field (omitempty) to Hello, PreShutdownCheck, and ConsumerInfo. Update NewHello and NewPreShutdownCheck constructors to accept subscriptionID param; patch all call sites with "" placeholder pending Tasks 7/8. Change-Id: I469a1fb0de2c31326513feef5693d8097dfc8415
Add subID field to Conn, update NewConn signature to accept it, add SubscriptionID() getter (falls back to EventKey when empty), update handleControlMessage PreShutdownCheck branch to use message SubscriptionID with EventKey fallback, and wire hello.SubscriptionID through handleHello. Change-Id: Iea9195dd2d343fd51c84317e4d7f2d5d8067d51d
Change-Id: I3cc81e8440656470f92f1b50074b840b0875c781
Change-Id: Ieb7c2237621a84721b6fc73967838d9caec2a837
…o Run Change-Id: Ifd43aa2a82c315ad3d496a39ce9f0dbdd43f3bb8
…sumeLoop Change-Id: Ib40b148c45f546ca112d30976749f42fae1f877a
…note Change-Id: I121ec0d3e7d50f3ebb728f47f1f54d89322e0143
Run keyDef.Match(raw, params) at the top of processAndOutput before any Process/sink work; drop the event (return false, nil) when it returns false. Move the RawEvent allocation above the Match guard so both Match and Process share the same struct. Three new unit tests cover drop, nil-accepts-all, and run-before-process ordering. Change-Id: I21571b3da91a9a918903b6fc3a6ff897eb83a747
Change-Id: I2f88774c58719af0419c471f40de1bf12242a5ab
Add a SUB column to the human-readable consumer table in `event status`. Renders the fingerprint hash suffix (after the colon) when present; falls back to "-" for legacy consumers where SubscriptionID is empty or equal to EventKey. Change-Id: Ia6271475e562844fc275f3751c64ca408ebda82c
Change-Id: Ie880d12e5dda73925ccaadc60ff8d3acda401dd6
Change-Id: I02cca113d50067e9a5964b896b2bc84210d2a35b
Change-Id: I3dcd481fa93ce8f5dbde1e065aa00c18330265c0
Change-Id: I99c64618900dde0b5420ce8a4f5c088960098e66
…ork hooks Adds events/mail/register.go with Keys() exposing the mail.user_mailbox.event.message_received_v1 EventKey: mailbox (SUB-KEY), folders, labels, msg-format params; wires NormalizeParams, PreConsume (subscribe/unsubscribe), Match, and Process hooks. Updates events/register.go to include mail.Keys() alongside im.Keys(). Note: Schema.Custom is used (not Native) because processMailEvent produces the complete output shape — Schema.Native is incompatible with Process per registry validation. Change-Id: Ibf48dc19dee5db65730810b0f2e4b5ebed73c4f0
Change-Id: I14636cae6a459cdf5a82455bc18022d5edd6c5ce
…ddress Change-Id: I03d0780aaac0fd7d59c1395df6912f5bb043f996
Change-Id: If46580dff10e15a56858157dbcdfb79f2c43d227
The mail message_received event's subscriber field is a
{user_ids:[{user_id,open_id,union_id}]} object per the Feishu SDK
(P2UserMailboxEventMessageReceivedV1Data.Subscriber), not a string.
Decoding it as string made Process fail at runtime with
'cannot unmarshal object into Go struct field .event.subscriber of
type string', silently dropping every mail event.
Change-Id: I91dcd7e82659e3962a2530fae8d5ee34fe40dbf6
The PreConsume cleanup signature changed from func() to func() error in this branch. minutes and vc (added on main since this branch forked) share a subscriptionPreConsume helper that returned the old signature; update both to return the unsubscribe error so the framework can surface cleanup failures uniformly. Change-Id: I808a4446e8b9327d4e6ec20e2b28a48345b5e1b9
📝 WalkthroughWalkthroughAdds subscription-scoped identity across the event system, implements a mail "message received" event with normalize/match/process hooks, updates Hub/Conn/Bus to track SubscriptionID, propagates subscription IDs through consume/handshake/shutdown flows, updates CLI schema/status displays, and adds tests and docs. ChangesSubscription identity system
Mail event integration
Hub/Bus subscription identity tracking
Consumer loop subscription integration
CLI output for subscription identity
Documentation
Estimated code review effort 🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
🚀 PR Preview Install Guide🧰 CLI updatenpm i -g https://pkg.pr.new/larksuite/cli/@larksuite/cli@654f65640ffce2918e9f1d0f144201f581cbfd8d🧩 Skill updatenpx skills add larksuite/cli#feat/event-subscription-id -y -g |
There was a problem hiding this comment.
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/event/consume/consume.go (1)
61-76:⚠️ Potential issue | 🟠 Major | ⚡ Quick winApply
opts.TimeoutbeforeNormalizeParams.
NormalizeParamsis allowed to make OAPI calls, but the timeout context is created afterwards. That means--timeoutno longer bounds startup alias-resolution/canonicalization and the command can block longer than requested before the handshake even starts.Suggested fix
- // Normalize params (resolve aliases like "me" -> real email) before fingerprint - // compute, PreConsume, Match, Process. Must happen BEFORE doHello so the - // SubscriptionID we send to bus reflects canonical values. - if keyDef.NormalizeParams != nil { - if err := keyDef.NormalizeParams(ctx, opts.Runtime, opts.Params); err != nil { - return fmt.Errorf("normalize params for %s: %w", opts.EventKey, err) - } - } - - // Compute subscription identity from normalized params + SubscriptionKey flags. - subscriptionID := ComputeSubscriptionID(keyDef, opts.Params) - if opts.Timeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, opts.Timeout) defer cancel() } + + // Normalize params (resolve aliases like "me" -> real email) before fingerprint + // compute, PreConsume, Match, Process. Must happen BEFORE doHello so the + // SubscriptionID we send to bus reflects canonical values. + if keyDef.NormalizeParams != nil { + if err := keyDef.NormalizeParams(ctx, opts.Runtime, opts.Params); err != nil { + return fmt.Errorf("normalize params for %s: %w", opts.EventKey, err) + } + } + + // Compute subscription identity from normalized params + SubscriptionKey flags. + subscriptionID := ComputeSubscriptionID(keyDef, opts.Params)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/event/consume/consume.go` around lines 61 - 76, The timeout context is created after NormalizeParams, so alias-resolution can bypass --timeout; fix by creating/applying the timeout before calling keyDef.NormalizeParams (use opts.Timeout to wrap ctx via context.WithTimeout and defer cancel immediately), then call keyDef.NormalizeParams(ctx, opts.Runtime, opts.Params), and only after successful normalization compute subscriptionID with ComputeSubscriptionID; ensure all subsequent calls (including doHello/PreConsume/Match/Process) use the timed ctx.
🧹 Nitpick comments (1)
internal/event/protocol/messages_test.go (1)
20-30: ⚡ Quick winAssert the new
subscriptionIDconstructor args are preserved.These call sites were updated for the new parameter, but the test still only verifies
Type/EventKey. A constructor regression that dropsSubscriptionIDwould pass unnoticed.Diff
- if got := NewHello(1, "k", []string{"t"}, "v1", ""); got.Type != MsgTypeHello { + if got := NewHello(1, "k", []string{"t"}, "v1", "k:sub"); got.Type != MsgTypeHello || got.SubscriptionID != "k:sub" { t.Errorf("NewHello.Type = %q, want %q", got.Type, MsgTypeHello) } @@ - if got := NewPreShutdownCheck("k", ""); got.Type != MsgTypePreShutdownCheck || got.EventKey != "k" { + if got := NewPreShutdownCheck("k", "k:sub"); got.Type != MsgTypePreShutdownCheck || got.EventKey != "k" || got.SubscriptionID != "k:sub" { t.Errorf("NewPreShutdownCheck mismatch: %+v", got) }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/event/protocol/messages_test.go` around lines 20 - 30, The test must assert that the new subscriptionID constructor argument is stored on the returned messages; update the NewHello, NewHelloAck, NewEvent and NewPreShutdownCheck assertions to also verify the SubscriptionID (or equivalent field name) equals the value passed in (e.g., the last argument in each constructor call) so a regression that drops SubscriptionID will fail; locate the checks around NewHello, NewHelloAck, NewEvent and NewPreShutdownCheck in messages_test.go and add corresponding SubscriptionID equality assertions.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@events/mail/match.go`:
- Around line 31-34: The Match implementation currently only fails open when
json.Unmarshal(raw.Payload, &env) errors, but returns false if the JSON is valid
yet env.Event.MailAddress is missing/empty; change Match so that after
unmarshalling it also treats a missing/empty env.Event.MailAddress as a
fail-open case (i.e., return true when env.Event.MailAddress == ""), otherwise
continue to return env.Event.MailAddress == target; reference symbols: Match,
raw.Payload, json.Unmarshal, env.Event.MailAddress, target.
In `@internal/event/bus/conn.go`:
- Around line 150-156: The code currently builds scope from the incoming message
(m.SubscriptionID or m.EventKey); instead, use the connection's authoritative
subscription identity via Conn.SubscriptionID() when computing scope so
PreShutdownCheck uses the stored scope. Replace the scope assignment to use
c.SubscriptionID() (fall back to m.EventKey only if c.SubscriptionID() is
empty), and keep the existing checkLastForKey(c.checkLastForKey(scope)) logic
unchanged so per-subscription cleanup is evaluated against the connection's
subscription.
In `@internal/event/consume/consume_test.go`:
- Around line 47-49: The test reconstructs the fmt.Errorf wrapper itself instead
of exercising the real production wrapping in Run(), so change the test to call
the actual code path that produces the wrapped error (invoke Run() with a keyDef
that triggers NormalizeParams to fail) and assert that the returned error string
contains the expected prefix "normalize params for <EventKey>:"; alternatively,
extract the wrapping into a helper function (e.g., wrapNormalizeError(eventKey
string, err error)) and update the test to call that helper directly to verify
the exact wrapper format; locate references to Run(), NormalizeParams and
keyDef.Key in consume_test.go to implement the change.
- Around line 103-107: The test currently builds `got` using the same format
string as `want`, making it impossible to detect regressions; instead call the
real production formatting/path that emits the cleanup warning (the
cleanup/unsubscribe code path or the formatter function used in production) to
produce `got` rather than using fmt.Sprintf; keep `want` as the expected literal
and assert that the output from the actual cleanup warning routine (the
function/method that logs or formats the cleanup warning in the
unsubscribe/cleanup implementation) equals `want`.
In `@internal/event/consume/loop_test.go`:
- Around line 259-286: TestProcessAndOutput_Match_RunsBeforeProcess currently
only checks counts; change the test to assert order by recording call sequence
when keyDef.Match and keyDef.Process run (e.g. append "match" / "process" to a
slice or set a matchDone flag), and then after processAndOutput return assert
that the first entry is "match" (or that matchDone was true when Process
executed). Locate the handlers on the keyDef object (Match and Process) and
update them to record the sequence or assert inside Process that the match-side
marker is already set so the test fails if Process ran before Match.
In `@skills/lark-event/SKILL.md`:
- Around line 155-163: The doc incorrectly uses “reference-counted” to describe
server behavior; update the "Cleanup error reporting" section to remove the
phrase "reference-counted" and reword the sentence that currently reads
“Subscriptions are reference-counted implicitly by Lark's server (one record per
`(app, user, event_type)`)" to clearly state the server stores a single
idempotent subscription record per (app, user, event_type) that is overwritten
on subscribe, not reference-counted, and keep the guidance that manual
unsubscribe is not a safe recovery action because a stray unsubscribe can remove
that single record used by other consumers.
---
Outside diff comments:
In `@internal/event/consume/consume.go`:
- Around line 61-76: The timeout context is created after NormalizeParams, so
alias-resolution can bypass --timeout; fix by creating/applying the timeout
before calling keyDef.NormalizeParams (use opts.Timeout to wrap ctx via
context.WithTimeout and defer cancel immediately), then call
keyDef.NormalizeParams(ctx, opts.Runtime, opts.Params), and only after
successful normalization compute subscriptionID with ComputeSubscriptionID;
ensure all subsequent calls (including doHello/PreConsume/Match/Process) use the
timed ctx.
---
Nitpick comments:
In `@internal/event/protocol/messages_test.go`:
- Around line 20-30: The test must assert that the new subscriptionID
constructor argument is stored on the returned messages; update the NewHello,
NewHelloAck, NewEvent and NewPreShutdownCheck assertions to also verify the
SubscriptionID (or equivalent field name) equals the value passed in (e.g., the
last argument in each constructor call) so a regression that drops
SubscriptionID will fail; locate the checks around NewHello, NewHelloAck,
NewEvent and NewPreShutdownCheck in messages_test.go and add corresponding
SubscriptionID equality assertions.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 0118b5c2-be99-4a98-a187-00e3b8e2bfd1
📒 Files selected for processing (44)
cmd/event/format_helpers_test.gocmd/event/schema.gocmd/event/schema_test.gocmd/event/status.goevents/mail/match.goevents/mail/match_test.goevents/mail/normalize.goevents/mail/normalize_test.goevents/mail/payload.goevents/mail/process.goevents/mail/process_test.goevents/mail/register.goevents/mail/register_test.goevents/minutes/preconsume.goevents/register.goevents/vc/preconsume.gointernal/event/bus/bus.gointernal/event/bus/bus_shutdown_test.gointernal/event/bus/conn.gointernal/event/bus/conn_test.gointernal/event/bus/handle_hello_test.gointernal/event/bus/hub.gointernal/event/bus/hub_observability_test.gointernal/event/bus/hub_publish_race_test.gointernal/event/bus/hub_test.gointernal/event/consume/consume.gointernal/event/consume/consume_test.gointernal/event/consume/fingerprint.gointernal/event/consume/fingerprint_test.gointernal/event/consume/handshake.gointernal/event/consume/handshake_test.gointernal/event/consume/loop.gointernal/event/consume/loop_test.gointernal/event/consume/shutdown.gointernal/event/consume/shutdown_test.gointernal/event/protocol/codec_test.gointernal/event/protocol/messages.gointernal/event/protocol/messages_test.gointernal/event/types.goskills/lark-event/SKILL.mdskills/lark-event/references/lark-event-mail.mdskills/lark-mail/SKILL.mdskills/lark-mail/references/lark-mail-triage.mdskills/lark-mail/references/lark-mail-watch.md
💤 Files with no reviewable changes (1)
- skills/lark-mail/references/lark-mail-watch.md
Change-Id: I9cd51022a644db509a070234fcc330783a4d08cc
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #1185 +/- ##
==========================================
+ Coverage 68.92% 69.07% +0.15%
==========================================
Files 629 634 +5
Lines 58762 59023 +261
==========================================
+ Hits 40503 40772 +269
+ Misses 14952 14924 -28
- Partials 3307 3327 +20 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
- match.go: fail open when mail_address is absent/empty (shape drift), not only on unparseable payload - conn.go: PreShutdownCheck uses the connection's own SubscriptionID() instead of recomputing from the (possibly stale) incoming message - consume_test: drive the real Run() path for the normalize-error wrap; drop the self-fulfilling cleanup-format test (covered by sandbox E2E) - loop_test: record real call order instead of bare call counts - lark-event SKILL.md: correct server semantics (idempotent single record, not reference counting) Change-Id: I62319e84a990e5355a462280f190fd397a8a6c2d
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/event/consume/consume_test.go (1)
78-110:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winServer goroutine error branches hang the test instead of failing fast.
On
READ_ERR/DECODE_ERR/WRONG_TYPEthe goroutine sends todonebut never writes the ack and never closesb. The main goroutine is blocked indoHellowaiting for theHelloAck, so it never reachest.Fatalf/t.Errorfand never runs the deferredb.Close()— the test stalls until the package test timeout rather than reporting the failure. Closebin the failure branches sodoHelloreturns promptly.🐛 Proposed fix to fail fast on error branches
line, err := protocol.ReadFrame(br) if err != nil { done <- "READ_ERR:" + err.Error() + b.Close() return } msg, err := protocol.Decode(bytes.TrimRight(line, "\n")) if err != nil { done <- "DECODE_ERR:" + err.Error() + b.Close() return } if hello, ok := msg.(*protocol.Hello); ok { done <- hello.SubscriptionID // send ack so client can return ack := protocol.NewHelloAck("v1", true) _ = protocol.EncodeWithDeadline(b, ack, protocol.WriteTimeout) } else { done <- "WRONG_TYPE" + b.Close() }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/event/consume/consume_test.go` around lines 78 - 110, The server goroutine currently sends error markers to done on READ_ERR/DECODE_ERR/WRONG_TYPE but never closes b, so doHello blocks waiting for the HelloAck; update the error branches inside the goroutine (the blocks after protocol.ReadFrame, protocol.Decode and the else for type check) to call b.Close() after sending the error string to done so the client side (doHello) unblocks and the test fails fast; reference the goroutine, the done channel, b, protocol.ReadFrame/protocol.Decode, and doHello/HelloAck to locate where to add the b.Close() calls.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@internal/event/consume/consume_test.go`:
- Around line 78-110: The server goroutine currently sends error markers to done
on READ_ERR/DECODE_ERR/WRONG_TYPE but never closes b, so doHello blocks waiting
for the HelloAck; update the error branches inside the goroutine (the blocks
after protocol.ReadFrame, protocol.Decode and the else for type check) to call
b.Close() after sending the error string to done so the client side (doHello)
unblocks and the test fails fast; reference the goroutine, the done channel, b,
protocol.ReadFrame/protocol.Decode, and doHello/HelloAck to locate where to add
the b.Close() calls.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 7e096f61-8c93-4a8a-83ed-69bd91490b34
📒 Files selected for processing (5)
events/mail/match.gointernal/event/bus/conn.gointernal/event/consume/consume_test.gointernal/event/consume/loop_test.goskills/lark-event/SKILL.md
🚧 Files skipped from review as they are similar to previous changes (4)
- events/mail/match.go
- internal/event/bus/conn.go
- skills/lark-event/SKILL.md
- internal/event/consume/loop_test.go
Summary
Mail (and any future per-resource EventKey such as drive comments or task updates) subscribes at a finer granularity than IM: its subscription identity is
(EventKey, mailbox), notEventKeyalone. The current bus keys dedup onEventKeyonly, so a secondevent consumeagainst the same key with a different mailbox is silently skipped (PreConsume never fires, the consumer hangs with no error). Cleanup failures are also swallowed. This PR lifts the framework from one-dimensional to two-dimensional subscription identity, adds a syncMatchfilter and aNormalizeParamshook, surfaces cleanup errors, and adapts the mail EventKey end-to-end as the proving ground. Raised in response to the review on #851.Changes
ParamDef.SubscriptionKey,KeyDefinition.NormalizeParams,KeyDefinition.Match; changePreConsumecleanup fromfunc()tofunc() errorininternal/event/types.goComputeSubscriptionID(sha256 + base64url, 16-char) ininternal/event/consume/fingerprint.goSubscriptionIDthrough the wire:Hello,PreShutdownCheck,ConsumerInfoininternal/event/protocol/messages.goSubscriptionID(was EventKey); addSubCount; preserveEventKeyCountas cross-subscription aggregate ininternal/event/bus/hub.go,conn.go,bus.goNormalizeParams+ComputeSubscriptionIDinto the consume run, plumbSubscriptionIDthroughcheckLastForKey, surface cleanup errors with an idempotency note ininternal/event/consume/{consume,handshake,loop,shutdown}.goSUB-KEYcolumn incmd/event/schema.goandSUBcolumn incmd/event/status.gomealias normalization (/user_mailboxes/me/profile), mailboxMatch, folders/labels filter + msg-format enrichmentProcessinevents/mail/{payload,normalize,match,process,register}.goskills/lark-event/; remove the mail-domain+watchskill surface so event subscription converges underlark-event(IM-style)SubscriptionIDfalls back to EventKey, so old consumer ↔ new daemon and new consumer ↔ old daemon both degrade to today's single-dimension behaviorTest Plan
make unit-testpassed (allinternal/event/...,events/...,cmd/event/...green with-race)lark-cli event consume mail.user_mailbox.event.message_received_v1 -p mailbox=me -p msg-format=metadata --max-events 1 --as user— normalize resolvedme, real mail event emitted with from/subject/snippet, cleanup ran cleanly;event schema mail.xxxshows the SUB-KEY column;event statusshows the SUB columnRelated Issues
N/A
Summary by CodeRabbit
New Features
Improvements
Documentation