Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ environment or ignored local env file:
```bash
fincrawl sync --entities
fincrawl sync --updated-since 2h --limit 50
fincrawl sync --updated-since 180d --updated-before 90d --limit 0
fincrawl search "login code expired" --fields provider_id,subject,updated_at
```

Expand Down Expand Up @@ -98,6 +99,7 @@ Common flows:
| Check local config | `fincrawl doctor --offline` |
| Sync metadata | `fincrawl sync --entities` |
| Sync a recent window | `fincrawl sync --updated-since 2h --limit 50` |
| Backfill a bounded historical window | `fincrawl sync --updated-since 180d --updated-before 90d --limit 0` |
| Hydrate one conversation | `fincrawl sync --conversation <id>` |
| Search local archive | `fincrawl search "<query>" --fields provider_id,subject,updated_at` |
| Filter search results | `fincrawl search "<query>" --state open --tag billing` |
Expand Down
1 change: 1 addition & 0 deletions docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ fincrawl status --json
fincrawl sync --fixture testdata/synthetic
fincrawl sync --entities
fincrawl sync --updated-since 2h --limit 50
fincrawl sync --updated-since 180d --updated-before 90d --limit 0
fincrawl sync --resume
fincrawl sync --conversation <id>
fincrawl search "billing refund" --json
Expand Down
3 changes: 2 additions & 1 deletion docs/roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ this repo.
- Search filters for state, tag, and Intercom-exposed Fin status.
- Read-only Intercom entity sync for admins, teams, tags, and capped contacts
when scopes allow.
- Exact conversation hydration and bounded updated-since tail sync.
- Exact conversation hydration and bounded updated-since / updated-before tail
sync.
- Resumable sync state with privacy-safe status output.
- Canonical JSONL export from fixtures or local SQLite.
- zstd + age encrypted `archive`, `publish`, and `import` flows.
Expand Down
2 changes: 2 additions & 0 deletions internal/cli/agent_dx.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func describeCommands(command string) (cliSchema, error) {
Flags: []paramSchema{
{Name: "fixture", Type: "path", Help: "Import synthetic fixture directory."},
{Name: "updated-since", Type: "duration|timestamp", Help: "Sync provider conversations updated since a duration or timestamp."},
{Name: "updated-before", Type: "duration|timestamp", Help: "Sync provider conversations updated before a duration or timestamp. Requires --updated-since."},
{Name: "conversation", Type: "provider-id", Help: "Hydrate one provider conversation ID."},
{Name: "entities", Type: "bool", Help: "Hydrate provider admins, teams, and tags."},
{Name: "contacts", Type: "bool", Help: "Include a capped contact/user list when used with --entities."},
Expand All @@ -135,6 +136,7 @@ func describeCommands(command string) (cliSchema, error) {
Examples: []string{
"fincrawl sync --fixture testdata/synthetic",
"fincrawl sync --updated-since 2h --limit 50 --dry-run",
"fincrawl sync --updated-since 180d --updated-before 90d --limit 0 --dry-run",
"fincrawl sync --conversation <provider-conversation-id> --dry-run",
},
Notes: []string{
Expand Down
62 changes: 41 additions & 21 deletions internal/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,15 +196,16 @@ func (cmd statusCmd) Run(ctx commandContext) error {
}

type syncCmd struct {
Fixture string `help:"Import synthetic fixture directory."`
UpdatedSince string `name:"updated-since" help:"Sync provider conversations updated since a duration or timestamp."`
Conversation string `help:"Hydrate one provider conversation ID."`
Entities bool `help:"Hydrate provider admins, teams, and tags."`
Contacts bool `help:"Include a capped contact/user list when used with --entities."`
Resume bool `help:"Resume an interrupted Intercom updated-since sync window."`
Limit int `help:"Maximum provider conversations for --updated-since, or contacts for --entities --contacts. Use 0 for no conversation limit." default:"50"`
DryRun bool `name:"dry-run" help:"Validate and describe planned sync work without writing local state or calling provider APIs."`
JSON bool `help:"Print JSON output." default:"true"`
Fixture string `help:"Import synthetic fixture directory."`
UpdatedSince string `name:"updated-since" help:"Sync provider conversations updated since a duration or timestamp."`
UpdatedBefore string `name:"updated-before" help:"Sync provider conversations updated before a duration or timestamp. Requires --updated-since."`
Conversation string `help:"Hydrate one provider conversation ID."`
Entities bool `help:"Hydrate provider admins, teams, and tags."`
Contacts bool `help:"Include a capped contact/user list when used with --entities."`
Resume bool `help:"Resume an interrupted Intercom updated-since sync window."`
Limit int `help:"Maximum provider conversations for --updated-since, or contacts for --entities --contacts. Use 0 for no conversation limit." default:"50"`
DryRun bool `name:"dry-run" help:"Validate and describe planned sync work without writing local state or calling provider APIs."`
JSON bool `help:"Print JSON output." default:"true"`
}

func (cmd syncCmd) Run(ctx commandContext) error {
Expand Down Expand Up @@ -238,26 +239,42 @@ func (cmd syncCmd) Run(ctx commandContext) error {
return writeMaybeJSON(ctx.stdout, cmd.JSON, result)
}
if cmd.UpdatedSince != "" || cmd.Conversation != "" || cmd.Entities || cmd.Resume {
now := time.Now().UTC()
var updatedAfter time.Time
updatedBefore := now
if cmd.Conversation != "" {
if err := validateProviderID("conversation", cmd.Conversation); err != nil {
return output.UsageError{Err: err}
}
}
if cmd.UpdatedSince != "" {
updatedAfter, err = parseSince(cmd.UpdatedSince, time.Now().UTC())
updatedAfter, err = parseSince(cmd.UpdatedSince, now, "updated-since")
if err != nil {
return output.UsageError{Err: err}
}
}
if cmd.UpdatedBefore != "" {
updatedBefore, err = parseSince(cmd.UpdatedBefore, now, "updated-before")
if err != nil {
return output.UsageError{Err: err}
}
if !updatedBefore.After(updatedAfter) {
return output.UsageError{Err: fmt.Errorf("updated-before must be after updated-since")}
}
}
if cmd.DryRun {
plan := syncDryRun(cmd.mode(), rt.Config.DBPath, config.IntercomToken() != "", nil, map[string]any{
"contacts": cmd.Contacts,
"conversation": cmd.Conversation,
"limit": cmd.Limit,
"updated_after": formatOptionalTime(updatedAfter),
"updated_since": cmd.UpdatedSince,
})
params := map[string]any{
"contacts": cmd.Contacts,
"conversation": cmd.Conversation,
"limit": cmd.Limit,
}
if cmd.UpdatedSince != "" {
params["updated_after"] = formatOptionalTime(updatedAfter)
params["updated_before"] = formatOptionalTime(updatedBefore)
params["updated_since"] = cmd.UpdatedSince
params["updated_before_input"] = cmd.UpdatedBefore
}
plan := syncDryRun(cmd.mode(), rt.Config.DBPath, config.IntercomToken() != "", nil, params)
return writeMaybeJSON(ctx.stdout, cmd.JSON, plan)
}
if err := config.EnsureDirs(rt); err != nil {
Expand Down Expand Up @@ -298,7 +315,7 @@ func (cmd syncCmd) Run(ctx commandContext) error {
} else if cmd.Resume {
result, err = s.ResumeTail(ctx, rt.Config.DBPath, cmd.Limit)
} else {
result, err = s.SyncUpdatedSince(ctx, rt.Config.DBPath, updatedAfter, time.Now().UTC(), cmd.Limit)
result, err = s.SyncUpdatedSince(ctx, rt.Config.DBPath, updatedAfter, updatedBefore, cmd.Limit)
}
if err != nil {
return err
Expand Down Expand Up @@ -341,16 +358,19 @@ func (cmd syncCmd) validateMode() error {
if cmd.Contacts && !cmd.Entities {
return output.UsageError{Err: fmt.Errorf("--contacts requires --entities")}
}
if cmd.UpdatedBefore != "" && cmd.UpdatedSince == "" {
return output.UsageError{Err: fmt.Errorf("--updated-before requires --updated-since")}
}
if (cmd.UpdatedSince != "" || cmd.Resume || cmd.Entities) && cmd.Limit < 0 {
return output.UsageError{Err: fmt.Errorf("--limit must be >= 0")}
}
return nil
}

func parseSince(value string, now time.Time) (time.Time, error) {
func parseSince(value string, now time.Time, field string) (time.Time, error) {
value = strings.TrimSpace(value)
if value == "" {
return time.Time{}, fmt.Errorf("updated-since is required")
return time.Time{}, fmt.Errorf("%s is required", field)
}
if strings.HasSuffix(value, "d") {
days, err := strconv.Atoi(strings.TrimSuffix(value, "d"))
Expand All @@ -371,7 +391,7 @@ func parseSince(value string, now time.Time) (time.Time, error) {
if unix, err := strconv.ParseInt(value, 10, 64); err == nil {
return time.Unix(unix, 0).UTC(), nil
}
return time.Time{}, fmt.Errorf("invalid updated-since %q; use a duration like 2h, 30d, RFC3339 timestamp, or unix seconds", value)
return time.Time{}, fmt.Errorf("invalid %s %q; use a duration like 2h, 30d, RFC3339 timestamp, or unix seconds", field, value)
}

type searchCmd struct {
Expand Down
72 changes: 71 additions & 1 deletion internal/cli/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,76 @@ func TestSyncRejectsNegativeUpdatedSinceLimit(t *testing.T) {
}
}

func TestSyncRejectsUpdatedBeforeWithoutUpdatedSince(t *testing.T) {
t.Setenv("FINCRAWL_HOME", t.TempDir())
var stdout bytes.Buffer
var stderr bytes.Buffer

err := Run(context.Background(), []string{
"sync",
"--updated-before", "90d",
}, &stdout, &stderr)
if !output.IsUsage(err) {
t.Fatalf("expected usage error, got %v", err)
}
if stdout.Len() != 0 {
t.Fatalf("stdout = %q", stdout.String())
}
}

func TestSyncRejectsInvertedUpdatedWindow(t *testing.T) {
t.Setenv("FINCRAWL_HOME", t.TempDir())
var stdout bytes.Buffer
var stderr bytes.Buffer

err := Run(context.Background(), []string{
"sync",
"--updated-since", "2026-05-17T12:00:00Z",
"--updated-before", "2026-05-17T11:00:00Z",
}, &stdout, &stderr)
if !output.IsUsage(err) {
t.Fatalf("expected usage error, got %v", err)
}
if stdout.Len() != 0 {
t.Fatalf("stdout = %q", stdout.String())
}
}

func TestSyncUpdatedSinceDryRunAcceptsBoundedWindow(t *testing.T) {
t.Setenv("FINCRAWL_HOME", t.TempDir())
var stdout bytes.Buffer
var stderr bytes.Buffer

err := Run(context.Background(), []string{
"sync",
"--updated-since", "2026-02-17T00:00:00Z",
"--updated-before", "2026-05-17T00:00:00Z",
"--dry-run",
}, &stdout, &stderr)
if err != nil {
t.Fatal(err)
}
var plan struct {
Mode string `json:"mode"`
Parameters map[string]any `json:"parameters"`
}
if err := json.Unmarshal(stdout.Bytes(), &plan); err != nil {
t.Fatal(err)
}
if plan.Mode != "updated-since" {
t.Fatalf("mode = %q", plan.Mode)
}
if plan.Parameters["updated_after"] != "2026-02-17T00:00:00Z" {
t.Fatalf("updated_after = %#v", plan.Parameters["updated_after"])
}
if plan.Parameters["updated_before"] != "2026-05-17T00:00:00Z" {
t.Fatalf("updated_before = %#v", plan.Parameters["updated_before"])
}
if plan.Parameters["updated_before_input"] != "2026-05-17T00:00:00Z" {
t.Fatalf("updated_before_input = %#v", plan.Parameters["updated_before_input"])
}
}

func TestSyncRejectsNegativeResumeLimit(t *testing.T) {
t.Setenv("FINCRAWL_HOME", t.TempDir())
var stdout bytes.Buffer
Expand Down Expand Up @@ -479,7 +549,7 @@ func TestPublishImportEncryptedSnapshotRoundTrip(t *testing.T) {

func TestParseSinceAcceptsDayDurations(t *testing.T) {
now := time.Date(2026, 5, 16, 12, 0, 0, 0, time.UTC)
got, err := parseSince("2d", now)
got, err := parseSince("2d", now, "updated-since")
if err != nil {
t.Fatal(err)
}
Expand Down
23 changes: 22 additions & 1 deletion internal/syncer/intercom.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (s IntercomSyncer) SyncTail(ctx context.Context, dbPath string, opts TailSy
}
result.WorkspaceID = empty.WorkspaceID
}
state.HighWaterMark = state.ActiveWindowEnd
state.HighWaterMark = advanceHighWaterMark(state.HighWaterMark, state.ActiveWindowEnd)
state.ActiveWindowStart = ""
state.ActiveWindowEnd = ""
state.PageCursor = ""
Expand Down Expand Up @@ -253,6 +253,27 @@ func stateWindow(state store.SyncState) (time.Time, time.Time, error) {
return start, end, nil
}

func advanceHighWaterMark(current, candidate string) string {
if strings.TrimSpace(candidate) == "" {
return current
}
if strings.TrimSpace(current) == "" {
return candidate
}
currentTime, currentErr := time.Parse(time.RFC3339, current)
candidateTime, candidateErr := time.Parse(time.RFC3339, candidate)
if candidateErr != nil && currentErr == nil {
return current
}
if candidateErr != nil || currentErr != nil {
return candidate
}
if candidateTime.After(currentTime) {
return candidate
}
return current
}

func (s IntercomSyncer) searchConversations(ctx context.Context, updatedAfter, updatedBefore time.Time, cursor string) (intercom.ConversationSearchResult, error) {
var result intercom.ConversationSearchResult
err := s.withRateLimitRetry(ctx, func() error {
Expand Down
41 changes: 41 additions & 0 deletions internal/syncer/intercom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,47 @@ func TestResumeTailContinuesAfterLimitedRun(t *testing.T) {
}
}

func TestSyncUpdatedSincePreservesNewerHighWaterMark(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/conversations/search" {
t.Fatalf("path = %s", r.URL.Path)
}
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(`{"conversations":[],"pages":{"next":{}}}`))
}))
defer server.Close()
dbPath := filepath.Join(t.TempDir(), "fincrawl.db")
newer := time.Unix(1770001000, 0).UTC().Format(time.RFC3339)
if err := store.SaveSyncState(context.Background(), dbPath, store.SyncState{
ID: store.IntercomTailSyncStateID,
HighWaterMark: newer,
}); err != nil {
t.Fatal(err)
}
s := IntercomSyncer{
Client: intercom.Client{BaseURL: server.URL, Token: "fake-token", HTTPClient: server.Client()},
Now: func() time.Time { return time.Unix(1770000000, 0) },
}
_, err := s.SyncUpdatedSince(context.Background(), dbPath, time.Unix(1769990000, 0), time.Unix(1770000500, 0), 0)
if err != nil {
t.Fatal(err)
}
state, ok, err := store.LoadSyncState(context.Background(), dbPath, store.IntercomTailSyncStateID)
if err != nil {
t.Fatal(err)
}
if !ok || state.HighWaterMark != newer {
t.Fatalf("state = %#v, want high water mark %q", state, newer)
}
}

func TestAdvanceHighWaterMarkPreservesValidCurrentFromMalformedCandidate(t *testing.T) {
current := "2026-05-17T18:00:00Z"
if got := advanceHighWaterMark(current, "not-a-timestamp"); got != current {
t.Fatalf("high water mark = %q, want %q", got, current)
}
}

func TestSyncUpdatedSinceRequiresResumeWhenActive(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), "fincrawl.db")
if err := store.SaveSyncState(context.Background(), dbPath, store.SyncState{
Expand Down