Skip to content

feat(events): per-resource subscription identity + Match hook + mail#1185

Open
liuxinyanglxy wants to merge 24 commits into
mainfrom
feat/event-subscription-id
Open

feat(events): per-resource subscription identity + Match hook + mail#1185
liuxinyanglxy wants to merge 24 commits into
mainfrom
feat/event-subscription-id

Conversation

@liuxinyanglxy
Copy link
Copy Markdown
Collaborator

@liuxinyanglxy liuxinyanglxy commented May 30, 2026

Summary

Mail (and any future per-resource EventKey such as drive comments or task updates) subscribes at a finer granularity than IM: its subscription identity is (EventKey, mailbox), not EventKey alone. The current bus keys dedup on EventKey only, so a second event consume against the same key with a different mailbox is silently skipped (PreConsume never fires, the consumer hangs with no error). Cleanup failures are also swallowed. This PR lifts the framework from one-dimensional to two-dimensional subscription identity, adds a sync Match filter and a NormalizeParams hook, surfaces cleanup errors, and adapts the mail EventKey end-to-end as the proving ground. Raised in response to the review on #851.

Changes

  • Add ParamDef.SubscriptionKey, KeyDefinition.NormalizeParams, KeyDefinition.Match; change PreConsume cleanup from func() to func() error in internal/event/types.go
  • Add ComputeSubscriptionID (sha256 + base64url, 16-char) in internal/event/consume/fingerprint.go
  • Thread SubscriptionID through the wire: Hello, PreShutdownCheck, ConsumerInfo in internal/event/protocol/messages.go
  • Key Hub dedup by SubscriptionID (was EventKey); add SubCount; preserve EventKeyCount as cross-subscription aggregate in internal/event/bus/hub.go, conn.go, bus.go
  • Wire NormalizeParams + ComputeSubscriptionID into the consume run, plumb SubscriptionID through checkLastForKey, surface cleanup errors with an idempotency note in internal/event/consume/{consume,handshake,loop,shutdown}.go
  • Render SUB-KEY column in cmd/event/schema.go and SUB column in cmd/event/status.go
  • Adapt the mail EventKey: union payload schema, me alias normalization (/user_mailboxes/me/profile), mailbox Match, folders/labels filter + msg-format enrichment Process in events/mail/{payload,normalize,match,process,register}.go
  • Document subscription identity, cleanup semantics, and the mail EventKey under skills/lark-event/; remove the mail-domain +watch skill surface so event subscription converges under lark-event (IM-style)
  • Wire compatibility: empty SubscriptionID falls back to EventKey, so old consumer ↔ new daemon and new consumer ↔ old daemon both degrade to today's single-dimension behavior

Test Plan

  • make unit-test passed (all internal/event/..., events/..., cmd/event/... green with -race)
  • validate passed (build + vet + unit + integration + convention + security)
  • local-eval passed (E2E 7/7 for this change; sandbox total 157/16, the 16 are pre-existing failures unrelated to this diff; skillave 5/5)
  • acceptance-reviewer passed (7/7 scenarios)
  • manual verification: lark-cli event consume mail.user_mailbox.event.message_received_v1 -p mailbox=me -p msg-format=metadata --max-events 1 --as user — normalize resolved me, real mail event emitted with from/subject/snippet, cleanup ran cleanly; event schema mail.xxx shows the SUB-KEY column; event status shows the SUB column

Related Issues

N/A

Summary by CodeRabbit

  • New Features

    • Mail events: consume new-mail events with mailbox/folders/labels filters and msg-format modes (event/metadata/full/plain_text_full).
    • Alias resolution: mailbox=me resolves to your primary email at startup.
    • Independent subscriptions: multiple subscriptions for the same event key are tracked separately.
  • Improvements

    • Status and schema tables show SUB / SUB-KEY markers.
    • Filtering and fetch behavior refined (drop non-matching events; optional on-demand fetch).
    • Cleanup now surfaces unsubscribe failures.
  • Documentation

    • Mail event docs, examples, and guidance added.

Review Change Stack

… cleanup to func() error

Change-Id: I23ba07c089b2c0eb3e42451d8118849a1c5f7a9d
…dedup

Change-Id: Iaeb09c15c754c9e4ca8ce9784623f82f55fe308b
…nsumerInfo

Add SubscriptionID field (omitempty) to Hello, PreShutdownCheck, and ConsumerInfo.
Update NewHello and NewPreShutdownCheck constructors to accept subscriptionID param;
patch all call sites with "" placeholder pending Tasks 7/8.

Change-Id: I469a1fb0de2c31326513feef5693d8097dfc8415
Add subID field to Conn, update NewConn signature to accept it, add
SubscriptionID() getter (falls back to EventKey when empty), update
handleControlMessage PreShutdownCheck branch to use message SubscriptionID
with EventKey fallback, and wire hello.SubscriptionID through handleHello.

Change-Id: Iea9195dd2d343fd51c84317e4d7f2d5d8067d51d
Change-Id: I3cc81e8440656470f92f1b50074b840b0875c781
Change-Id: Ieb7c2237621a84721b6fc73967838d9caec2a837
…o Run

Change-Id: Ifd43aa2a82c315ad3d496a39ce9f0dbdd43f3bb8
…sumeLoop

Change-Id: Ib40b148c45f546ca112d30976749f42fae1f877a
…note

Change-Id: I121ec0d3e7d50f3ebb728f47f1f54d89322e0143
Run keyDef.Match(raw, params) at the top of processAndOutput before any
Process/sink work; drop the event (return false, nil) when it returns false.
Move the RawEvent allocation above the Match guard so both Match and Process
share the same struct. Three new unit tests cover drop, nil-accepts-all,
and run-before-process ordering.

Change-Id: I21571b3da91a9a918903b6fc3a6ff897eb83a747
Change-Id: I2f88774c58719af0419c471f40de1bf12242a5ab
Add a SUB column to the human-readable consumer table in `event status`.
Renders the fingerprint hash suffix (after the colon) when present;
falls back to "-" for legacy consumers where SubscriptionID is empty
or equal to EventKey.

Change-Id: Ia6271475e562844fc275f3751c64ca408ebda82c
Change-Id: Ie880d12e5dda73925ccaadc60ff8d3acda401dd6
Change-Id: I02cca113d50067e9a5964b896b2bc84210d2a35b
Change-Id: I3dcd481fa93ce8f5dbde1e065aa00c18330265c0
Change-Id: I99c64618900dde0b5420ce8a4f5c088960098e66
…ork hooks

Adds events/mail/register.go with Keys() exposing the mail.user_mailbox.event.message_received_v1
EventKey: mailbox (SUB-KEY), folders, labels, msg-format params; wires NormalizeParams,
PreConsume (subscribe/unsubscribe), Match, and Process hooks. Updates events/register.go to
include mail.Keys() alongside im.Keys().

Note: Schema.Custom is used (not Native) because processMailEvent produces the complete output
shape — Schema.Native is incompatible with Process per registry validation.

Change-Id: Ibf48dc19dee5db65730810b0f2e4b5ebed73c4f0
Change-Id: I14636cae6a459cdf5a82455bc18022d5edd6c5ce
…ddress

Change-Id: I03d0780aaac0fd7d59c1395df6912f5bb043f996
Change-Id: If46580dff10e15a56858157dbcdfb79f2c43d227
The mail message_received event's subscriber field is a
{user_ids:[{user_id,open_id,union_id}]} object per the Feishu SDK
(P2UserMailboxEventMessageReceivedV1Data.Subscriber), not a string.
Decoding it as string made Process fail at runtime with
'cannot unmarshal object into Go struct field .event.subscriber of
type string', silently dropping every mail event.

Change-Id: I91dcd7e82659e3962a2530fae8d5ee34fe40dbf6
The PreConsume cleanup signature changed from func() to func() error
in this branch. minutes and vc (added on main since this branch forked)
share a subscriptionPreConsume helper that returned the old signature;
update both to return the unsubscribe error so the framework can surface
cleanup failures uniformly.

Change-Id: I808a4446e8b9327d4e6ec20e2b28a48345b5e1b9
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 30, 2026

📝 Walkthrough

Walkthrough

Adds subscription-scoped identity across the event system, implements a mail "message received" event with normalize/match/process hooks, updates Hub/Conn/Bus to track SubscriptionID, propagates subscription IDs through consume/handshake/shutdown flows, updates CLI schema/status displays, and adds tests and docs.

Changes

Subscription identity system

Layer / File(s) Summary
Protocol messages and type definitions
internal/event/protocol/messages.go, internal/event/protocol/messages_test.go, internal/event/protocol/codec_test.go, internal/event/types.go
Protocol messages gain subscription_id; ParamDef adds SubscriptionKey; KeyDefinition adds NormalizeParams and Match; PreConsume cleanup now returns func() error.
Subscription ID fingerprinting
internal/event/consume/fingerprint.go, internal/event/consume/fingerprint_test.go
ComputeSubscriptionID computes stable EventKey:fingerprint identifiers from SubscriptionKey params; tests validate determinism, format, and edge cases.

Mail event integration

Layer / File(s) Summary
Mail event schema and payload types
events/mail/payload.go
Defines MailReceivedPayload and supporting types for attachments and subscriber identifiers.
Mail parameter normalization
events/mail/normalize.go, events/mail/normalize_test.go
Resolves mailbox=me (or empty) to primary email via mailbox profile API; extracts primary_email_address tolerant of response shapes.
Mail event filtering
events/mail/match.go, events/mail/match_test.go
matchMailbox filters by envelope mail_address with fail-open semantics for empty filter or malformed payload.
Mail event enrichment and processing
events/mail/process.go, events/mail/process_test.go
processMailEvent optionally fetches message metadata/body per msg-format, applies folder/label filters, enriches payload, and can drop events; extensive tests added.
Mail event registration
events/mail/register.go, events/mail/register_test.go, events/register.go
Registers mail.user_mailbox.event.message_received_v1 with subscription-keyed mailbox, hooks wired for NormalizeParams/PreConsume/Match/Process, and preconsume subscribe/unsubscribe cleanup logic.

Hub/Bus subscription identity tracking

Layer / File(s) Summary
Connection subscription identity
internal/event/bus/conn.go, internal/event/bus/conn_test.go
Conn stores subID, NewConn accepts subID, and SubscriptionID() falls back to event key when empty; tests added.
Hub subscription-keyed tracking
internal/event/bus/hub.go, internal/event/bus/hub_test.go
Hub replaces keyCounts with subCounts keyed by SubscriptionID; Subscriber interface adds SubscriptionID(); Register/Unregister, cleanup locks, and Consumers() updated.
Bus handshake and cleanup routing
internal/event/bus/bus.go, internal/event/bus/bus_shutdown_test.go, internal/event/bus/handle_hello_test.go
handleHello derives subID from Hello.SubscriptionID (fallback EventKey) and constructs Conn with subID; cleanup lock release now uses SubscriptionID(); tests for legacy/modern clients added.

Consumer loop subscription integration

Layer / File(s) Summary
Handshake with subscription identity
internal/event/consume/handshake.go, internal/event/consume/handshake_test.go, internal/event/consume/consume.go, internal/event/consume/consume_test.go
Run normalizes params, computes subscriptionID via ComputeSubscriptionID, passes subscriptionID to doHello which sends it in Hello; deferred cleanup checks returned error.
Event filtering and processing pipeline
internal/event/consume/loop.go, internal/event/consume/loop_test.go
consumeLoop accepts subscriptionID and passes it to checkLastForKey; processAndOutput builds RawEvent early and runs KeyDefinition.Match synchronously before Process and sink writes; tests added for Match behavior.
Shutdown with subscription scope
internal/event/consume/shutdown.go, internal/event/consume/shutdown_test.go
checkLastForKey includes subscriptionID in PreShutdownCheck; tests validate on-the-wire subscription_id.
Cleanup error handling
events/minutes/preconsume.go, events/vc/preconsume.go
PreConsume hooks now return cleanup funcs that themselves return error; unsubscribe failures are wrapped and propagated rather than ignored.

CLI output for subscription identity

Layer / File(s) Summary
Status table subscription column
cmd/event/status.go, cmd/event/format_helpers_test.go
Adds SUB column to consumer status table, showing subscription suffix or - when absent/legacy; tests added for rendering cases.
Schema table subscription-key column
cmd/event/schema.go, cmd/event/schema_test.go
Adds SUB-KEY column to parameters table rendering yes/no for SubscriptionKey-marked params; tests for text and JSON outputs added.

Documentation

Layer / File(s) Summary
Mail event docs & skills
skills/lark-event/references/lark-event-mail.md, skills/lark-event/SKILL.md, skills/lark-mail/*
Adds mail event reference, examples, subscription-identity explanation, and removes legacy mail +watch docs/shortcuts.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • larksuite/cli#654: Prior work on subscription identity and CLI schema/status rendering that this PR builds upon.

Suggested labels

domain/vc

Suggested reviewers

  • liangshuo-1
  • zhaoleibd
  • haidaodashushu

Poem

"🐇 I hopped through params and hashed a string,
Mailboxes split so each subscription can sing.
Hello carries a neat little ID,
Hub counts by scope, tidy and free.
Tests nibble bugs while docs tidy the spring."

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 22.64% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and concisely summarizes the main changes: adding per-resource subscription identity, a Match hook, and mail event support.
Description check ✅ Passed The description is comprehensive and complete, covering summary, detailed changes, test plan with passing results, and related issues.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/event-subscription-id

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions github-actions Bot added domain/mail PR touches the mail domain size/L Large or sensitive change across domains or core paths labels May 30, 2026
@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 30, 2026

🚀 PR Preview Install Guide

🧰 CLI update

npm i -g https://pkg.pr.new/larksuite/cli/@larksuite/cli@654f65640ffce2918e9f1d0f144201f581cbfd8d

🧩 Skill update

npx skills add larksuite/cli#feat/event-subscription-id -y -g

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/event/consume/consume.go (1)

61-76: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Apply opts.Timeout before NormalizeParams.

NormalizeParams is allowed to make OAPI calls, but the timeout context is created afterwards. That means --timeout no longer bounds startup alias-resolution/canonicalization and the command can block longer than requested before the handshake even starts.

Suggested fix
-	// Normalize params (resolve aliases like "me" -> real email) before fingerprint
-	// compute, PreConsume, Match, Process. Must happen BEFORE doHello so the
-	// SubscriptionID we send to bus reflects canonical values.
-	if keyDef.NormalizeParams != nil {
-		if err := keyDef.NormalizeParams(ctx, opts.Runtime, opts.Params); err != nil {
-			return fmt.Errorf("normalize params for %s: %w", opts.EventKey, err)
-		}
-	}
-
-	// Compute subscription identity from normalized params + SubscriptionKey flags.
-	subscriptionID := ComputeSubscriptionID(keyDef, opts.Params)
-
 	if opts.Timeout > 0 {
 		var cancel context.CancelFunc
 		ctx, cancel = context.WithTimeout(ctx, opts.Timeout)
 		defer cancel()
 	}
+
+	// Normalize params (resolve aliases like "me" -> real email) before fingerprint
+	// compute, PreConsume, Match, Process. Must happen BEFORE doHello so the
+	// SubscriptionID we send to bus reflects canonical values.
+	if keyDef.NormalizeParams != nil {
+		if err := keyDef.NormalizeParams(ctx, opts.Runtime, opts.Params); err != nil {
+			return fmt.Errorf("normalize params for %s: %w", opts.EventKey, err)
+		}
+	}
+
+	// Compute subscription identity from normalized params + SubscriptionKey flags.
+	subscriptionID := ComputeSubscriptionID(keyDef, opts.Params)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/event/consume/consume.go` around lines 61 - 76, The timeout context
is created after NormalizeParams, so alias-resolution can bypass --timeout; fix
by creating/applying the timeout before calling keyDef.NormalizeParams (use
opts.Timeout to wrap ctx via context.WithTimeout and defer cancel immediately),
then call keyDef.NormalizeParams(ctx, opts.Runtime, opts.Params), and only after
successful normalization compute subscriptionID with ComputeSubscriptionID;
ensure all subsequent calls (including doHello/PreConsume/Match/Process) use the
timed ctx.
🧹 Nitpick comments (1)
internal/event/protocol/messages_test.go (1)

20-30: ⚡ Quick win

Assert the new subscriptionID constructor args are preserved.

These call sites were updated for the new parameter, but the test still only verifies Type/EventKey. A constructor regression that drops SubscriptionID would pass unnoticed.

Diff
-	if got := NewHello(1, "k", []string{"t"}, "v1", ""); got.Type != MsgTypeHello {
+	if got := NewHello(1, "k", []string{"t"}, "v1", "k:sub"); got.Type != MsgTypeHello || got.SubscriptionID != "k:sub" {
 		t.Errorf("NewHello.Type = %q, want %q", got.Type, MsgTypeHello)
 	}
@@
-	if got := NewPreShutdownCheck("k", ""); got.Type != MsgTypePreShutdownCheck || got.EventKey != "k" {
+	if got := NewPreShutdownCheck("k", "k:sub"); got.Type != MsgTypePreShutdownCheck || got.EventKey != "k" || got.SubscriptionID != "k:sub" {
 		t.Errorf("NewPreShutdownCheck mismatch: %+v", got)
 	}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/event/protocol/messages_test.go` around lines 20 - 30, The test must
assert that the new subscriptionID constructor argument is stored on the
returned messages; update the NewHello, NewHelloAck, NewEvent and
NewPreShutdownCheck assertions to also verify the SubscriptionID (or equivalent
field name) equals the value passed in (e.g., the last argument in each
constructor call) so a regression that drops SubscriptionID will fail; locate
the checks around NewHello, NewHelloAck, NewEvent and NewPreShutdownCheck in
messages_test.go and add corresponding SubscriptionID equality assertions.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@events/mail/match.go`:
- Around line 31-34: The Match implementation currently only fails open when
json.Unmarshal(raw.Payload, &env) errors, but returns false if the JSON is valid
yet env.Event.MailAddress is missing/empty; change Match so that after
unmarshalling it also treats a missing/empty env.Event.MailAddress as a
fail-open case (i.e., return true when env.Event.MailAddress == ""), otherwise
continue to return env.Event.MailAddress == target; reference symbols: Match,
raw.Payload, json.Unmarshal, env.Event.MailAddress, target.

In `@internal/event/bus/conn.go`:
- Around line 150-156: The code currently builds scope from the incoming message
(m.SubscriptionID or m.EventKey); instead, use the connection's authoritative
subscription identity via Conn.SubscriptionID() when computing scope so
PreShutdownCheck uses the stored scope. Replace the scope assignment to use
c.SubscriptionID() (fall back to m.EventKey only if c.SubscriptionID() is
empty), and keep the existing checkLastForKey(c.checkLastForKey(scope)) logic
unchanged so per-subscription cleanup is evaluated against the connection's
subscription.

In `@internal/event/consume/consume_test.go`:
- Around line 47-49: The test reconstructs the fmt.Errorf wrapper itself instead
of exercising the real production wrapping in Run(), so change the test to call
the actual code path that produces the wrapped error (invoke Run() with a keyDef
that triggers NormalizeParams to fail) and assert that the returned error string
contains the expected prefix "normalize params for <EventKey>:"; alternatively,
extract the wrapping into a helper function (e.g., wrapNormalizeError(eventKey
string, err error)) and update the test to call that helper directly to verify
the exact wrapper format; locate references to Run(), NormalizeParams and
keyDef.Key in consume_test.go to implement the change.
- Around line 103-107: The test currently builds `got` using the same format
string as `want`, making it impossible to detect regressions; instead call the
real production formatting/path that emits the cleanup warning (the
cleanup/unsubscribe code path or the formatter function used in production) to
produce `got` rather than using fmt.Sprintf; keep `want` as the expected literal
and assert that the output from the actual cleanup warning routine (the
function/method that logs or formats the cleanup warning in the
unsubscribe/cleanup implementation) equals `want`.

In `@internal/event/consume/loop_test.go`:
- Around line 259-286: TestProcessAndOutput_Match_RunsBeforeProcess currently
only checks counts; change the test to assert order by recording call sequence
when keyDef.Match and keyDef.Process run (e.g. append "match" / "process" to a
slice or set a matchDone flag), and then after processAndOutput return assert
that the first entry is "match" (or that matchDone was true when Process
executed). Locate the handlers on the keyDef object (Match and Process) and
update them to record the sequence or assert inside Process that the match-side
marker is already set so the test fails if Process ran before Match.

In `@skills/lark-event/SKILL.md`:
- Around line 155-163: The doc incorrectly uses “reference-counted” to describe
server behavior; update the "Cleanup error reporting" section to remove the
phrase "reference-counted" and reword the sentence that currently reads
“Subscriptions are reference-counted implicitly by Lark's server (one record per
`(app, user, event_type)`)" to clearly state the server stores a single
idempotent subscription record per (app, user, event_type) that is overwritten
on subscribe, not reference-counted, and keep the guidance that manual
unsubscribe is not a safe recovery action because a stray unsubscribe can remove
that single record used by other consumers.

---

Outside diff comments:
In `@internal/event/consume/consume.go`:
- Around line 61-76: The timeout context is created after NormalizeParams, so
alias-resolution can bypass --timeout; fix by creating/applying the timeout
before calling keyDef.NormalizeParams (use opts.Timeout to wrap ctx via
context.WithTimeout and defer cancel immediately), then call
keyDef.NormalizeParams(ctx, opts.Runtime, opts.Params), and only after
successful normalization compute subscriptionID with ComputeSubscriptionID;
ensure all subsequent calls (including doHello/PreConsume/Match/Process) use the
timed ctx.

---

Nitpick comments:
In `@internal/event/protocol/messages_test.go`:
- Around line 20-30: The test must assert that the new subscriptionID
constructor argument is stored on the returned messages; update the NewHello,
NewHelloAck, NewEvent and NewPreShutdownCheck assertions to also verify the
SubscriptionID (or equivalent field name) equals the value passed in (e.g., the
last argument in each constructor call) so a regression that drops
SubscriptionID will fail; locate the checks around NewHello, NewHelloAck,
NewEvent and NewPreShutdownCheck in messages_test.go and add corresponding
SubscriptionID equality assertions.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 0118b5c2-be99-4a98-a187-00e3b8e2bfd1

📥 Commits

Reviewing files that changed from the base of the PR and between b1ecf2d and 95f5b80.

📒 Files selected for processing (44)
  • cmd/event/format_helpers_test.go
  • cmd/event/schema.go
  • cmd/event/schema_test.go
  • cmd/event/status.go
  • events/mail/match.go
  • events/mail/match_test.go
  • events/mail/normalize.go
  • events/mail/normalize_test.go
  • events/mail/payload.go
  • events/mail/process.go
  • events/mail/process_test.go
  • events/mail/register.go
  • events/mail/register_test.go
  • events/minutes/preconsume.go
  • events/register.go
  • events/vc/preconsume.go
  • internal/event/bus/bus.go
  • internal/event/bus/bus_shutdown_test.go
  • internal/event/bus/conn.go
  • internal/event/bus/conn_test.go
  • internal/event/bus/handle_hello_test.go
  • internal/event/bus/hub.go
  • internal/event/bus/hub_observability_test.go
  • internal/event/bus/hub_publish_race_test.go
  • internal/event/bus/hub_test.go
  • internal/event/consume/consume.go
  • internal/event/consume/consume_test.go
  • internal/event/consume/fingerprint.go
  • internal/event/consume/fingerprint_test.go
  • internal/event/consume/handshake.go
  • internal/event/consume/handshake_test.go
  • internal/event/consume/loop.go
  • internal/event/consume/loop_test.go
  • internal/event/consume/shutdown.go
  • internal/event/consume/shutdown_test.go
  • internal/event/protocol/codec_test.go
  • internal/event/protocol/messages.go
  • internal/event/protocol/messages_test.go
  • internal/event/types.go
  • skills/lark-event/SKILL.md
  • skills/lark-event/references/lark-event-mail.md
  • skills/lark-mail/SKILL.md
  • skills/lark-mail/references/lark-mail-triage.md
  • skills/lark-mail/references/lark-mail-watch.md
💤 Files with no reviewable changes (1)
  • skills/lark-mail/references/lark-mail-watch.md

Comment thread events/mail/match.go
Comment thread internal/event/bus/conn.go Outdated
Comment thread internal/event/consume/consume_test.go Outdated
Comment thread internal/event/consume/consume_test.go Outdated
Comment thread internal/event/consume/loop_test.go
Comment thread skills/lark-event/SKILL.md
Change-Id: I9cd51022a644db509a070234fcc330783a4d08cc
@codecov
Copy link
Copy Markdown

codecov Bot commented May 30, 2026

Codecov Report

❌ Patch coverage is 83.22981% with 54 lines in your changes missing coverage. Please review.
✅ Project coverage is 69.07%. Comparing base (b1ecf2d) to head (654f656).

Files with missing lines Patch % Lines
events/mail/register.go 75.38% 16 Missing ⚠️
internal/event/consume/consume.go 16.66% 15 Missing ⚠️
events/mail/process.go 91.57% 4 Missing and 4 partials ⚠️
events/mail/normalize.go 76.92% 3 Missing and 3 partials ⚠️
cmd/event/status.go 75.00% 1 Missing and 1 partial ⚠️
events/mail/match.go 84.61% 1 Missing and 1 partial ⚠️
events/minutes/preconsume.go 66.66% 1 Missing and 1 partial ⚠️
events/vc/preconsume.go 66.66% 1 Missing and 1 partial ⚠️
internal/event/bus/conn.go 88.88% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1185      +/-   ##
==========================================
+ Coverage   68.92%   69.07%   +0.15%     
==========================================
  Files         629      634       +5     
  Lines       58762    59023     +261     
==========================================
+ Hits        40503    40772     +269     
+ Misses      14952    14924      -28     
- Partials     3307     3327      +20     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

- match.go: fail open when mail_address is absent/empty (shape drift),
  not only on unparseable payload
- conn.go: PreShutdownCheck uses the connection's own SubscriptionID()
  instead of recomputing from the (possibly stale) incoming message
- consume_test: drive the real Run() path for the normalize-error wrap;
  drop the self-fulfilling cleanup-format test (covered by sandbox E2E)
- loop_test: record real call order instead of bare call counts
- lark-event SKILL.md: correct server semantics (idempotent single record,
  not reference counting)

Change-Id: I62319e84a990e5355a462280f190fd397a8a6c2d
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/event/consume/consume_test.go (1)

78-110: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Server goroutine error branches hang the test instead of failing fast.

On READ_ERR/DECODE_ERR/WRONG_TYPE the goroutine sends to done but never writes the ack and never closes b. The main goroutine is blocked in doHello waiting for the HelloAck, so it never reaches t.Fatalf/t.Errorf and never runs the deferred b.Close() — the test stalls until the package test timeout rather than reporting the failure. Close b in the failure branches so doHello returns promptly.

🐛 Proposed fix to fail fast on error branches
 		line, err := protocol.ReadFrame(br)
 		if err != nil {
 			done <- "READ_ERR:" + err.Error()
+			b.Close()
 			return
 		}
 		msg, err := protocol.Decode(bytes.TrimRight(line, "\n"))
 		if err != nil {
 			done <- "DECODE_ERR:" + err.Error()
+			b.Close()
 			return
 		}
 		if hello, ok := msg.(*protocol.Hello); ok {
 			done <- hello.SubscriptionID
 			// send ack so client can return
 			ack := protocol.NewHelloAck("v1", true)
 			_ = protocol.EncodeWithDeadline(b, ack, protocol.WriteTimeout)
 		} else {
 			done <- "WRONG_TYPE"
+			b.Close()
 		}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/event/consume/consume_test.go` around lines 78 - 110, The server
goroutine currently sends error markers to done on
READ_ERR/DECODE_ERR/WRONG_TYPE but never closes b, so doHello blocks waiting for
the HelloAck; update the error branches inside the goroutine (the blocks after
protocol.ReadFrame, protocol.Decode and the else for type check) to call
b.Close() after sending the error string to done so the client side (doHello)
unblocks and the test fails fast; reference the goroutine, the done channel, b,
protocol.ReadFrame/protocol.Decode, and doHello/HelloAck to locate where to add
the b.Close() calls.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Outside diff comments:
In `@internal/event/consume/consume_test.go`:
- Around line 78-110: The server goroutine currently sends error markers to done
on READ_ERR/DECODE_ERR/WRONG_TYPE but never closes b, so doHello blocks waiting
for the HelloAck; update the error branches inside the goroutine (the blocks
after protocol.ReadFrame, protocol.Decode and the else for type check) to call
b.Close() after sending the error string to done so the client side (doHello)
unblocks and the test fails fast; reference the goroutine, the done channel, b,
protocol.ReadFrame/protocol.Decode, and doHello/HelloAck to locate where to add
the b.Close() calls.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 7e096f61-8c93-4a8a-83ed-69bd91490b34

📥 Commits

Reviewing files that changed from the base of the PR and between a42f734 and 654f656.

📒 Files selected for processing (5)
  • events/mail/match.go
  • internal/event/bus/conn.go
  • internal/event/consume/consume_test.go
  • internal/event/consume/loop_test.go
  • skills/lark-event/SKILL.md
🚧 Files skipped from review as they are similar to previous changes (4)
  • events/mail/match.go
  • internal/event/bus/conn.go
  • skills/lark-event/SKILL.md
  • internal/event/consume/loop_test.go

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

domain/mail PR touches the mail domain feature size/L Large or sensitive change across domains or core paths

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant