Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
a3f382c
feat(event): add SubscriptionKey/NormalizeParams/Match fields; change…
liuxinyanglxy May 23, 2026
790d621
feat(event): add ComputeSubscriptionID for per-resource subscription …
liuxinyanglxy May 23, 2026
66a7f1d
feat(event/protocol): add SubscriptionID to Hello/PreShutdownCheck/Co…
liuxinyanglxy May 23, 2026
4da3e2b
feat(event/bus): store SubscriptionID on Conn with EventKey fallback
liuxinyanglxy May 23, 2026
7dcd0ce
feat(event/bus): key Hub dedup by SubscriptionID; add SubCount accessor
liuxinyanglxy May 23, 2026
6ee4ba0
test(event/bus): cover handleHello legacy vs modern SubscriptionID paths
liuxinyanglxy May 25, 2026
66e566d
feat(event/consume): wire NormalizeParams + ComputeSubscriptionID int…
liuxinyanglxy May 25, 2026
fe4622d
feat(event/consume): plumb SubscriptionID through checkLastForKey/con…
liuxinyanglxy May 25, 2026
279c9f5
feat(event/consume): surface cleanup errors as WARN with idempotency …
liuxinyanglxy May 25, 2026
d98ff47
feat(event/consume): add sync Match filter before Process
liuxinyanglxy May 25, 2026
224b928
feat(cmd/event): render SubscriptionKey marker in schema output
liuxinyanglxy May 25, 2026
37bef6f
feat(cmd/event): render SubscriptionID column in status table
liuxinyanglxy May 25, 2026
5cfcf54
feat(events/mail): add MailReceivedPayload union schema
liuxinyanglxy May 25, 2026
3688c1c
feat(events/mail): add normalizeMailParams to resolve 'me' alias
liuxinyanglxy May 25, 2026
a5f6ae1
feat(events/mail): add matchMailbox sync filter
liuxinyanglxy May 25, 2026
c7bb890
feat(events/mail): add processMailEvent for filter + format enrichment
liuxinyanglxy May 25, 2026
08e90f7
feat(events/mail): wire folders/labels/msg-format params + new framew…
liuxinyanglxy May 25, 2026
becaac0
docs(skills): document subscription identity + cleanup error semantics
liuxinyanglxy May 25, 2026
31c57a6
fix(events/mail): use /user_mailboxes/me/profile with primary_email_a…
liuxinyanglxy May 25, 2026
c2741c5
docs(skills): converge mail event docs into lark-event (IM-style)
liuxinyanglxy May 26, 2026
cbbc84c
fix(events/mail): decode subscriber as structured user_ids object
liuxinyanglxy May 30, 2026
95f5b80
fix(events): adapt minutes/vc PreConsume cleanup to func() error
liuxinyanglxy May 30, 2026
a42f734
chore: gofmt event test files
liuxinyanglxy May 30, 2026
654f656
fix(events): address CodeRabbit review on PR
liuxinyanglxy May 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions cmd/event/format_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,79 @@ func TestWriteStatusText_CoversAllStates(t *testing.T) {
}
}

func TestWriteStatusText_ShowsSubColumn(t *testing.T) {
var buf bytes.Buffer
writeStatusText(&buf, []appStatus{
{
AppID: "cli_RUNNINGXXXXXXXXX",
State: stateRunning,
PID: 1234,
UptimeSec: 60,
Active: 2,
Consumers: []protocol.ConsumerInfo{
{PID: 1001, EventKey: "mail.x", SubscriptionID: "mail.x:alice", Received: 5, Dropped: 0},
{PID: 1002, EventKey: "mail.x", SubscriptionID: "mail.x:bob", Received: 3, Dropped: 0},
},
},
})
out := buf.String()
if !strings.Contains(out, "SUB") {
t.Errorf("missing SUB column header: %s", out)
}
if !strings.Contains(out, "alice") {
t.Errorf("missing alice suffix in SUB column: %s", out)
}
if !strings.Contains(out, "bob") {
t.Errorf("missing bob suffix in SUB column: %s", out)
}
}

func TestWriteStatusText_LegacySubscriptionID_RendersDash(t *testing.T) {
var buf bytes.Buffer
writeStatusText(&buf, []appStatus{
{
AppID: "cli_RUNNINGXXXXXXXXX",
State: stateRunning,
PID: 1234,
UptimeSec: 60,
Active: 1,
Consumers: []protocol.ConsumerInfo{
{PID: 1001, EventKey: "im.x", SubscriptionID: "", Received: 5},
},
},
})
out := buf.String()
if !strings.Contains(out, "SUB") {
t.Errorf("missing SUB header: %s", out)
}
if !strings.Contains(out, "-") {
t.Errorf("missing dash placeholder for empty SubscriptionID: %s", out)
}
}

func TestWriteStatusText_EventKeyEqualSubscriptionID_RendersDash(t *testing.T) {
var buf bytes.Buffer
writeStatusText(&buf, []appStatus{
{
AppID: "cli_RUNNINGXXXXXXXXX",
State: stateRunning,
PID: 1234,
UptimeSec: 60,
Active: 1,
Consumers: []protocol.ConsumerInfo{
{PID: 1001, EventKey: "im.x", SubscriptionID: "im.x", Received: 5},
},
},
})
out := buf.String()
if !strings.Contains(out, "SUB") {
t.Errorf("missing SUB header: %s", out)
}
if !strings.Contains(out, "-") {
t.Errorf("missing dash placeholder when SubscriptionID==EventKey: %s", out)
}
}

func TestWriteStatusJSON_OrphanHint(t *testing.T) {
var buf bytes.Buffer
if err := writeStatusJSON(&buf, []appStatus{
Expand Down
8 changes: 6 additions & 2 deletions cmd/event/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,16 @@ func runSchema(f *cmdutil.Factory, key string, asJSON bool) error {
if len(def.Params) > 0 {
fmt.Fprintf(out, "\nParameters:\n")
w := tabwriter.NewWriter(out, 0, 4, 2, ' ', 0)
fmt.Fprintf(w, " NAME\tTYPE\tREQUIRED\tDEFAULT\tDESCRIPTION\n")
fmt.Fprintf(w, " NAME\tTYPE\tREQUIRED\tSUB-KEY\tDEFAULT\tDESCRIPTION\n")
for _, p := range def.Params {
required := "no"
if p.Required {
required = "yes"
}
subKey := "no"
if p.SubscriptionKey {
subKey = "yes"
}
defaultVal := p.Default
if defaultVal == "" {
defaultVal = "-"
Expand All @@ -145,7 +149,7 @@ func runSchema(f *cmdutil.Factory, key string, asJSON bool) error {
if desc == "" {
desc = "-"
}
fmt.Fprintf(w, " %s\t%s\t%s\t%s\t%s\n", p.Name, p.Type, required, defaultVal, desc)
fmt.Fprintf(w, " %s\t%s\t%s\t%s\t%s\t%s\n", p.Name, p.Type, required, subKey, defaultVal, desc)
}
w.Flush()

Expand Down
73 changes: 73 additions & 0 deletions cmd/event/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,79 @@ func TestRunSchema_JSONOutput(t *testing.T) {
}
}

func TestSchema_RendersSubscriptionKeyMarker(t *testing.T) {
const syntheticKey = "test.evt_sub"
t.Cleanup(func() { eventlib.UnregisterKeyForTest(syntheticKey) })

eventlib.RegisterKey(eventlib.KeyDefinition{
Key: syntheticKey,
EventType: syntheticKey,
Params: []eventlib.ParamDef{
{Name: "mailbox", SubscriptionKey: true, Description: "subscription id source"},
{Name: "folders", Description: "filter only"},
},
Schema: eventlib.SchemaDef{Native: &eventlib.SchemaSpec{Type: reflect.TypeOf(struct{ X string }{})}},
})

f, stdout, _, _ := cmdutil.TestFactory(t, &core.CliConfig{AppID: "test"})
if err := runSchema(f, syntheticKey, false); err != nil {
t.Fatalf("runSchema: %v", err)
}

out := stdout.String()
if !strings.Contains(out, "SUB-KEY") {
t.Errorf("missing SUB-KEY column header in:\n%s", out)
}

// Find the mailbox row and verify "yes" is present
var mailboxRow string
for _, ln := range strings.Split(out, "\n") {
if strings.Contains(ln, "mailbox") && !strings.Contains(ln, "NAME") {
mailboxRow = ln
break
}
}
if !strings.Contains(mailboxRow, "yes") {
t.Errorf("mailbox row missing yes SUB-KEY marker: %q", mailboxRow)
}

// Find the folders row and verify "no" is present
var foldersRow string
for _, ln := range strings.Split(out, "\n") {
if strings.Contains(ln, "folders") && !strings.Contains(ln, "NAME") {
foldersRow = ln
break
}
}
if !strings.Contains(foldersRow, "no") {
t.Errorf("folders row missing no SUB-KEY marker: %q", foldersRow)
}
}

func TestSchema_JSON_IncludesSubscriptionKey(t *testing.T) {
const syntheticKey = "test.evt_json"
t.Cleanup(func() { eventlib.UnregisterKeyForTest(syntheticKey) })

eventlib.RegisterKey(eventlib.KeyDefinition{
Key: syntheticKey,
EventType: syntheticKey,
Params: []eventlib.ParamDef{{Name: "mailbox", SubscriptionKey: true}},
Schema: eventlib.SchemaDef{Native: &eventlib.SchemaSpec{Type: reflect.TypeOf(struct{ X string }{})}},
})

f, stdout, _, _ := cmdutil.TestFactory(t, &core.CliConfig{AppID: "test"})
if err := runSchema(f, syntheticKey, true); err != nil {
t.Fatalf("runSchema json: %v", err)
}

if !strings.Contains(stdout.String(), `"subscription_key"`) {
t.Errorf("JSON output missing subscription_key field: %s", stdout.String())
}
if !strings.Contains(stdout.String(), `true`) {
t.Errorf("JSON output missing subscription_key: true value: %s", stdout.String())
}
}

func TestResolveSchemaJSON_CustomWithOverlay(t *testing.T) {
const syntheticKey = "t.custom.overlay"
t.Cleanup(func() { eventlib.UnregisterKeyForTest(syntheticKey) })
Expand Down
12 changes: 11 additions & 1 deletion cmd/event/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"fmt"
"io"
"sort"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -242,12 +243,21 @@
s.PID, (time.Duration(s.UptimeSec) * time.Second).String())
fmt.Fprintf(out, " Active consumers: %d\n", s.Active)
if len(s.Consumers) > 0 {
headers := []string{"CONSUMER", "EVENT KEY", "RECEIVED", "DROPPED"}
headers := []string{"CONSUMER", "EVENT KEY", "SUB", "RECEIVED", "DROPPED"}
rows := make([][]string, 0, len(s.Consumers))
for _, c := range s.Consumers {
subDisplay := "-"
if c.SubscriptionID != "" && c.SubscriptionID != c.EventKey {
if idx := strings.Index(c.SubscriptionID, ":"); idx >= 0 {
subDisplay = c.SubscriptionID[idx+1:]
} else {
subDisplay = c.SubscriptionID

Check warning on line 254 in cmd/event/status.go

View check run for this annotation

Codecov / codecov/patch

cmd/event/status.go#L254

Added line #L254 was not covered by tests
}
}
rows = append(rows, []string{
fmt.Sprintf("pid=%d", c.PID),
c.EventKey,
subDisplay,
fmt.Sprintf("%d", c.Received),
fmt.Sprintf("%d", c.Dropped),
})
Expand Down
39 changes: 39 additions & 0 deletions events/mail/match.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) 2026 Lark Technologies Pte. Ltd.
// SPDX-License-Identifier: MIT

package mail

import (
"encoding/json"

"github.com/larksuite/cli/internal/event"
)

// matchMailbox compares the V2-envelope payload's event.mail_address against
// the normalized params.mailbox. Drops events whose mail_address doesn't match.
//
// Fail-open policy: if params.mailbox is empty (no filter), the payload can't
// be parsed, or the payload omits/moves event.mail_address (defensive —
// upstream schema may evolve), accept the event rather than silently dropping
// legitimate traffic. Only a present-but-mismatched mail_address drops.
//
// IMPORTANT: caller must ensure params.mailbox is already normalized to a real
// email (not "me"). normalizeMailParams handles this.
func matchMailbox(raw *event.RawEvent, params map[string]string) bool {
target := params["mailbox"]
if target == "" {
return true
}
var env struct {
Event struct {
MailAddress string `json:"mail_address"`
} `json:"event"`
}
if err := json.Unmarshal(raw.Payload, &env); err != nil {
return true // fail-open on unparseable payload
}
if env.Event.MailAddress == "" {
return true // fail-open on shape drift (field absent/moved); let Process decide

Check warning on line 36 in events/mail/match.go

View check run for this annotation

Codecov / codecov/patch

events/mail/match.go#L36

Added line #L36 was not covered by tests
}
return env.Event.MailAddress == target
Comment thread
liuxinyanglxy marked this conversation as resolved.
}
59 changes: 59 additions & 0 deletions events/mail/match_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) 2026 Lark Technologies Pte. Ltd.
// SPDX-License-Identifier: MIT

package mail

import (
"encoding/json"
"testing"

"github.com/larksuite/cli/internal/event"
)

func TestMatchMailbox(t *testing.T) {
makeV2Envelope := func(mailAddr string) json.RawMessage {
return json.RawMessage(`{"schema":"2.0","header":{},"event":{"mail_address":"` + mailAddr + `"}}`)
}
tests := []struct {
name string
payload json.RawMessage
params map[string]string
want bool
}{
{
name: "exact match",
payload: makeV2Envelope("alice@example.com"),
params: map[string]string{"mailbox": "alice@example.com"},
want: true,
},
{
name: "mismatch drops",
payload: makeV2Envelope("bob@example.com"),
params: map[string]string{"mailbox": "alice@example.com"},
want: false,
},
{
name: "empty params accepts all (fail-open: no filter)",
payload: makeV2Envelope("anything@example.com"),
params: map[string]string{},
want: true,
},
{
name: "malformed payload fail-open",
payload: json.RawMessage(`not json`),
params: map[string]string{"mailbox": "alice@example.com"},
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
raw := &event.RawEvent{Payload: tt.payload}
if got := matchMailbox(raw, tt.params); got != tt.want {
t.Errorf("got %v, want %v", got, tt.want)
}
})
}
}

// Compile-time: matchMailbox must match the Match signature.
var _ func(*event.RawEvent, map[string]string) bool = matchMailbox
61 changes: 61 additions & 0 deletions events/mail/normalize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (c) 2026 Lark Technologies Pte. Ltd.
// SPDX-License-Identifier: MIT

package mail

import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/larksuite/cli/internal/event"
)

// normalizeMailParams resolves the mailbox alias "me" (or empty) into the
// user's real primary email so fingerprint / Match / Process all see the
// canonical value.
//
// API: GET /open-apis/mail/v1/user_mailboxes/me/profile — returns
// {data:{primary_email_address:"..."}}. Mirrors the same OAPI call that
// shortcuts/mail/helpers.go::fetchMailboxPrimaryEmail uses, so both code
// paths (event consume and mail +watch) resolve "me" identically.
func normalizeMailParams(ctx context.Context, rt event.APIClient, params map[string]string) error {
mbox := strings.TrimSpace(params["mailbox"])
if mbox == "" || mbox == "me" {
data, err := rt.CallAPI(ctx, "GET", "/open-apis/mail/v1/user_mailboxes/me/profile", nil)
if err != nil {
return fmt.Errorf("resolve mailbox 'me': %w", err)
}
email, err := extractPrimaryEmail(data)
if err != nil {
return fmt.Errorf("decode user_mailboxes/me/profile response: %w", err)

Check warning on line 32 in events/mail/normalize.go

View check run for this annotation

Codecov / codecov/patch

events/mail/normalize.go#L32

Added line #L32 was not covered by tests
}
if email == "" {
return fmt.Errorf("user_mailboxes/me/profile returned empty primary_email_address")
}
params["mailbox"] = email
return nil
}
params["mailbox"] = mbox
return nil
}

// extractPrimaryEmail pulls primary_email_address out of the profile response.
// Tolerates both top-level shape (test fixtures) and the canonical nested
// `data` wrapper used by production responses.
func extractPrimaryEmail(raw json.RawMessage) (string, error) {
var asTop struct {
PrimaryEmailAddress string `json:"primary_email_address"`
Data struct {
PrimaryEmailAddress string `json:"primary_email_address"`
} `json:"data"`
}
if err := json.Unmarshal(raw, &asTop); err != nil {
return "", err

Check warning on line 55 in events/mail/normalize.go

View check run for this annotation

Codecov / codecov/patch

events/mail/normalize.go#L55

Added line #L55 was not covered by tests
}
if asTop.PrimaryEmailAddress != "" {
return asTop.PrimaryEmailAddress, nil

Check warning on line 58 in events/mail/normalize.go

View check run for this annotation

Codecov / codecov/patch

events/mail/normalize.go#L58

Added line #L58 was not covered by tests
}
return asTop.Data.PrimaryEmailAddress, nil
}
Loading
Loading