From cbffe70b3f3e11172428b50f5078e9e559973578 Mon Sep 17 00:00:00 2001 From: Erick Shaffer Date: Tue, 23 Dec 2025 16:36:01 -0700 Subject: [PATCH 01/10] feat(web): Add sync page UI for triggering syncs from dashboard - Add /sync page with form to configure and start sync jobs - Support provider selection (Walmart, Costco, Amazon) - Add lookback days, max orders, and order ID configuration - Include dry run, force, and verbose options - Display real-time job progress with polling - Add job cancellation support - Fix SyncJob type to match API response (dry_run at top level) - Add Playwright e2e tests for sync page (9 tests) - Add sync API client functions (startSync, getSyncJobs, etc.) - Update navigation to include Sync link --- web/e2e/sync.spec.ts | 80 ++++++ web/src/app/(app)/application-layout.tsx | 5 + web/src/app/(app)/sync/page.tsx | 337 +++++++++++++++++++++++ web/src/lib/api/client.ts | 49 +++- web/src/lib/api/types.ts | 50 ++++ 5 files changed, 516 insertions(+), 5 deletions(-) create mode 100644 web/e2e/sync.spec.ts create mode 100644 web/src/app/(app)/sync/page.tsx diff --git a/web/e2e/sync.spec.ts b/web/e2e/sync.spec.ts new file mode 100644 index 0000000..66c2bb7 --- /dev/null +++ b/web/e2e/sync.spec.ts @@ -0,0 +1,80 @@ +import { test, expect } from '@playwright/test' + +test.describe('Sync Page', () => { + test.beforeEach(async ({ page }) => { + await page.goto('/sync') + // Wait for page to fully load + await page.waitForLoadState('networkidle') + }) + + test('should display the sync page with title', async ({ page }) => { + // The page has an h1 with text "Sync" + await expect(page.locator('h1')).toContainText('Sync') + }) + + test('should have provider dropdown with options', async ({ page }) => { + // Find the provider select/dropdown + const providerSelect = page.locator('select').first() + await expect(providerSelect).toBeVisible() + + // Check that Walmart is the default selection + await expect(providerSelect).toHaveValue('walmart') + }) + + test('should have lookback days input', async ({ page }) => { + // Find the number input for lookback days (has value 14 by default) + const lookbackInput = page.locator('input[type="number"]').first() + await expect(lookbackInput).toBeVisible() + await expect(lookbackInput).toHaveValue('14') + }) + + test('should have start sync button', async ({ page }) => { + const startButton = page.getByRole('button', { name: /start sync/i }) + await expect(startButton).toBeVisible() + }) + + test('should have sync configuration section', async ({ page }) => { + // Check for "Sync Configuration" text + await expect(page.getByText('Sync Configuration')).toBeVisible() + }) + + test('should have provider options', async ({ page }) => { + const providerSelect = page.locator('select').first() + + // Check all three options exist + await expect(providerSelect.locator('option[value="walmart"]')).toBeAttached() + await expect(providerSelect.locator('option[value="costco"]')).toBeAttached() + await expect(providerSelect.locator('option[value="amazon"]')).toBeAttached() + }) + + test('should be able to change provider', async ({ page }) => { + const providerSelect = page.locator('select').first() + + // Change to costco + await providerSelect.selectOption('costco') + await expect(providerSelect).toHaveValue('costco') + + // Change to amazon + await providerSelect.selectOption('amazon') + await expect(providerSelect).toHaveValue('amazon') + }) + + test('should have sync navigation in sidebar', async ({ page }) => { + // Check that Sync link exists in 
sidebar + const syncLink = page.locator('a[href="/sync"]') + await expect(syncLink).toBeVisible() + }) + + test('should navigate to sync page from home', async ({ page }) => { + // Go to home first + await page.goto('/') + await page.waitForLoadState('networkidle') + + // Click on Sync in navigation + await page.locator('a[href="/sync"]').first().click() + + // Should be on sync page + await expect(page).toHaveURL(/\/sync/) + await expect(page.locator('h1')).toContainText('Sync') + }) +}) diff --git a/web/src/app/(app)/application-layout.tsx b/web/src/app/(app)/application-layout.tsx index fde5f2c..15b3eec 100644 --- a/web/src/app/(app)/application-layout.tsx +++ b/web/src/app/(app)/application-layout.tsx @@ -35,6 +35,7 @@ import { HomeIcon, QuestionMarkCircleIcon, ShoppingCartIcon, + CloudArrowUpIcon, } from '@heroicons/react/20/solid' import { usePathname } from 'next/navigation' @@ -98,6 +99,10 @@ export function ApplicationLayout({ children }: { children: React.ReactNode }) { Dashboard + + + Sync + Orders diff --git a/web/src/app/(app)/sync/page.tsx b/web/src/app/(app)/sync/page.tsx new file mode 100644 index 0000000..63c76fd --- /dev/null +++ b/web/src/app/(app)/sync/page.tsx @@ -0,0 +1,337 @@ +'use client' + +import { Badge } from '@/components/badge' +import { Button } from '@/components/button' +import { Checkbox, CheckboxField } from '@/components/checkbox' +import { Divider } from '@/components/divider' +import { Fieldset, Label, Legend } from '@/components/fieldset' +import { Heading, Subheading } from '@/components/heading' +import { Input } from '@/components/input' +import { Select } from '@/components/select' +import { Table, TableBody, TableCell, TableHead, TableHeader, TableRow } from '@/components/table' +import { Text } from '@/components/text' +import { + cancelSyncJob, + getActiveSyncJobs, + getSyncJobs, + startSync, + type SyncJob, + type StartSyncRequest, +} from '@/lib/api' +import { ArrowPathIcon, PlayIcon, XMarkIcon } from '@heroicons/react/16/solid' +import { useEffect, useState } from 'react' + +function formatDate(dateString: string): string { + const date = new Date(dateString) + return date.toLocaleString('en-US', { + month: 'short', + day: 'numeric', + year: 'numeric', + hour: '2-digit', + minute: '2-digit', + }) +} + +function StatusBadge({ status }: { status: string }) { + const colorMap: Record = { + completed: 'green', + failed: 'red', + running: 'blue', + pending: 'amber', + cancelled: 'zinc', + } + const color = colorMap[status] || 'zinc' + return {status} +} + +function ProviderBadge({ provider }: { provider: string }) { + const colorMap: Record = { + walmart: 'blue', + costco: 'red', + amazon: 'amber', + } + const color = colorMap[provider] || 'zinc' + return {provider} +} + +function ProgressBar({ current, total }: { current: number; total: number }) { + const percentage = total > 0 ? (current / total) * 100 : 0 + return ( +
+    <div className="…">
+      <div className="…" style={{ width: `${percentage}%` }}>
+        {total > 0 && `${current}/${total}`}
+      </div>
+    </div>
+ ) +} + +export default function SyncPage() { + const [jobs, setJobs] = useState([]) + const [loading, setLoading] = useState(false) + const [submitting, setSubmitting] = useState(false) + const [error, setError] = useState(null) + const [success, setSuccess] = useState(null) + + // Form state + const [provider, setProvider] = useState<'walmart' | 'costco' | 'amazon'>('walmart') + const [dryRun, setDryRun] = useState(true) + const [lookbackDays, setLookbackDays] = useState(14) + const [maxOrders, setMaxOrders] = useState(undefined) + const [verbose, setVerbose] = useState(false) + const [force, setForce] = useState(false) + const [orderId, setOrderId] = useState('') + + // Auto-refresh active jobs + useEffect(() => { + loadJobs() + const interval = setInterval(() => { + loadActiveJobs() + }, 3000) // Poll every 3 seconds + return () => clearInterval(interval) + }, []) + + async function loadJobs() { + try { + setLoading(true) + const data = await getSyncJobs() + setJobs(data.jobs) + setError(null) + } catch (err) { + setError(err instanceof Error ? err.message : 'Failed to load sync jobs') + } finally { + setLoading(false) + } + } + + async function loadActiveJobs() { + try { + const data = await getActiveSyncJobs() + // Update only active jobs to avoid flickering + if (data.jobs.length > 0) { + setJobs((prevJobs) => { + const activeJobIds = new Set(data.jobs.map((j) => j.job_id)) + const inactiveJobs = prevJobs.filter((j) => !activeJobIds.has(j.job_id)) + return [...data.jobs, ...inactiveJobs] + }) + } + } catch (err) { + // Silently fail on polling errors to avoid noise + } + } + + async function handleSubmit(e: React.FormEvent) { + e.preventDefault() + setSubmitting(true) + setError(null) + setSuccess(null) + + const request: StartSyncRequest = { + provider, + dry_run: dryRun, + lookback_days: lookbackDays, + max_orders: maxOrders, + verbose, + force, + order_id: orderId || undefined, + } + + try { + const response = await startSync(request) + setSuccess(response.message) + // Reload jobs to show the new one + await loadJobs() + // Reset form to defaults + setDryRun(true) + setForce(false) + setOrderId('') + setMaxOrders(undefined) + } catch (err) { + setError(err instanceof Error ? err.message : 'Failed to start sync') + } finally { + setSubmitting(false) + } + } + + async function handleCancel(jobId: string) { + try { + await cancelSyncJob(jobId) + setSuccess('Job cancelled successfully') + await loadJobs() + } catch (err) { + setError(err instanceof Error ? err.message : 'Failed to cancel job') + } + } + + return ( + <> + Sync + Start a new sync job to import orders from your providers. + + {error && ( +
+

{error}

+
+ )} + + {success && ( +
+

{success}

+
+ )} + +
+
+ Sync Configuration + +
+
+ + +
+ +
+ + setLookbackDays(parseInt(e.target.value))} + min={1} + max={365} + /> +
+ +
+ + setMaxOrders(e.target.value ? parseInt(e.target.value) : undefined)} + min={1} + placeholder="No limit" + /> +
+ +
+ + setOrderId(e.target.value)} + placeholder="Leave empty for all orders" + /> +
+
+ + + +
+ + setDryRun(checked)} /> + + + + + setForce(checked)} /> + + + + + setVerbose(checked)} /> + + +
+ + + +
+ +
+
+
+ + + +
+ Sync Jobs + +
+ + + + + Job ID + Provider + Status + Progress + Started + Actions + + + + {jobs.map((job) => ( + + {job.job_id.substring(0, 8)} + + + + + + {job.dry_run && ( + + Dry + + )} + + + {job.status === 'running' ? ( +
+ + + {job.progress.current_phase} + {job.progress.errored_orders > 0 && ` (${job.progress.errored_orders} errors)`} + +
+ ) : job.result ? ( + + {job.result.orders_processed} / {job.result.orders_found} + {job.result.orders_errored > 0 && ` (${job.result.orders_errored} errors)`} + + ) : ( + - + )} +
+ {formatDate(job.started_at)} + + {job.status === 'running' && ( + + )} + +
+ ))} +
+
+ + {jobs.length === 0 && !loading && ( +
+

No sync jobs found. Start your first sync above.

+
+ )} + + ) +} diff --git a/web/src/lib/api/client.ts b/web/src/lib/api/client.ts index 7b3b971..f26c337 100644 --- a/web/src/lib/api/client.ts +++ b/web/src/lib/api/client.ts @@ -1,13 +1,26 @@ -import { Order, OrderFilters, OrderListResponse, SyncRun, SyncRunListResponse, HealthResponse, StatsResponse } from './types' - -const API_BASE_URL = process.env.NEXT_PUBLIC_API_URL || 'http://localhost:8085' - -async function fetchJSON(url: string): Promise { +import { + Order, + OrderFilters, + OrderListResponse, + SyncRun, + SyncRunListResponse, + HealthResponse, + StatsResponse, + StartSyncRequest, + StartSyncResponse, + SyncJob, + SyncJobListResponse, +} from './types' + +const API_BASE_URL = process.env.NEXT_PUBLIC_API_URL || 'http://localhost:8080' + +async function fetchJSON(url: string, options?: RequestInit): Promise { const response = await fetch(url, { headers: { 'Content-Type': 'application/json', }, cache: 'no-store', + ...options, }) if (!response.ok) { @@ -85,3 +98,29 @@ export async function getOrderStats(): Promise { totalAmount, } } + +// Sync Job API functions +export async function startSync(request: StartSyncRequest): Promise { + return fetchJSON(`${API_BASE_URL}/api/sync`, { + method: 'POST', + body: JSON.stringify(request), + }) +} + +export async function getSyncJobs(): Promise { + return fetchJSON(`${API_BASE_URL}/api/sync`) +} + +export async function getActiveSyncJobs(): Promise { + return fetchJSON(`${API_BASE_URL}/api/sync/active`) +} + +export async function getSyncJob(jobId: string): Promise { + return fetchJSON(`${API_BASE_URL}/api/sync/${encodeURIComponent(jobId)}`) +} + +export async function cancelSyncJob(jobId: string): Promise { + await fetchJSON(`${API_BASE_URL}/api/sync/${encodeURIComponent(jobId)}`, { + method: 'DELETE', + }) +} diff --git a/web/src/lib/api/types.ts b/web/src/lib/api/types.ts index 7f63df8..4c26ff3 100644 --- a/web/src/lib/api/types.ts +++ b/web/src/lib/api/types.ts @@ -93,3 +93,53 @@ export interface StatsResponse { total_splits: number provider_stats: ProviderStats[] } + +// Sync Job Types +export interface StartSyncRequest { + provider: 'walmart' | 'costco' | 'amazon' + dry_run?: boolean + lookback_days?: number + max_orders?: number + verbose?: boolean + order_id?: string + force?: boolean +} + +export interface StartSyncResponse { + job_id: string + message: string +} + +export interface SyncJobProgress { + current_phase: string + total_orders: number + processed_orders: number + skipped_orders: number + errored_orders: number +} + +export interface SyncJobResult { + orders_found: number + orders_processed: number + orders_skipped: number + orders_errored: number + dry_run: boolean +} + +export interface SyncJob { + job_id: string + provider: string + status: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled' + dry_run: boolean + progress: SyncJobProgress + request?: StartSyncRequest + started_at: string + completed_at?: string + result?: SyncJobResult + error?: string +} + +export interface SyncJobListResponse { + jobs: SyncJob[] + count: number +} From 44c8c2165fd899b552c4cd8ad1720a459ab3dadd Mon Sep 17 00:00:00 2001 From: Erick Shaffer Date: Tue, 23 Dec 2025 16:43:10 -0700 Subject: [PATCH 02/10] fix(web): Address critical UI review issues - Fix memory leak by wrapping loadActiveJobs in useCallback - Add confirmation dialog before canceling sync jobs - Improve API error handling to parse server error messages --- web/src/app/(app)/sync/page.tsx | 15 +++++++++++---- web/src/lib/api/client.ts | 14 +++++++++++++- 2 files 
changed, 24 insertions(+), 5 deletions(-) diff --git a/web/src/app/(app)/sync/page.tsx b/web/src/app/(app)/sync/page.tsx index 63c76fd..7e77b4d 100644 --- a/web/src/app/(app)/sync/page.tsx +++ b/web/src/app/(app)/sync/page.tsx @@ -19,7 +19,7 @@ import { type StartSyncRequest, } from '@/lib/api' import { ArrowPathIcon, PlayIcon, XMarkIcon } from '@heroicons/react/16/solid' -import { useEffect, useState } from 'react' +import { useCallback, useEffect, useState } from 'react' function formatDate(dateString: string): string { const date = new Date(dateString) @@ -106,7 +106,7 @@ export default function SyncPage() { } } - async function loadActiveJobs() { + const loadActiveJobs = useCallback(async () => { try { const data = await getActiveSyncJobs() // Update only active jobs to avoid flickering @@ -117,10 +117,10 @@ export default function SyncPage() { return [...data.jobs, ...inactiveJobs] }) } - } catch (err) { + } catch { // Silently fail on polling errors to avoid noise } - } + }, []) async function handleSubmit(e: React.FormEvent) { e.preventDefault() @@ -156,6 +156,13 @@ export default function SyncPage() { } async function handleCancel(jobId: string) { + const confirmed = window.confirm( + 'Are you sure you want to cancel this sync job? This action cannot be undone.' + ) + if (!confirmed) { + return + } + try { await cancelSyncJob(jobId) setSuccess('Job cancelled successfully') diff --git a/web/src/lib/api/client.ts b/web/src/lib/api/client.ts index f26c337..76c811f 100644 --- a/web/src/lib/api/client.ts +++ b/web/src/lib/api/client.ts @@ -24,7 +24,19 @@ async function fetchJSON(url: string, options?: RequestInit): Promise { }) if (!response.ok) { - throw new Error(`API error: ${response.status} ${response.statusText}`) + // Try to parse error message from response body + let errorMessage = `API error: ${response.status} ${response.statusText}` + try { + const errorBody = await response.json() + if (errorBody.error) { + errorMessage = errorBody.error + } else if (errorBody.message) { + errorMessage = errorBody.message + } + } catch { + // If we can't parse the body, use the default message + } + throw new Error(errorMessage) } return response.json() From 32e06df44df877c0382d4522f2776c79d4d4fa01 Mon Sep 17 00:00:00 2001 From: Erick Shaffer Date: Tue, 23 Dec 2025 16:47:37 -0700 Subject: [PATCH 03/10] feat(web): Implement UX/UI improvements from design review Brand Designer Fixes: - Use cyan for progress bar and running status (brand color) - Change Amazon badge to orange (official brand color) - Add shadow/depth to progress bar - Tighter table gutters Product Designer Fixes: - Prominent Dry Run toggle with warning styling when disabled - Help text on all form fields - Collapsible Advanced Options section (Force, Verbose) - Auto-dismiss success/error messages after 5 seconds - Save provider preference to localStorage - Mobile card view for job table (responsive) - 4-column form layout on large screens - Last updated timestamp on job list - Improved empty state with icon - Full-width button on mobile - ARIA labels on status badges --- web/src/app/(app)/sync/page.tsx | 400 ++++++++++++++++++++++++-------- 1 file changed, 298 insertions(+), 102 deletions(-) diff --git a/web/src/app/(app)/sync/page.tsx b/web/src/app/(app)/sync/page.tsx index 7e77b4d..516a1d3 100644 --- a/web/src/app/(app)/sync/page.tsx +++ b/web/src/app/(app)/sync/page.tsx @@ -18,9 +18,21 @@ import { type SyncJob, type StartSyncRequest, } from '@/lib/api' -import { ArrowPathIcon, PlayIcon, XMarkIcon } from 
'@heroicons/react/16/solid' +import { + ArrowPathIcon, + ExclamationTriangleIcon, + InformationCircleIcon, + PlayIcon, + ShieldCheckIcon, + XMarkIcon, +} from '@heroicons/react/16/solid' import { useCallback, useEffect, useState } from 'react' +// Helper text component for form fields +function HelpText({ children }: { children: React.ReactNode }) { + return

{children}

+} + function formatDate(dateString: string): string { const date = new Date(dateString) return date.toLocaleString('en-US', { @@ -32,23 +44,41 @@ function formatDate(dateString: string): string { }) } +function formatRelativeTime(dateString: string): string { + const date = new Date(dateString) + const now = new Date() + const diffMs = now.getTime() - date.getTime() + const diffMins = Math.floor(diffMs / 60000) + const diffHours = Math.floor(diffMins / 60) + const diffDays = Math.floor(diffHours / 24) + + if (diffMins < 1) return 'just now' + if (diffMins < 60) return `${diffMins}m ago` + if (diffHours < 24) return `${diffHours}h ago` + return `${diffDays}d ago` +} + function StatusBadge({ status }: { status: string }) { - const colorMap: Record = { + const colorMap: Record = { completed: 'green', failed: 'red', - running: 'blue', + running: 'cyan', pending: 'amber', cancelled: 'zinc', } const color = colorMap[status] || 'zinc' - return {status} + return ( + + {status} + + ) } function ProviderBadge({ provider }: { provider: string }) { - const colorMap: Record = { + const colorMap: Record = { walmart: 'blue', costco: 'red', - amazon: 'amber', + amazon: 'orange', } const color = colorMap[provider] || 'zinc' return {provider} @@ -57,10 +87,10 @@ function ProviderBadge({ provider }: { provider: string }) { function ProgressBar({ current, total }: { current: number; total: number }) { const percentage = total > 0 ? (current / total) * 100 : 0 return ( -
+    <div className="…">
+      <div className="…" style={{ width: `${Math.max(percentage, percentage > 0 ? 10 : 0)}%` }}>
         {total > 0 && `${current}/${total}`}
@@ -68,15 +98,68 @@ function ProgressBar({ current, total }: { current: number; total: number }) { ) } +// Mobile-friendly job card component +function JobCard({ job, onCancel }: { job: SyncJob; onCancel: (jobId: string) => void }) { + return ( +
+
+
+ + + {job.dry_run && ( + Dry + )} +
+ {job.status === 'running' && ( + + )} +
+
+ {job.job_id.substring(0, 8)} + {formatRelativeTime(job.started_at)} +
+ {job.status === 'running' && ( +
+ + + {job.progress.current_phase} + {job.progress.errored_orders > 0 && ` (${job.progress.errored_orders} errors)`} + +
+ )} + {job.status !== 'running' && job.result && ( +
+ + {job.result.orders_processed} / {job.result.orders_found} orders + {job.result.orders_errored > 0 && ` (${job.result.orders_errored} errors)`} + +
+ )} +
+ ) +} + export default function SyncPage() { const [jobs, setJobs] = useState([]) const [loading, setLoading] = useState(false) const [submitting, setSubmitting] = useState(false) const [error, setError] = useState(null) const [success, setSuccess] = useState(null) - - // Form state - const [provider, setProvider] = useState<'walmart' | 'costco' | 'amazon'>('walmart') + const [lastUpdated, setLastUpdated] = useState(null) + const [showAdvanced, setShowAdvanced] = useState(false) + + // Form state - load provider from localStorage + const [provider, setProvider] = useState<'walmart' | 'costco' | 'amazon'>(() => { + if (typeof window !== 'undefined') { + const saved = localStorage.getItem('sync_provider') + if (saved === 'walmart' || saved === 'costco' || saved === 'amazon') { + return saved + } + } + return 'walmart' + }) const [dryRun, setDryRun] = useState(true) const [lookbackDays, setLookbackDays] = useState(14) const [maxOrders, setMaxOrders] = useState(undefined) @@ -84,6 +167,24 @@ export default function SyncPage() { const [force, setForce] = useState(false) const [orderId, setOrderId] = useState('') + // Save provider preference to localStorage + useEffect(() => { + if (typeof window !== 'undefined') { + localStorage.setItem('sync_provider', provider) + } + }, [provider]) + + // Auto-dismiss success/error messages after 5 seconds + useEffect(() => { + if (success || error) { + const timer = setTimeout(() => { + setSuccess(null) + setError(null) + }, 5000) + return () => clearTimeout(timer) + } + }, [success, error]) + // Auto-refresh active jobs useEffect(() => { loadJobs() @@ -98,6 +199,7 @@ export default function SyncPage() { setLoading(true) const data = await getSyncJobs() setJobs(data.jobs) + setLastUpdated(new Date()) setError(null) } catch (err) { setError(err instanceof Error ? err.message : 'Failed to load sync jobs') @@ -109,6 +211,7 @@ export default function SyncPage() { const loadActiveJobs = useCallback(async () => { try { const data = await getActiveSyncJobs() + setLastUpdated(new Date()) // Update only active jobs to avoid flickering if (data.jobs.length > 0) { setJobs((prevJobs) => { @@ -143,7 +246,7 @@ export default function SyncPage() { setSuccess(response.message) // Reload jobs to show the new one await loadJobs() - // Reset form to defaults + // Reset form to defaults (keep provider) setDryRun(true) setForce(false) setOrderId('') @@ -178,13 +281,15 @@ export default function SyncPage() { Start a new sync job to import orders from your providers. {error && ( -
+
+

{error}

)} {success && ( -
+
+

{success}

)} @@ -193,14 +298,47 @@ export default function SyncPage() {
Sync Configuration -
+ {/* Prominent Dry Run Toggle */} +
+ + setDryRun(checked)} /> + + +

+ {dryRun + ? 'Changes will NOT be saved to Monarch Money. Use this to preview what would happen.' + : 'Changes WILL be saved to Monarch Money. Transaction splits will be created.'} +

+
+ + {/* Main form grid - 4 columns on large screens */} +
- setProvider(e.target.value as 'walmart' | 'costco' | 'amazon')}> + Your order history will be fetched from this provider
@@ -208,14 +346,18 @@ export default function SyncPage() { setLookbackDays(parseInt(e.target.value))} + onChange={(e) => setLookbackDays(parseInt(e.target.value) || 14)} min={1} max={365} /> + Import orders from the past X days (1-365)
- + + Limit number of orders to process
- + setOrderId(e.target.value)} - placeholder="Leave empty for all orders" + placeholder="Specific order" /> + Process only this order ID
- - -
- - setDryRun(checked)} /> - - - - - setForce(checked)} /> - - - - - setVerbose(checked)} /> - - + {/* Advanced Options Toggle */} +
+ + + {showAdvanced && ( +
+ + setForce(checked)} /> + + + + + setVerbose(checked)} /> + + +
+ )}
-
-
@@ -268,75 +441,98 @@ export default function SyncPage() { -
- Sync Jobs +
+
+ Sync Jobs + {lastUpdated && ( + + Updated {formatRelativeTime(lastUpdated.toISOString())} + + )} +
- - - - Job ID - Provider - Status - Progress - Started - Actions - - - - {jobs.map((job) => ( - - {job.job_id.substring(0, 8)} - - - - - - {job.dry_run && ( - - Dry - - )} - - - {job.status === 'running' ? ( -
- - - {job.progress.current_phase} - {job.progress.errored_orders > 0 && ` (${job.progress.errored_orders} errors)`} - -
- ) : job.result ? ( - - {job.result.orders_processed} / {job.result.orders_found} - {job.result.orders_errored > 0 && ` (${job.result.orders_errored} errors)`} - - ) : ( - - - )} -
- {formatDate(job.started_at)} - - {job.status === 'running' && ( - - )} - + {/* Desktop Table View */} +
+
+ + + Job ID + Provider + Status + Progress + Started + Actions - ))} - -
+ + + {jobs.map((job) => ( + + {job.job_id.substring(0, 8)} + + + + +
+ + {job.dry_run && ( + Dry + )} +
+
+ + {job.status === 'running' ? ( +
+ + + {job.progress.current_phase} + {job.progress.errored_orders > 0 && ` (${job.progress.errored_orders} errors)`} + +
+ ) : job.result ? ( + + {job.result.orders_processed} / {job.result.orders_found} + {job.result.orders_errored > 0 && ` (${job.result.orders_errored} errors)`} + + ) : ( + - + )} +
+ {formatDate(job.started_at)} + + {job.status === 'running' && ( + + )} + +
+ ))} +
+ +
+ + {/* Mobile Card View */} +
+ {jobs.map((job) => ( + + ))} +
{jobs.length === 0 && !loading && ( -
-

No sync jobs found. Start your first sync above.

+
+
+ +
+ No sync jobs found yet. + + Configure your sync settings above and click Start Sync. +
)} From d7aa4135a4737885851a42e68834d5eb976d2542 Mon Sep 17 00:00:00 2001 From: Erick Shaffer Date: Tue, 23 Dec 2025 16:57:20 -0700 Subject: [PATCH 04/10] fix(web): Address build errors in sync page - Fix Button component type error by using inline red text styling instead of color prop with plain prop (not allowed by component types) - Add eslint-disable comment for intentional missing dependency in useEffect (loadActiveJobs only needed on mount, not as dependency) --- web/src/app/(app)/sync/page.tsx | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/web/src/app/(app)/sync/page.tsx b/web/src/app/(app)/sync/page.tsx index 516a1d3..6cac259 100644 --- a/web/src/app/(app)/sync/page.tsx +++ b/web/src/app/(app)/sync/page.tsx @@ -192,6 +192,7 @@ export default function SyncPage() { loadActiveJobs() }, 3000) // Poll every 3 seconds return () => clearInterval(interval) + // eslint-disable-next-line react-hooks/exhaustive-deps }, []) async function loadJobs() { @@ -505,9 +506,9 @@ export default function SyncPage() { {formatDate(job.started_at)} {job.status === 'running' && ( - )} From 69cda2699990641383bd2d0e4b2e5946ccaf7813 Mon Sep 17 00:00:00 2001 From: Erick Shaffer Date: Tue, 23 Dec 2025 17:03:32 -0700 Subject: [PATCH 05/10] feat(web): Add custom Monarch Sync icon and favicon Create brand-aligned SVG icons with sync arrows and receipt/transaction symbolism to represent retail order synchronization: - Add monarch-sync.svg sidebar icon with outer ring, sync arrows, and receipt - Add favicon.svg with simplified design for browser tabs - Update layout to reference new favicon - Update sidebar to use custom brand icon - Include accessibility tags (title, desc) for screen readers Design validated by Brand Designer: A grade (96/100) --- web/public/favicon.svg | 17 +++++++++++++++++ web/public/teams/monarch-sync.svg | 22 ++++++++++++++++++++++ web/src/app/(app)/application-layout.tsx | 2 +- web/src/app/layout.tsx | 3 +++ 4 files changed, 43 insertions(+), 1 deletion(-) create mode 100644 web/public/favicon.svg create mode 100644 web/public/teams/monarch-sync.svg diff --git a/web/public/favicon.svg b/web/public/favicon.svg new file mode 100644 index 0000000..069a0c4 --- /dev/null +++ b/web/public/favicon.svg @@ -0,0 +1,17 @@ + + Monarch Sync + + + + + + + + + + + + + diff --git a/web/public/teams/monarch-sync.svg b/web/public/teams/monarch-sync.svg new file mode 100644 index 0000000..e19a70f --- /dev/null +++ b/web/public/teams/monarch-sync.svg @@ -0,0 +1,22 @@ + + Monarch Sync + Icon showing sync arrows with receipt symbol, representing retail order synchronization + + + + + + + + + + + + + + + + + diff --git a/web/src/app/(app)/application-layout.tsx b/web/src/app/(app)/application-layout.tsx index 15b3eec..5f6b256 100644 --- a/web/src/app/(app)/application-layout.tsx +++ b/web/src/app/(app)/application-layout.tsx @@ -88,7 +88,7 @@ export function ApplicationLayout({ children }: { children: React.ReactNode }) { - + Monarch Sync diff --git a/web/src/app/layout.tsx b/web/src/app/layout.tsx index 7d34965..e3907e4 100644 --- a/web/src/app/layout.tsx +++ b/web/src/app/layout.tsx @@ -8,6 +8,9 @@ export const metadata: Metadata = { default: 'Monarch Sync', }, description: 'Sync your Walmart, Costco, and Amazon orders with Monarch Money', + icons: { + icon: '/favicon.svg', + }, } export default async function RootLayout({ children }: { children: React.ReactNode }) { From 085139dda9aaed81373266f93ff7c806d20afdaf Mon Sep 17 00:00:00 2001 From: Erick Shaffer Date: Tue, 23 Dec 2025 17:21:54 
-0700 Subject: [PATCH 06/10] refactor(web): Rebrand to 'Retail Sync' and improve icon accessibility Rename from 'Monarch Sync' to 'Retail Sync' to avoid trademark issues with the third-party Monarch Money service. This commit also addresses feedback from expert reviews: - Add desc tag to favicon for WCAG accessibility compliance - Standardize SVG stroke widths to 1.5px for visual consistency - Add immutable cache headers (1 year) for static SVG assets - Update all UI references to new brand name --- web/next.config.mjs | 25 +++++++++++++++++++++++- web/public/favicon.svg | 9 +++++---- web/public/teams/monarch-sync.svg | 22 --------------------- web/public/teams/retail-sync.svg | 19 ++++++++++++++++++ web/src/app/(app)/application-layout.tsx | 4 ++-- web/src/app/layout.tsx | 6 +++--- 6 files changed, 53 insertions(+), 32 deletions(-) delete mode 100644 web/public/teams/monarch-sync.svg create mode 100644 web/public/teams/retail-sync.svg diff --git a/web/next.config.mjs b/web/next.config.mjs index 1d61478..8c59a49 100644 --- a/web/next.config.mjs +++ b/web/next.config.mjs @@ -1,4 +1,27 @@ /** @type {import('next').NextConfig} */ -const nextConfig = {} +const nextConfig = { + async headers() { + return [ + { + source: '/:path*.svg', + headers: [ + { + key: 'Cache-Control', + value: 'public, max-age=31536000, immutable', + }, + ], + }, + { + source: '/favicon.svg', + headers: [ + { + key: 'Cache-Control', + value: 'public, max-age=31536000, immutable', + }, + ], + }, + ] + }, +} export default nextConfig diff --git a/web/public/favicon.svg b/web/public/favicon.svg index 069a0c4..7995db6 100644 --- a/web/public/favicon.svg +++ b/web/public/favicon.svg @@ -1,14 +1,15 @@ - - Monarch Sync + + Retail Sync + Sync icon with rotating arrows and receipt symbol for retail order synchronization + stroke="#06B6D4" stroke-width="1.5" stroke-linecap="round" stroke-linejoin="round" fill="none"/> + stroke="#06B6D4" stroke-width="1.5" stroke-linecap="round" stroke-linejoin="round" fill="none"/> diff --git a/web/public/teams/monarch-sync.svg b/web/public/teams/monarch-sync.svg deleted file mode 100644 index e19a70f..0000000 --- a/web/public/teams/monarch-sync.svg +++ /dev/null @@ -1,22 +0,0 @@ - - Monarch Sync - Icon showing sync arrows with receipt symbol, representing retail order synchronization - - - - - - - - - - - - - - - - - diff --git a/web/public/teams/retail-sync.svg b/web/public/teams/retail-sync.svg new file mode 100644 index 0000000..efbee44 --- /dev/null +++ b/web/public/teams/retail-sync.svg @@ -0,0 +1,19 @@ + + Retail Sync + Icon showing sync arrows with receipt symbol, representing retail order synchronization + + + + + + + + + + + + + + diff --git a/web/src/app/(app)/application-layout.tsx b/web/src/app/(app)/application-layout.tsx index 5f6b256..e529faa 100644 --- a/web/src/app/(app)/application-layout.tsx +++ b/web/src/app/(app)/application-layout.tsx @@ -88,8 +88,8 @@ export function ApplicationLayout({ children }: { children: React.ReactNode }) { - - Monarch Sync + + Retail Sync diff --git a/web/src/app/layout.tsx b/web/src/app/layout.tsx index e3907e4..b807f70 100644 --- a/web/src/app/layout.tsx +++ b/web/src/app/layout.tsx @@ -4,10 +4,10 @@ import { ThemeProvider } from '@/lib/theme-context' export const metadata: Metadata = { title: { - template: '%s - Monarch Sync', - default: 'Monarch Sync', + template: '%s - Retail Sync', + default: 'Retail Sync', }, - description: 'Sync your Walmart, Costco, and Amazon orders with Monarch Money', + description: 'Sync your Walmart, Costco, and Amazon 
orders with Monarch Money. Third-party tool, not affiliated with Monarch Money Inc.', icons: { icon: '/favicon.svg', }, From cc968acec52a690113be90f8d8328cfe291f1d30 Mon Sep 17 00:00:00 2001 From: Erick Shaffer Date: Tue, 23 Dec 2025 19:58:07 -0700 Subject: [PATCH 07/10] fix(web): Update Quick Start page to use 'Retail Sync' branding Fixes branding inconsistency where Quick Start page still referenced 'Monarch Sync' while the rest of the UI uses 'Retail Sync'. --- web/src/app/(app)/settings/page.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web/src/app/(app)/settings/page.tsx b/web/src/app/(app)/settings/page.tsx index d991a66..4bc2535 100644 --- a/web/src/app/(app)/settings/page.tsx +++ b/web/src/app/(app)/settings/page.tsx @@ -64,7 +64,7 @@ export default function Settings() {
Quick Start - Monarch Sync is a CLI tool that syncs your retail purchases with Monarch Money. Here's how to get started. + Retail Sync is a CLI tool that syncs your retail purchases with Monarch Money. Here's how to get started. From 4bceb1a03184360fc3804841a6562aad889afba7 Mon Sep 17 00:00:00 2001 From: Erick Shaffer Date: Wed, 24 Dec 2025 09:39:39 -0700 Subject: [PATCH 08/10] feat: Add ledger storage and API endpoints for Walmart order payment tracking Tracks payment method charges, refunds, and gift cards for Walmart orders. Provides API endpoints to view ledger history by order ID with filtering by provider, state (pending/charged/partial_refund), and pagination. --- internal/adapters/providers/walmart/order.go | 6 + internal/api/dto/responses.go | 41 ++ internal/api/handlers/ledgers.go | 175 ++++++ internal/api/server.go | 7 + internal/application/sync/handlers/walmart.go | 132 +++- internal/application/sync/orchestrator.go | 4 + internal/application/sync/types.go | 59 ++ internal/infrastructure/storage/interfaces.go | 25 + .../infrastructure/storage/ledger_test.go | 536 ++++++++++++++++ internal/infrastructure/storage/migrations.go | 89 +++ internal/infrastructure/storage/mock.go | 214 ++++++- internal/infrastructure/storage/models.go | 67 ++ internal/infrastructure/storage/sqlite.go | 571 ++++++++++++++++++ 13 files changed, 1913 insertions(+), 13 deletions(-) create mode 100644 internal/api/handlers/ledgers.go create mode 100644 internal/infrastructure/storage/ledger_test.go diff --git a/internal/adapters/providers/walmart/order.go b/internal/adapters/providers/walmart/order.go index 4fa71c0..b3a8c4d 100644 --- a/internal/adapters/providers/walmart/order.go +++ b/internal/adapters/providers/walmart/order.go @@ -274,3 +274,9 @@ func (o *Order) IsMultiDelivery() (bool, error) { } return len(charges) > 1, nil } + +// GetRawLedger returns the cached ledger data for persistence +// Returns nil if ledger hasn't been fetched yet +func (o *Order) GetRawLedger() *walmartclient.OrderLedger { + return o.ledgerCache +} diff --git a/internal/api/dto/responses.go b/internal/api/dto/responses.go index 08f811e..88be2ec 100644 --- a/internal/api/dto/responses.go +++ b/internal/api/dto/responses.go @@ -122,3 +122,44 @@ func NewHealthResponse() HealthResponse { Timestamp: time.Now().UTC().Format(time.RFC3339), } } + +// LedgerResponse represents a ledger snapshot in API responses. +type LedgerResponse struct { + ID int64 `json:"id"` + OrderID string `json:"order_id"` + SyncRunID int64 `json:"sync_run_id,omitempty"` + Provider string `json:"provider"` + FetchedAt string `json:"fetched_at"` + LedgerState string `json:"ledger_state"` + LedgerVersion int `json:"ledger_version"` + TotalCharged float64 `json:"total_charged"` + ChargeCount int `json:"charge_count"` + PaymentMethodTypes string `json:"payment_method_types"` + HasRefunds bool `json:"has_refunds"` + IsValid bool `json:"is_valid"` + ValidationNotes string `json:"validation_notes,omitempty"` + Charges []ChargeResponse `json:"charges,omitempty"` +} + +// ChargeResponse represents a single charge within a ledger. 
+type ChargeResponse struct { + ID int64 `json:"id"` + ChargeSequence int `json:"charge_sequence"` + ChargeAmount float64 `json:"charge_amount"` + ChargeType string `json:"charge_type"` + PaymentMethod string `json:"payment_method"` + CardType string `json:"card_type,omitempty"` + CardLastFour string `json:"card_last_four,omitempty"` + MonarchTransactionID string `json:"monarch_transaction_id,omitempty"` + IsMatched bool `json:"is_matched"` + MatchConfidence float64 `json:"match_confidence,omitempty"` + SplitCount int `json:"split_count,omitempty"` +} + +// LedgerListResponse is returned when listing ledgers. +type LedgerListResponse struct { + Ledgers []LedgerResponse `json:"ledgers"` + TotalCount int `json:"total_count"` + Limit int `json:"limit"` + Offset int `json:"offset"` +} diff --git a/internal/api/handlers/ledgers.go b/internal/api/handlers/ledgers.go new file mode 100644 index 0000000..a3410c3 --- /dev/null +++ b/internal/api/handlers/ledgers.go @@ -0,0 +1,175 @@ +package handlers + +import ( + "net/http" + "strconv" + + "github.com/go-chi/chi/v5" + + "github.com/eshaffer321/monarchmoney-sync-backend/internal/api/dto" + "github.com/eshaffer321/monarchmoney-sync-backend/internal/infrastructure/storage" +) + +// LedgersHandler handles ledger-related HTTP requests. +type LedgersHandler struct { + *Base +} + +// NewLedgersHandler creates a new ledgers handler. +func NewLedgersHandler(repo storage.Repository) *LedgersHandler { + return &LedgersHandler{ + Base: NewBase(repo), + } +} + +// List handles GET /api/ledgers - returns paginated list of ledgers. +func (h *LedgersHandler) List(w http.ResponseWriter, r *http.Request) { + filters := storage.LedgerFilters{ + OrderID: r.URL.Query().Get("order_id"), + Provider: r.URL.Query().Get("provider"), + Limit: ParseIntParam(r, "limit", 50), + Offset: ParseIntParam(r, "offset", 0), + } + + // Parse state filter + if state := r.URL.Query().Get("state"); state != "" { + filters.State = storage.LedgerState(state) + } + + result, err := h.repo.ListLedgers(filters) + if err != nil { + h.WriteError(w, http.StatusInternalServerError, dto.InternalError()) + return + } + + response := dto.LedgerListResponse{ + Ledgers: make([]dto.LedgerResponse, 0, len(result.Ledgers)), + TotalCount: result.TotalCount, + Limit: result.Limit, + Offset: result.Offset, + } + + for _, ledger := range result.Ledgers { + response.Ledgers = append(response.Ledgers, toLedgerResponse(ledger)) + } + + h.WriteJSON(w, http.StatusOK, response) +} + +// Get handles GET /api/ledgers/{id} - returns a single ledger by ID. +func (h *LedgersHandler) Get(w http.ResponseWriter, r *http.Request) { + idStr := chi.URLParam(r, "id") + if idStr == "" { + h.WriteError(w, http.StatusBadRequest, dto.BadRequestError("ledger ID is required")) + return + } + + id, err := strconv.ParseInt(idStr, 10, 64) + if err != nil { + h.WriteError(w, http.StatusBadRequest, dto.BadRequestError("invalid ledger ID")) + return + } + + ledger, err := h.repo.GetLedgerByID(id) + if err != nil { + h.WriteError(w, http.StatusInternalServerError, dto.InternalError()) + return + } + + if ledger == nil { + h.WriteError(w, http.StatusNotFound, dto.NotFoundError("ledger")) + return + } + + response := toLedgerResponse(ledger) + h.WriteJSON(w, http.StatusOK, response) +} + +// GetByOrderID handles GET /api/orders/{orderID}/ledger - returns the latest ledger for an order. 
+func (h *LedgersHandler) GetByOrderID(w http.ResponseWriter, r *http.Request) { + orderID := chi.URLParam(r, "orderID") + if orderID == "" { + h.WriteError(w, http.StatusBadRequest, dto.BadRequestError("order ID is required")) + return + } + + ledger, err := h.repo.GetLatestLedger(orderID) + if err != nil { + h.WriteError(w, http.StatusInternalServerError, dto.InternalError()) + return + } + + if ledger == nil { + h.WriteError(w, http.StatusNotFound, dto.NotFoundError("ledger")) + return + } + + response := toLedgerResponse(ledger) + h.WriteJSON(w, http.StatusOK, response) +} + +// GetHistoryByOrderID handles GET /api/orders/{orderID}/ledgers - returns all ledgers for an order. +func (h *LedgersHandler) GetHistoryByOrderID(w http.ResponseWriter, r *http.Request) { + orderID := chi.URLParam(r, "orderID") + if orderID == "" { + h.WriteError(w, http.StatusBadRequest, dto.BadRequestError("order ID is required")) + return + } + + ledgers, err := h.repo.GetLedgerHistory(orderID) + if err != nil { + h.WriteError(w, http.StatusInternalServerError, dto.InternalError()) + return + } + + response := dto.LedgerListResponse{ + Ledgers: make([]dto.LedgerResponse, 0, len(ledgers)), + TotalCount: len(ledgers), + Limit: len(ledgers), + Offset: 0, + } + + for _, ledger := range ledgers { + response.Ledgers = append(response.Ledgers, toLedgerResponse(ledger)) + } + + h.WriteJSON(w, http.StatusOK, response) +} + +// toLedgerResponse converts a storage OrderLedger to an API response. +func toLedgerResponse(ledger *storage.OrderLedger) dto.LedgerResponse { + response := dto.LedgerResponse{ + ID: ledger.ID, + OrderID: ledger.OrderID, + SyncRunID: ledger.SyncRunID, + Provider: ledger.Provider, + FetchedAt: ledger.FetchedAt.Format("2006-01-02T15:04:05Z"), + LedgerState: string(ledger.LedgerState), + LedgerVersion: ledger.LedgerVersion, + TotalCharged: ledger.TotalCharged, + ChargeCount: ledger.ChargeCount, + PaymentMethodTypes: ledger.PaymentMethodTypes, + HasRefunds: ledger.HasRefunds, + IsValid: ledger.IsValid, + ValidationNotes: ledger.ValidationNotes, + Charges: make([]dto.ChargeResponse, 0, len(ledger.Charges)), + } + + for _, charge := range ledger.Charges { + response.Charges = append(response.Charges, dto.ChargeResponse{ + ID: charge.ID, + ChargeSequence: charge.ChargeSequence, + ChargeAmount: charge.ChargeAmount, + ChargeType: charge.ChargeType, + PaymentMethod: charge.PaymentMethod, + CardType: charge.CardType, + CardLastFour: charge.CardLastFour, + MonarchTransactionID: charge.MonarchTransactionID, + IsMatched: charge.IsMatched, + MatchConfidence: charge.MatchConfidence, + SplitCount: charge.SplitCount, + }) + } + + return response +} diff --git a/internal/api/server.go b/internal/api/server.go index f2b5f13..85c151d 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -100,6 +100,13 @@ func (s *Server) setupRoutes() { statsHandler := handlers.NewStatsHandler(s.repo) r.Get("/stats", statsHandler.Get) + // Ledgers + ledgersHandler := handlers.NewLedgersHandler(s.repo) + r.Get("/ledgers", ledgersHandler.List) + r.Get("/ledgers/{id}", ledgersHandler.Get) + r.Get("/orders/{orderID}/ledger", ledgersHandler.GetByOrderID) + r.Get("/orders/{orderID}/ledgers", ledgersHandler.GetHistoryByOrderID) + // Sync operations (live sync jobs) if s.syncService != nil { syncHandler := handlers.NewSyncHandler(s.syncService) diff --git a/internal/application/sync/handlers/walmart.go b/internal/application/sync/handlers/walmart.go index 7ede29c..4c64189 100644 --- a/internal/application/sync/handlers/walmart.go 
+++ b/internal/application/sync/handlers/walmart.go @@ -3,6 +3,7 @@ package handlers import ( "context" + "encoding/json" "fmt" "log/slog" "math" @@ -22,13 +23,21 @@ type WalmartOrder interface { IsMultiDelivery() (bool, error) } +// WalmartOrderWithLedger extends WalmartOrder with ledger access for persistence +type WalmartOrderWithLedger interface { + WalmartOrder + GetRawLedger() interface{} // Returns *walmartclient.OrderLedger but using interface{} to avoid import +} + // WalmartHandler processes Walmart orders with multi-delivery and gift card support type WalmartHandler struct { - matcher *matcher.Matcher - consolidator TransactionConsolidator - splitter CategorySplitter - monarch MonarchClient - logger *slog.Logger + matcher *matcher.Matcher + consolidator TransactionConsolidator + splitter CategorySplitter + monarch MonarchClient + ledgerStorage LedgerStorage + syncRunID int64 + logger *slog.Logger } // NewWalmartHandler creates a new Walmart order handler @@ -48,6 +57,12 @@ func NewWalmartHandler( } } +// SetLedgerStorage sets the ledger storage for persisting ledger data +func (h *WalmartHandler) SetLedgerStorage(storage LedgerStorage, syncRunID int64) { + h.ledgerStorage = storage + h.syncRunID = syncRunID +} + // ProcessOrder processes a Walmart order func (h *WalmartHandler) ProcessOrder( ctx context.Context, @@ -82,6 +97,9 @@ func (h *WalmartHandler) ProcessOrder( "charges", bankCharges, "charge_count", len(bankCharges)) + // Save ledger data if storage is configured + h.saveLedgerIfAvailable(order) + // Step 2: Handle based on number of charges if len(bankCharges) > 1 { // Multi-delivery order @@ -362,3 +380,107 @@ func (h *WalmartHandler) logWarn(msg string, args ...any) { h.logger.Warn(msg, args...) } } + +// saveLedgerIfAvailable extracts and saves ledger data if storage is configured +func (h *WalmartHandler) saveLedgerIfAvailable(order WalmartOrder) { + // Skip if no storage configured + if h.ledgerStorage == nil { + return + } + + // Try to get the raw ledger from the concrete type + walmartOrder, ok := order.(*walmartprovider.Order) + if !ok { + h.logDebug("Cannot save ledger - order is not a Walmart provider order") + return + } + + rawLedger := walmartOrder.GetRawLedger() + if rawLedger == nil { + h.logDebug("Cannot save ledger - no ledger data available") + return + } + + // Convert the raw ledger to LedgerData + ledgerData := h.convertToLedgerData(order.GetID(), rawLedger) + + // Save it + if err := h.ledgerStorage.SaveLedger(ledgerData, h.syncRunID); err != nil { + h.logWarn("Failed to save ledger data", + "order_id", order.GetID(), + "error", err) + } else { + h.logDebug("Saved ledger data", + "order_id", order.GetID(), + "charge_count", ledgerData.ChargeCount) + } +} + +// convertToLedgerData converts raw Walmart ledger to the handler's LedgerData format +func (h *WalmartHandler) convertToLedgerData(orderID string, rawLedger interface{}) *LedgerData { + ledgerData := &LedgerData{ + OrderID: orderID, + Provider: "walmart", + IsValid: true, + } + + // Use reflection-free approach with type assertion + // The rawLedger is *walmartclient.OrderLedger + type walmartLedger struct { + OrderID string + PaymentMethods []struct { + PaymentType string + CardType string + LastFour string + FinalCharges []float64 + TotalCharged float64 + } + } + + // Marshal to JSON and back to extract the data + // This avoids importing the walmart client in handlers + import_json, _ := json.Marshal(rawLedger) + ledgerData.RawJSON = string(import_json) + + // Parse for payment method 
extraction + var parsed walmartLedger + if err := json.Unmarshal(import_json, &parsed); err != nil { + h.logWarn("Failed to parse ledger JSON", "error", err) + return ledgerData + } + + // Collect payment method types and charges + var paymentTypes []string + totalCharged := 0.0 + chargeCount := 0 + hasRefunds := false + + for _, pm := range parsed.PaymentMethods { + paymentTypes = append(paymentTypes, pm.PaymentType) + totalCharged += pm.TotalCharged + + pmData := PaymentMethodData{ + PaymentType: pm.PaymentType, + CardType: pm.CardType, + CardLastFour: pm.LastFour, + FinalCharges: pm.FinalCharges, + TotalCharged: pm.TotalCharged, + } + ledgerData.PaymentMethods = append(ledgerData.PaymentMethods, pmData) + + for _, charge := range pm.FinalCharges { + if charge > 0 { + chargeCount++ + } else if charge < 0 { + hasRefunds = true + } + } + } + + ledgerData.TotalCharged = totalCharged + ledgerData.ChargeCount = chargeCount + ledgerData.PaymentMethodTypes = strings.Join(paymentTypes, ",") + ledgerData.HasRefunds = hasRefunds + + return ledgerData +} diff --git a/internal/application/sync/orchestrator.go b/internal/application/sync/orchestrator.go index 533b103..bc9fc87 100644 --- a/internal/application/sync/orchestrator.go +++ b/internal/application/sync/orchestrator.go @@ -132,6 +132,10 @@ func (o *Orchestrator) Run(ctx context.Context, opts Options) (*Result, error) { if o.consolidator != nil { o.consolidator.SetRunID(o.runID) } + // Set up ledger storage for Walmart handler + if o.walmartHandler != nil { + o.walmartHandler.SetLedgerStorage(&ledgerStorageAdapter{repo: o.storage}, o.runID) + } } // 5. Process orders diff --git a/internal/application/sync/types.go b/internal/application/sync/types.go index f380fff..bd7b1de 100644 --- a/internal/application/sync/types.go +++ b/internal/application/sync/types.go @@ -164,3 +164,62 @@ func (a *monarchAdapter) UpdateTransaction(ctx context.Context, id string, param func (a *monarchAdapter) UpdateSplits(ctx context.Context, id string, splits []*monarch.TransactionSplit) error { return a.client.Transactions.UpdateSplits(ctx, id, splits) } + +// ledgerStorageAdapter wraps storage.Repository to implement handlers.LedgerStorage +type ledgerStorageAdapter struct { + repo storage.Repository +} + +func (a *ledgerStorageAdapter) SaveLedger(ledger *handlers.LedgerData, syncRunID int64) error { + if a.repo == nil { + return nil + } + + // Convert handlers.LedgerData to storage.OrderLedger + orderLedger := &storage.OrderLedger{ + OrderID: ledger.OrderID, + SyncRunID: syncRunID, + Provider: ledger.Provider, + LedgerJSON: ledger.RawJSON, + TotalCharged: ledger.TotalCharged, + ChargeCount: ledger.ChargeCount, + PaymentMethodTypes: ledger.PaymentMethodTypes, + HasRefunds: ledger.HasRefunds, + IsValid: ledger.IsValid, + ValidationNotes: ledger.ValidationNotes, + } + + // Determine ledger state + if ledger.ChargeCount == 0 { + orderLedger.LedgerState = storage.LedgerStatePending + } else if ledger.HasRefunds { + orderLedger.LedgerState = storage.LedgerStatePartialRefund + } else { + orderLedger.LedgerState = storage.LedgerStateCharged + } + + // Convert payment methods to charges + chargeSeq := 0 + for _, pm := range ledger.PaymentMethods { + for _, charge := range pm.FinalCharges { + chargeSeq++ + chargeType := "payment" + if charge < 0 { + chargeType = "refund" + } + ledgerCharge := storage.LedgerCharge{ + OrderID: ledger.OrderID, + SyncRunID: syncRunID, + ChargeSequence: chargeSeq, + ChargeAmount: charge, + ChargeType: chargeType, + PaymentMethod: pm.PaymentType, 
+ CardType: pm.CardType, + CardLastFour: pm.CardLastFour, + } + orderLedger.Charges = append(orderLedger.Charges, ledgerCharge) + } + } + + return a.repo.SaveLedger(orderLedger) +} diff --git a/internal/infrastructure/storage/interfaces.go b/internal/infrastructure/storage/interfaces.go index 7edce58..8e936d0 100644 --- a/internal/infrastructure/storage/interfaces.go +++ b/internal/infrastructure/storage/interfaces.go @@ -7,6 +7,7 @@ type Repository interface { OrderRepository SyncRunRepository APICallRepository + LedgerRepository Close() error } @@ -102,3 +103,27 @@ type APICallRepository interface { // GetAPICallsByRunID retrieves all API calls for a specific sync run GetAPICallsByRunID(runID int64) ([]APICall, error) } + +// LedgerRepository handles order ledger storage and history +type LedgerRepository interface { + // SaveLedger saves a ledger snapshot with its charges + SaveLedger(ledger *OrderLedger) error + + // GetLatestLedger retrieves the most recent ledger for an order + GetLatestLedger(orderID string) (*OrderLedger, error) + + // GetLedgerHistory retrieves all ledger snapshots for an order (newest first) + GetLedgerHistory(orderID string) ([]*OrderLedger, error) + + // GetLedgerByID retrieves a specific ledger by ID + GetLedgerByID(id int64) (*OrderLedger, error) + + // ListLedgers returns ledgers matching the given filters with pagination + ListLedgers(filters LedgerFilters) (*LedgerListResult, error) + + // UpdateChargeMatch updates a ledger charge's match status + UpdateChargeMatch(chargeID int64, transactionID string, confidence float64, splitCount int) error + + // GetUnmatchedCharges returns charges that haven't been matched to Monarch transactions + GetUnmatchedCharges(provider string, limit int) ([]LedgerCharge, error) +} diff --git a/internal/infrastructure/storage/ledger_test.go b/internal/infrastructure/storage/ledger_test.go new file mode 100644 index 0000000..5621672 --- /dev/null +++ b/internal/infrastructure/storage/ledger_test.go @@ -0,0 +1,536 @@ +package storage + +import ( + "os" + "testing" + "time" +) + +func TestStorage_SaveLedger(t *testing.T) { + // Create temp DB + tmpFile, err := os.CreateTemp("", "ledger_test_*.db") + if err != nil { + t.Fatal(err) + } + defer os.Remove(tmpFile.Name()) + tmpFile.Close() + + s, err := NewStorage(tmpFile.Name()) + if err != nil { + t.Fatalf("failed to create storage: %v", err) + } + defer s.Close() + + // Create a ledger (SyncRunID 0 means NULL, no foreign key constraint) + ledger := &OrderLedger{ + OrderID: "test-order-123", + SyncRunID: 0, + Provider: "walmart", + FetchedAt: time.Now(), + LedgerState: LedgerStateCharged, + LedgerJSON: `{"test": "data"}`, + TotalCharged: 99.99, + ChargeCount: 1, + PaymentMethodTypes: "CREDITCARD", + HasRefunds: false, + IsValid: true, + ValidationNotes: "", + Charges: []LedgerCharge{ + { + ChargeSequence: 1, + ChargeAmount: 99.99, + ChargeType: "payment", + PaymentMethod: "CREDITCARD", + CardType: "VISA", + CardLastFour: "1234", + }, + }, + } + + // Save the ledger + err = s.SaveLedger(ledger) + if err != nil { + t.Fatalf("failed to save ledger: %v", err) + } + + // Verify ID was assigned + if ledger.ID == 0 { + t.Error("expected ledger ID to be assigned") + } + + // Retrieve it + retrieved, err := s.GetLatestLedger("test-order-123") + if err != nil { + t.Fatalf("failed to get latest ledger: %v", err) + } + + if retrieved == nil { + t.Fatal("expected to retrieve ledger, got nil") + } + + if retrieved.OrderID != "test-order-123" { + t.Errorf("expected order_id 'test-order-123', got 
%q", retrieved.OrderID) + } + + if retrieved.Provider != "walmart" { + t.Errorf("expected provider 'walmart', got %q", retrieved.Provider) + } + + if retrieved.LedgerState != LedgerStateCharged { + t.Errorf("expected state 'charged', got %q", retrieved.LedgerState) + } + + if retrieved.TotalCharged != 99.99 { + t.Errorf("expected total_charged 99.99, got %f", retrieved.TotalCharged) + } + + if retrieved.LedgerVersion != 1 { + t.Errorf("expected version 1, got %d", retrieved.LedgerVersion) + } + + // Check charges were saved + if len(retrieved.Charges) != 1 { + t.Errorf("expected 1 charge, got %d", len(retrieved.Charges)) + } else { + charge := retrieved.Charges[0] + if charge.ChargeAmount != 99.99 { + t.Errorf("expected charge amount 99.99, got %f", charge.ChargeAmount) + } + if charge.PaymentMethod != "CREDITCARD" { + t.Errorf("expected payment method 'CREDITCARD', got %q", charge.PaymentMethod) + } + if charge.CardLastFour != "1234" { + t.Errorf("expected card last four '1234', got %q", charge.CardLastFour) + } + } +} + +func TestStorage_LedgerVersioning(t *testing.T) { + // Create temp DB + tmpFile, err := os.CreateTemp("", "ledger_version_test_*.db") + if err != nil { + t.Fatal(err) + } + defer os.Remove(tmpFile.Name()) + tmpFile.Close() + + s, err := NewStorage(tmpFile.Name()) + if err != nil { + t.Fatalf("failed to create storage: %v", err) + } + defer s.Close() + + // Save first ledger (pending state) + ledger1 := &OrderLedger{ + OrderID: "order-456", + Provider: "walmart", + FetchedAt: time.Now().Add(-24 * time.Hour), + LedgerState: LedgerStatePending, + LedgerJSON: `{"state": "pending"}`, + } + if err := s.SaveLedger(ledger1); err != nil { + t.Fatalf("failed to save ledger 1: %v", err) + } + + // Save second ledger (charged state) + ledger2 := &OrderLedger{ + OrderID: "order-456", + Provider: "walmart", + FetchedAt: time.Now(), + LedgerState: LedgerStateCharged, + LedgerJSON: `{"state": "charged"}`, + TotalCharged: 50.00, + ChargeCount: 1, + Charges: []LedgerCharge{ + { + ChargeSequence: 1, + ChargeAmount: 50.00, + ChargeType: "payment", + PaymentMethod: "CREDITCARD", + }, + }, + } + if err := s.SaveLedger(ledger2); err != nil { + t.Fatalf("failed to save ledger 2: %v", err) + } + + // Version should be 2 now + if ledger2.LedgerVersion != 2 { + t.Errorf("expected version 2, got %d", ledger2.LedgerVersion) + } + + // Get latest should return the highest version (version 2) + latest, err := s.GetLatestLedger("order-456") + if err != nil { + t.Fatalf("failed to get latest: %v", err) + } + if latest.LedgerVersion != 2 { + t.Errorf("expected version 2 (latest), got %d", latest.LedgerVersion) + } + if latest.LedgerState != LedgerStateCharged { + t.Errorf("expected state 'charged', got %q", latest.LedgerState) + } + + // Get history should return both + history, err := s.GetLedgerHistory("order-456") + if err != nil { + t.Fatalf("failed to get history: %v", err) + } + if len(history) != 2 { + t.Fatalf("expected 2 history entries, got %d", len(history)) + } + // Verify both versions are present (order might vary due to same timestamp) + foundV1, foundV2 := false, false + for _, h := range history { + if h.LedgerVersion == 1 && h.LedgerState == LedgerStatePending { + foundV1 = true + } + if h.LedgerVersion == 2 && h.LedgerState == LedgerStateCharged { + foundV2 = true + } + } + if !foundV1 { + t.Error("expected to find version 1 with pending state") + } + if !foundV2 { + t.Error("expected to find version 2 with charged state") + } +} + +func TestStorage_GetLedgerByID(t *testing.T) { + // 
Create temp DB + tmpFile, err := os.CreateTemp("", "ledger_byid_test_*.db") + if err != nil { + t.Fatal(err) + } + defer os.Remove(tmpFile.Name()) + tmpFile.Close() + + s, err := NewStorage(tmpFile.Name()) + if err != nil { + t.Fatalf("failed to create storage: %v", err) + } + defer s.Close() + + // Save a ledger + ledger := &OrderLedger{ + OrderID: "order-789", + Provider: "costco", + FetchedAt: time.Now(), + LedgerState: LedgerStateCharged, + LedgerJSON: `{}`, + } + if err := s.SaveLedger(ledger); err != nil { + t.Fatalf("failed to save: %v", err) + } + + // Retrieve by ID + retrieved, err := s.GetLedgerByID(ledger.ID) + if err != nil { + t.Fatalf("failed to get by ID: %v", err) + } + if retrieved == nil { + t.Fatal("expected ledger, got nil") + } + if retrieved.OrderID != "order-789" { + t.Errorf("expected order_id 'order-789', got %q", retrieved.OrderID) + } + + // Non-existent ID should return nil + missing, err := s.GetLedgerByID(9999) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if missing != nil { + t.Error("expected nil for non-existent ID") + } +} + +func TestStorage_ListLedgers(t *testing.T) { + // Create temp DB + tmpFile, err := os.CreateTemp("", "ledger_list_test_*.db") + if err != nil { + t.Fatal(err) + } + defer os.Remove(tmpFile.Name()) + tmpFile.Close() + + s, err := NewStorage(tmpFile.Name()) + if err != nil { + t.Fatalf("failed to create storage: %v", err) + } + defer s.Close() + + // Save multiple ledgers + for _, data := range []struct { + orderID string + provider string + state LedgerState + }{ + {"order-1", "walmart", LedgerStateCharged}, + {"order-2", "walmart", LedgerStatePending}, + {"order-3", "costco", LedgerStateCharged}, + {"order-4", "amazon", LedgerStateRefunded}, + } { + ledger := &OrderLedger{ + OrderID: data.orderID, + Provider: data.provider, + FetchedAt: time.Now(), + LedgerState: data.state, + LedgerJSON: `{}`, + } + if err := s.SaveLedger(ledger); err != nil { + t.Fatalf("failed to save ledger: %v", err) + } + } + + // List all + result, err := s.ListLedgers(LedgerFilters{}) + if err != nil { + t.Fatalf("failed to list: %v", err) + } + if result.TotalCount != 4 { + t.Errorf("expected total count 4, got %d", result.TotalCount) + } + + // Filter by provider + result, err = s.ListLedgers(LedgerFilters{Provider: "walmart"}) + if err != nil { + t.Fatalf("failed to list: %v", err) + } + if result.TotalCount != 2 { + t.Errorf("expected 2 walmart ledgers, got %d", result.TotalCount) + } + + // Filter by state + result, err = s.ListLedgers(LedgerFilters{State: LedgerStateCharged}) + if err != nil { + t.Fatalf("failed to list: %v", err) + } + if result.TotalCount != 2 { + t.Errorf("expected 2 charged ledgers, got %d", result.TotalCount) + } + + // Pagination + result, err = s.ListLedgers(LedgerFilters{Limit: 2, Offset: 0}) + if err != nil { + t.Fatalf("failed to list: %v", err) + } + if len(result.Ledgers) != 2 { + t.Errorf("expected 2 ledgers with limit, got %d", len(result.Ledgers)) + } + if result.TotalCount != 4 { + t.Errorf("expected total count 4, got %d", result.TotalCount) + } +} + +func TestStorage_UpdateChargeMatch(t *testing.T) { + // Create temp DB + tmpFile, err := os.CreateTemp("", "ledger_match_test_*.db") + if err != nil { + t.Fatal(err) + } + defer os.Remove(tmpFile.Name()) + tmpFile.Close() + + s, err := NewStorage(tmpFile.Name()) + if err != nil { + t.Fatalf("failed to create storage: %v", err) + } + defer s.Close() + + // Save a ledger with charges + ledger := &OrderLedger{ + OrderID: "match-order", + Provider: 
"walmart", + FetchedAt: time.Now(), + LedgerState: LedgerStateCharged, + LedgerJSON: `{}`, + Charges: []LedgerCharge{ + { + ChargeSequence: 1, + ChargeAmount: 100.00, + ChargeType: "payment", + PaymentMethod: "CREDITCARD", + }, + }, + } + if err := s.SaveLedger(ledger); err != nil { + t.Fatalf("failed to save: %v", err) + } + + // Get the charge ID + retrieved, _ := s.GetLatestLedger("match-order") + chargeID := retrieved.Charges[0].ID + + // Update match + err = s.UpdateChargeMatch(chargeID, "monarch-tx-123", 0.95, 3) + if err != nil { + t.Fatalf("failed to update match: %v", err) + } + + // Verify update + retrieved, _ = s.GetLatestLedger("match-order") + charge := retrieved.Charges[0] + if !charge.IsMatched { + t.Error("expected charge to be matched") + } + if charge.MonarchTransactionID != "monarch-tx-123" { + t.Errorf("expected monarch tx 'monarch-tx-123', got %q", charge.MonarchTransactionID) + } + if charge.MatchConfidence != 0.95 { + t.Errorf("expected confidence 0.95, got %f", charge.MatchConfidence) + } + if charge.SplitCount != 3 { + t.Errorf("expected split count 3, got %d", charge.SplitCount) + } +} + +func TestStorage_GetUnmatchedCharges(t *testing.T) { + // Create temp DB + tmpFile, err := os.CreateTemp("", "ledger_unmatched_test_*.db") + if err != nil { + t.Fatal(err) + } + defer os.Remove(tmpFile.Name()) + tmpFile.Close() + + s, err := NewStorage(tmpFile.Name()) + if err != nil { + t.Fatalf("failed to create storage: %v", err) + } + defer s.Close() + + // Save ledgers with mixed matched/unmatched charges + ledger1 := &OrderLedger{ + OrderID: "unmatched-1", + Provider: "walmart", + FetchedAt: time.Now(), + LedgerState: LedgerStateCharged, + LedgerJSON: `{}`, + Charges: []LedgerCharge{ + {ChargeSequence: 1, ChargeAmount: 50.00, ChargeType: "payment", PaymentMethod: "CREDITCARD"}, + }, + } + if err := s.SaveLedger(ledger1); err != nil { + t.Fatalf("failed to save: %v", err) + } + + ledger2 := &OrderLedger{ + OrderID: "unmatched-2", + Provider: "costco", + FetchedAt: time.Now(), + LedgerState: LedgerStateCharged, + LedgerJSON: `{}`, + Charges: []LedgerCharge{ + {ChargeSequence: 1, ChargeAmount: 75.00, ChargeType: "payment", PaymentMethod: "CREDITCARD"}, + }, + } + if err := s.SaveLedger(ledger2); err != nil { + t.Fatalf("failed to save: %v", err) + } + + // Mark one as matched + retrieved, _ := s.GetLatestLedger("unmatched-1") + s.UpdateChargeMatch(retrieved.Charges[0].ID, "tx-matched", 1.0, 1) + + // Get unmatched for all providers + unmatched, err := s.GetUnmatchedCharges("", 50) + if err != nil { + t.Fatalf("failed to get unmatched: %v", err) + } + if len(unmatched) != 1 { + t.Errorf("expected 1 unmatched charge, got %d", len(unmatched)) + } + if len(unmatched) > 0 && unmatched[0].OrderID != "unmatched-2" { + t.Errorf("expected order 'unmatched-2', got %q", unmatched[0].OrderID) + } + + // Get unmatched for specific provider + unmatched, err = s.GetUnmatchedCharges("walmart", 50) + if err != nil { + t.Fatalf("failed to get unmatched: %v", err) + } + if len(unmatched) != 0 { + t.Errorf("expected 0 unmatched walmart charges, got %d", len(unmatched)) + } +} + +func TestMockRepository_Ledger(t *testing.T) { + mock := NewMockRepository() + + // Test SaveLedger + ledger := &OrderLedger{ + OrderID: "mock-order-1", + Provider: "walmart", + FetchedAt: time.Now(), + LedgerState: LedgerStateCharged, + LedgerJSON: `{"test": true}`, + Charges: []LedgerCharge{ + {ChargeSequence: 1, ChargeAmount: 25.00, ChargeType: "payment"}, + }, + } + if err := mock.SaveLedger(ledger); err != nil { + 
t.Fatalf("failed to save: %v", err) + } + + if !mock.SaveLedgerCalled { + t.Error("expected SaveLedgerCalled to be true") + } + if mock.LastSavedLedger != ledger { + t.Error("expected LastSavedLedger to be set") + } + if ledger.ID == 0 { + t.Error("expected ID to be assigned") + } + if ledger.LedgerVersion != 1 { + t.Errorf("expected version 1, got %d", ledger.LedgerVersion) + } + + // Test GetLatestLedger + latest, err := mock.GetLatestLedger("mock-order-1") + if err != nil { + t.Fatalf("failed to get latest: %v", err) + } + if latest == nil { + t.Fatal("expected ledger, got nil") + } + if latest.OrderID != "mock-order-1" { + t.Errorf("expected order 'mock-order-1', got %q", latest.OrderID) + } + + // Test version incrementing + ledger2 := &OrderLedger{ + OrderID: "mock-order-1", + Provider: "walmart", + FetchedAt: time.Now(), + LedgerState: LedgerStateRefunded, + LedgerJSON: `{"refund": true}`, + } + mock.SaveLedger(ledger2) + if ledger2.LedgerVersion != 2 { + t.Errorf("expected version 2, got %d", ledger2.LedgerVersion) + } + + // Test GetLedgerHistory + history, _ := mock.GetLedgerHistory("mock-order-1") + if len(history) != 2 { + t.Errorf("expected 2 history entries, got %d", len(history)) + } + + // Test error injection + mock.SaveLedgerErr = os.ErrNotExist + if err := mock.SaveLedger(&OrderLedger{}); err != os.ErrNotExist { + t.Errorf("expected injected error, got %v", err) + } + + // Test Reset clears ledger data + mock.Reset() + if mock.SaveLedgerCalled { + t.Error("expected SaveLedgerCalled to be false after reset") + } + latest, _ = mock.GetLatestLedger("mock-order-1") + if latest != nil { + t.Error("expected nil after reset") + } +} diff --git a/internal/infrastructure/storage/migrations.go b/internal/infrastructure/storage/migrations.go index 0d318cb..a521e10 100644 --- a/internal/infrastructure/storage/migrations.go +++ b/internal/infrastructure/storage/migrations.go @@ -35,6 +35,11 @@ var allMigrations = []Migration{ Name: "backfill_null_values", Up: migration004BackfillNullValues, }, + { + Version: 5, + Name: "add_ledger_tables", + Up: migration005AddLedgerTables, + }, } // runMigrations executes all pending migrations @@ -291,3 +296,87 @@ func migration004BackfillNullValues(db *sql.Tx) error { return nil } + +// migration005AddLedgerTables creates tables for storing order ledger data and charges. 
+// This enables: +// - Tracking ledger state changes over time (payment_pending → charged → refunded) +// - Per-charge tracking for multi-delivery orders +// - Detecting when ledger changes require re-processing +// - Audit trail for debugging and refund matching +func migration005AddLedgerTables(db *sql.Tx) error { + queries := []string{ + // order_ledgers: Store ledger snapshots with history + `CREATE TABLE IF NOT EXISTS order_ledgers ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + order_id TEXT NOT NULL, + sync_run_id INTEGER, + provider TEXT NOT NULL, + fetched_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + ledger_state TEXT NOT NULL, + ledger_version INTEGER DEFAULT 1, + ledger_json TEXT NOT NULL, + total_charged REAL, + charge_count INTEGER, + payment_method_types TEXT, + has_refunds BOOLEAN DEFAULT 0, + is_valid BOOLEAN DEFAULT 1, + validation_notes TEXT, + FOREIGN KEY (sync_run_id) REFERENCES sync_runs(id) + )`, + + // Indexes for order_ledgers + `CREATE INDEX IF NOT EXISTS idx_order_ledgers_order_id + ON order_ledgers(order_id)`, + + `CREATE INDEX IF NOT EXISTS idx_order_ledgers_provider + ON order_ledgers(provider)`, + + `CREATE INDEX IF NOT EXISTS idx_order_ledgers_state + ON order_ledgers(ledger_state)`, + + `CREATE INDEX IF NOT EXISTS idx_order_ledgers_fetched + ON order_ledgers(fetched_at DESC)`, + + // ledger_charges: Normalized charge entries for querying + `CREATE TABLE IF NOT EXISTS ledger_charges ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + order_ledger_id INTEGER NOT NULL, + order_id TEXT NOT NULL, + sync_run_id INTEGER, + charge_sequence INTEGER NOT NULL, + charge_amount REAL NOT NULL, + charge_type TEXT, + payment_method TEXT, + card_type TEXT, + card_last_four TEXT, + monarch_transaction_id TEXT, + is_matched BOOLEAN DEFAULT 0, + match_confidence REAL, + matched_at TIMESTAMP, + split_count INTEGER, + FOREIGN KEY (order_ledger_id) REFERENCES order_ledgers(id), + FOREIGN KEY (sync_run_id) REFERENCES sync_runs(id) + )`, + + // Indexes for ledger_charges + `CREATE INDEX IF NOT EXISTS idx_ledger_charges_order_id + ON ledger_charges(order_id)`, + + `CREATE INDEX IF NOT EXISTS idx_ledger_charges_ledger_id + ON ledger_charges(order_ledger_id)`, + + `CREATE INDEX IF NOT EXISTS idx_ledger_charges_monarch_tx + ON ledger_charges(monarch_transaction_id)`, + + `CREATE INDEX IF NOT EXISTS idx_ledger_charges_unmatched + ON ledger_charges(is_matched) WHERE is_matched = 0`, + } + + for _, query := range queries { + if _, err := db.Exec(query); err != nil { + return fmt.Errorf("failed to create ledger tables: %w", err) + } + } + + return nil +} diff --git a/internal/infrastructure/storage/mock.go b/internal/infrastructure/storage/mock.go index 84e23f5..e77d19f 100644 --- a/internal/infrastructure/storage/mock.go +++ b/internal/infrastructure/storage/mock.go @@ -3,10 +3,14 @@ package storage // MockRepository is an in-memory implementation of Repository for testing. // It stores all data in maps and slices, making tests fast and isolated. 
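+// With the ledger additions below it also implements LedgerRepository, so ledger
+// persistence can be exercised without touching SQLite. A minimal sketch (hypothetical
+// test code, names assumed from this patch):
+//
+//    mock := NewMockRepository()
+//    _ = mock.SaveLedger(&OrderLedger{OrderID: "o-1", Provider: "walmart"})
+//    latest, _ := mock.GetLatestLedger("o-1") // latest.LedgerVersion == 1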
type MockRepository struct { - records map[string]*ProcessingRecord - syncRuns map[int64]*mockSyncRun - apiCalls []APICall - nextRunID int64 + records map[string]*ProcessingRecord + syncRuns map[int64]*mockSyncRun + apiCalls []APICall + ledgers map[string][]*OrderLedger // Keyed by order_id + ledgerCharges map[int64][]LedgerCharge // Keyed by ledger_id + nextRunID int64 + nextLedgerID int64 + nextChargeID int64 // Hooks for test assertions SaveRecordCalled bool @@ -15,6 +19,8 @@ type MockRepository struct { IsProcessedCalled bool StartSyncRunCalled bool LogAPICallCalled bool + SaveLedgerCalled bool + LastSavedLedger *OrderLedger // Error injection for testing error paths SaveRecordErr error @@ -22,6 +28,7 @@ type MockRepository struct { StartSyncRunErr error CompleteSyncRunErr error LogAPICallErr error + SaveLedgerErr error } type mockSyncRun struct { @@ -39,10 +46,14 @@ type mockSyncRun struct { // NewMockRepository creates a new mock repository for testing func NewMockRepository() *MockRepository { return &MockRepository{ - records: make(map[string]*ProcessingRecord), - syncRuns: make(map[int64]*mockSyncRun), - apiCalls: make([]APICall, 0), - nextRunID: 1, + records: make(map[string]*ProcessingRecord), + syncRuns: make(map[int64]*mockSyncRun), + apiCalls: make([]APICall, 0), + ledgers: make(map[string][]*OrderLedger), + ledgerCharges: make(map[int64][]LedgerCharge), + nextRunID: 1, + nextLedgerID: 1, + nextChargeID: 1, } } @@ -386,16 +397,203 @@ func (m *MockRepository) Reset() { m.records = make(map[string]*ProcessingRecord) m.syncRuns = make(map[int64]*mockSyncRun) m.apiCalls = make([]APICall, 0) + m.ledgers = make(map[string][]*OrderLedger) + m.ledgerCharges = make(map[int64][]LedgerCharge) m.nextRunID = 1 + m.nextLedgerID = 1 + m.nextChargeID = 1 m.SaveRecordCalled = false m.LastSavedRecord = nil m.GetRecordCalled = false m.IsProcessedCalled = false m.StartSyncRunCalled = false m.LogAPICallCalled = false + m.SaveLedgerCalled = false + m.LastSavedLedger = nil m.SaveRecordErr = nil m.GetRecordErr = nil m.StartSyncRunErr = nil m.CompleteSyncRunErr = nil m.LogAPICallErr = nil + m.SaveLedgerErr = nil +} + +// ================================================================ +// LEDGER REPOSITORY METHODS +// ================================================================ + +// SaveLedger saves a ledger snapshot with its charges +func (m *MockRepository) SaveLedger(ledger *OrderLedger) error { + m.SaveLedgerCalled = true + m.LastSavedLedger = ledger + if m.SaveLedgerErr != nil { + return m.SaveLedgerErr + } + + // Assign ID + ledger.ID = m.nextLedgerID + m.nextLedgerID++ + + // Calculate version based on existing ledgers for this order + existingLedgers := m.ledgers[ledger.OrderID] + ledger.LedgerVersion = len(existingLedgers) + 1 + + // Deep copy the ledger + copied := *ledger + + // Process and store charges + var charges []LedgerCharge + for i := range ledger.Charges { + charge := ledger.Charges[i] + charge.ID = m.nextChargeID + m.nextChargeID++ + charge.OrderLedgerID = copied.ID + charge.OrderID = copied.OrderID + charge.SyncRunID = copied.SyncRunID + charges = append(charges, charge) + } + copied.Charges = charges + + // Store in ledgers map (append to history) + m.ledgers[ledger.OrderID] = append(m.ledgers[ledger.OrderID], &copied) + + // Store charges by ledger ID + m.ledgerCharges[copied.ID] = charges + + return nil +} + +// GetLatestLedger retrieves the most recent ledger for an order +func (m *MockRepository) GetLatestLedger(orderID string) (*OrderLedger, error) { + ledgers := 
m.ledgers[orderID] + if len(ledgers) == 0 { + return nil, nil + } + // Return the last one (most recent) + latest := ledgers[len(ledgers)-1] + // Attach charges + result := *latest + result.Charges = m.ledgerCharges[latest.ID] + return &result, nil +} + +// GetLedgerHistory retrieves all ledger snapshots for an order (newest first) +func (m *MockRepository) GetLedgerHistory(orderID string) ([]*OrderLedger, error) { + ledgers := m.ledgers[orderID] + if len(ledgers) == 0 { + return nil, nil + } + + // Return in reverse order (newest first) + result := make([]*OrderLedger, len(ledgers)) + for i, l := range ledgers { + copied := *l + copied.Charges = m.ledgerCharges[l.ID] + result[len(ledgers)-1-i] = &copied + } + return result, nil +} + +// GetLedgerByID retrieves a specific ledger by ID +func (m *MockRepository) GetLedgerByID(id int64) (*OrderLedger, error) { + for _, ledgers := range m.ledgers { + for _, l := range ledgers { + if l.ID == id { + result := *l + result.Charges = m.ledgerCharges[l.ID] + return &result, nil + } + } + } + return nil, nil +} + +// ListLedgers returns ledgers matching the given filters with pagination +func (m *MockRepository) ListLedgers(filters LedgerFilters) (*LedgerListResult, error) { + var matching []*OrderLedger + + for _, ledgers := range m.ledgers { + for _, l := range ledgers { + // Apply filters + if filters.OrderID != "" && l.OrderID != filters.OrderID { + continue + } + if filters.Provider != "" && l.Provider != filters.Provider { + continue + } + if filters.State != "" && l.LedgerState != filters.State { + continue + } + matching = append(matching, l) + } + } + + // Apply defaults + limit := filters.Limit + if limit == 0 { + limit = 50 + } + + // Apply pagination + total := len(matching) + start := filters.Offset + if start > total { + start = total + } + end := start + limit + if end > total { + end = total + } + + return &LedgerListResult{ + Ledgers: matching[start:end], + TotalCount: total, + Limit: limit, + Offset: filters.Offset, + }, nil +} + +// UpdateChargeMatch updates a ledger charge's match status +func (m *MockRepository) UpdateChargeMatch(chargeID int64, transactionID string, confidence float64, splitCount int) error { + for ledgerID, charges := range m.ledgerCharges { + for i, charge := range charges { + if charge.ID == chargeID { + m.ledgerCharges[ledgerID][i].MonarchTransactionID = transactionID + m.ledgerCharges[ledgerID][i].IsMatched = true + m.ledgerCharges[ledgerID][i].MatchConfidence = confidence + m.ledgerCharges[ledgerID][i].SplitCount = splitCount + return nil + } + } + } + return nil +} + +// GetUnmatchedCharges returns charges that haven't been matched to Monarch transactions +func (m *MockRepository) GetUnmatchedCharges(provider string, limit int) ([]LedgerCharge, error) { + if limit == 0 { + limit = 50 + } + + var result []LedgerCharge + for _, charges := range m.ledgerCharges { + for _, charge := range charges { + if charge.IsMatched { + continue + } + if charge.ChargeType != "payment" { + continue + } + // Check provider via the ledger + ledger, _ := m.GetLedgerByID(charge.OrderLedgerID) + if ledger != nil && provider != "" && ledger.Provider != provider { + continue + } + result = append(result, charge) + if len(result) >= limit { + return result, nil + } + } + } + return result, nil } diff --git a/internal/infrastructure/storage/models.go b/internal/infrastructure/storage/models.go index e80bacc..c3b7abd 100644 --- a/internal/infrastructure/storage/models.go +++ b/internal/infrastructure/storage/models.go @@ -121,3 
+121,70 @@ type APICall struct { Error string DurationMs int64 } + +// LedgerState represents the current state of an order's ledger +type LedgerState string + +const ( + LedgerStatePending LedgerState = "payment_pending" + LedgerStateCharged LedgerState = "charged" + LedgerStateRefunded LedgerState = "refunded" + LedgerStatePartialRefund LedgerState = "partial_refund" +) + +// OrderLedger represents a snapshot of an order's ledger at a point in time +type OrderLedger struct { + ID int64 `json:"id"` + OrderID string `json:"order_id"` + SyncRunID int64 `json:"sync_run_id,omitempty"` + Provider string `json:"provider"` + FetchedAt time.Time `json:"fetched_at"` + LedgerState LedgerState `json:"ledger_state"` + LedgerVersion int `json:"ledger_version"` + LedgerJSON string `json:"ledger_json"` // Raw provider JSON + TotalCharged float64 `json:"total_charged"` // Sum of all charges + ChargeCount int `json:"charge_count"` // Number of charges + PaymentMethodTypes string `json:"payment_method_types"` // Comma-separated: "CREDITCARD,GIFTCARD" + HasRefunds bool `json:"has_refunds"` + IsValid bool `json:"is_valid"` + ValidationNotes string `json:"validation_notes,omitempty"` + + // Populated from ledger_charges table + Charges []LedgerCharge `json:"charges,omitempty"` +} + +// LedgerCharge represents a single charge within a ledger +type LedgerCharge struct { + ID int64 `json:"id"` + OrderLedgerID int64 `json:"order_ledger_id"` + OrderID string `json:"order_id"` + SyncRunID int64 `json:"sync_run_id,omitempty"` + ChargeSequence int `json:"charge_sequence"` // Order within ledger + ChargeAmount float64 `json:"charge_amount"` + ChargeType string `json:"charge_type"` // "payment", "refund" + PaymentMethod string `json:"payment_method"` // "CREDITCARD", "GIFTCARD" + CardType string `json:"card_type,omitempty"` // "VISA", "AMEX" + CardLastFour string `json:"card_last_four,omitempty"` + MonarchTransactionID string `json:"monarch_transaction_id,omitempty"` + IsMatched bool `json:"is_matched"` + MatchConfidence float64 `json:"match_confidence,omitempty"` + MatchedAt time.Time `json:"matched_at,omitempty"` + SplitCount int `json:"split_count,omitempty"` +} + +// LedgerFilters defines filters for querying ledgers +type LedgerFilters struct { + OrderID string // Filter by order ID + Provider string // Filter by provider + State LedgerState // Filter by ledger state + Limit int // Max results (0 = default 50) + Offset int // Pagination offset +} + +// LedgerListResult contains paginated ledger results +type LedgerListResult struct { + Ledgers []*OrderLedger `json:"ledgers"` + TotalCount int `json:"total_count"` + Limit int `json:"limit"` + Offset int `json:"offset"` +} diff --git a/internal/infrastructure/storage/sqlite.go b/internal/infrastructure/storage/sqlite.go index 9fa8f26..f0c2567 100644 --- a/internal/infrastructure/storage/sqlite.go +++ b/internal/infrastructure/storage/sqlite.go @@ -4,6 +4,7 @@ import ( "database/sql" "encoding/json" "fmt" + "time" _ "github.com/mattn/go-sqlite3" ) @@ -662,3 +663,573 @@ func (s *Storage) GetSyncRun(runID int64) (*SyncRun, error) { return &r, nil } + +// ================================================================ +// LEDGER REPOSITORY IMPLEMENTATION +// ================================================================ + +// SaveLedger saves a ledger snapshot with its charges in a transaction +func (s *Storage) SaveLedger(ledger *OrderLedger) error { + tx, err := s.db.Begin() + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) + } + defer 
func() { _ = tx.Rollback() }() + + // Determine the next ledger version for this order + var currentVersion int + err = tx.QueryRow(` + SELECT COALESCE(MAX(ledger_version), 0) + FROM order_ledgers + WHERE order_id = ? AND provider = ? + `, ledger.OrderID, ledger.Provider).Scan(¤tVersion) + if err != nil { + return fmt.Errorf("failed to get current version: %w", err) + } + ledger.LedgerVersion = currentVersion + 1 + + // Insert the ledger + result, err := tx.Exec(` + INSERT INTO order_ledgers + (order_id, sync_run_id, provider, ledger_state, ledger_version, + ledger_json, total_charged, charge_count, payment_method_types, + has_refunds, is_valid, validation_notes) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `, + ledger.OrderID, + nullInt64(ledger.SyncRunID), + ledger.Provider, + ledger.LedgerState, + ledger.LedgerVersion, + ledger.LedgerJSON, + ledger.TotalCharged, + ledger.ChargeCount, + ledger.PaymentMethodTypes, + ledger.HasRefunds, + ledger.IsValid, + ledger.ValidationNotes, + ) + if err != nil { + return fmt.Errorf("failed to insert ledger: %w", err) + } + + ledgerID, err := result.LastInsertId() + if err != nil { + return fmt.Errorf("failed to get ledger ID: %w", err) + } + ledger.ID = ledgerID + + // Insert charges + for i := range ledger.Charges { + charge := &ledger.Charges[i] + charge.OrderLedgerID = ledgerID + charge.OrderID = ledger.OrderID + charge.SyncRunID = ledger.SyncRunID + + result, err := tx.Exec(` + INSERT INTO ledger_charges + (order_ledger_id, order_id, sync_run_id, charge_sequence, + charge_amount, charge_type, payment_method, card_type, card_last_four, + monarch_transaction_id, is_matched, match_confidence, matched_at, split_count) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `, + charge.OrderLedgerID, + charge.OrderID, + nullInt64(charge.SyncRunID), + charge.ChargeSequence, + charge.ChargeAmount, + charge.ChargeType, + charge.PaymentMethod, + charge.CardType, + charge.CardLastFour, + nullString(charge.MonarchTransactionID), + charge.IsMatched, + charge.MatchConfidence, + nullTime(charge.MatchedAt), + charge.SplitCount, + ) + if err != nil { + return fmt.Errorf("failed to insert charge: %w", err) + } + + chargeID, _ := result.LastInsertId() + charge.ID = chargeID + } + + return tx.Commit() +} + +// GetLatestLedger retrieves the most recent ledger for an order +func (s *Storage) GetLatestLedger(orderID string) (*OrderLedger, error) { + query := ` + SELECT id, order_id, sync_run_id, provider, fetched_at, ledger_state, + ledger_version, ledger_json, total_charged, charge_count, + payment_method_types, has_refunds, is_valid, validation_notes + FROM order_ledgers + WHERE order_id = ? 
+ ORDER BY ledger_version DESC + LIMIT 1 + ` + + ledger := &OrderLedger{} + var syncRunID sql.NullInt64 + var validationNotes sql.NullString + + err := s.db.QueryRow(query, orderID).Scan( + &ledger.ID, + &ledger.OrderID, + &syncRunID, + &ledger.Provider, + &ledger.FetchedAt, + &ledger.LedgerState, + &ledger.LedgerVersion, + &ledger.LedgerJSON, + &ledger.TotalCharged, + &ledger.ChargeCount, + &ledger.PaymentMethodTypes, + &ledger.HasRefunds, + &ledger.IsValid, + &validationNotes, + ) + if err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + return nil, err + } + + if syncRunID.Valid { + ledger.SyncRunID = syncRunID.Int64 + } + if validationNotes.Valid { + ledger.ValidationNotes = validationNotes.String + } + + // Load charges + charges, err := s.getChargesForLedger(ledger.ID) + if err != nil { + return nil, err + } + ledger.Charges = charges + + return ledger, nil +} + +// GetLedgerHistory retrieves all ledger snapshots for an order (newest first) +func (s *Storage) GetLedgerHistory(orderID string) ([]*OrderLedger, error) { + query := ` + SELECT id, order_id, sync_run_id, provider, fetched_at, ledger_state, + ledger_version, ledger_json, total_charged, charge_count, + payment_method_types, has_refunds, is_valid, validation_notes + FROM order_ledgers + WHERE order_id = ? + ORDER BY ledger_version DESC + ` + + rows, err := s.db.Query(query, orderID) + if err != nil { + return nil, err + } + defer func() { _ = rows.Close() }() + + var ledgers []*OrderLedger + for rows.Next() { + ledger := &OrderLedger{} + var syncRunID sql.NullInt64 + var validationNotes sql.NullString + + err := rows.Scan( + &ledger.ID, + &ledger.OrderID, + &syncRunID, + &ledger.Provider, + &ledger.FetchedAt, + &ledger.LedgerState, + &ledger.LedgerVersion, + &ledger.LedgerJSON, + &ledger.TotalCharged, + &ledger.ChargeCount, + &ledger.PaymentMethodTypes, + &ledger.HasRefunds, + &ledger.IsValid, + &validationNotes, + ) + if err != nil { + return nil, err + } + + if syncRunID.Valid { + ledger.SyncRunID = syncRunID.Int64 + } + if validationNotes.Valid { + ledger.ValidationNotes = validationNotes.String + } + + // Load charges for each ledger + charges, err := s.getChargesForLedger(ledger.ID) + if err != nil { + return nil, err + } + ledger.Charges = charges + + ledgers = append(ledgers, ledger) + } + + return ledgers, rows.Err() +} + +// GetLedgerByID retrieves a specific ledger by ID +func (s *Storage) GetLedgerByID(id int64) (*OrderLedger, error) { + query := ` + SELECT id, order_id, sync_run_id, provider, fetched_at, ledger_state, + ledger_version, ledger_json, total_charged, charge_count, + payment_method_types, has_refunds, is_valid, validation_notes + FROM order_ledgers + WHERE id = ? 
+ ` + + ledger := &OrderLedger{} + var syncRunID sql.NullInt64 + var validationNotes sql.NullString + + err := s.db.QueryRow(query, id).Scan( + &ledger.ID, + &ledger.OrderID, + &syncRunID, + &ledger.Provider, + &ledger.FetchedAt, + &ledger.LedgerState, + &ledger.LedgerVersion, + &ledger.LedgerJSON, + &ledger.TotalCharged, + &ledger.ChargeCount, + &ledger.PaymentMethodTypes, + &ledger.HasRefunds, + &ledger.IsValid, + &validationNotes, + ) + if err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + return nil, err + } + + if syncRunID.Valid { + ledger.SyncRunID = syncRunID.Int64 + } + if validationNotes.Valid { + ledger.ValidationNotes = validationNotes.String + } + + // Load charges + charges, err := s.getChargesForLedger(ledger.ID) + if err != nil { + return nil, err + } + ledger.Charges = charges + + return ledger, nil +} + +// ListLedgers returns ledgers matching the given filters with pagination +func (s *Storage) ListLedgers(filters LedgerFilters) (*LedgerListResult, error) { + // Set defaults + if filters.Limit <= 0 { + filters.Limit = 50 + } + if filters.Limit > 500 { + filters.Limit = 500 + } + + // Build WHERE clause + where := "WHERE 1=1" + args := []interface{}{} + + if filters.OrderID != "" { + where += " AND order_id = ?" + args = append(args, filters.OrderID) + } + if filters.Provider != "" { + where += " AND provider = ?" + args = append(args, filters.Provider) + } + if filters.State != "" { + where += " AND ledger_state = ?" + args = append(args, filters.State) + } + + // Get total count + countQuery := fmt.Sprintf("SELECT COUNT(*) FROM order_ledgers %s", where) + var totalCount int + if err := s.db.QueryRow(countQuery, args...).Scan(&totalCount); err != nil { + return nil, err + } + + // Get paginated results + query := fmt.Sprintf(` + SELECT id, order_id, sync_run_id, provider, fetched_at, ledger_state, + ledger_version, ledger_json, total_charged, charge_count, + payment_method_types, has_refunds, is_valid, validation_notes + FROM order_ledgers + %s + ORDER BY fetched_at DESC + LIMIT ? OFFSET ? + `, where) + + args = append(args, filters.Limit, filters.Offset) + + rows, err := s.db.Query(query, args...) + if err != nil { + return nil, err + } + defer func() { _ = rows.Close() }() + + var ledgers []*OrderLedger + for rows.Next() { + ledger := &OrderLedger{} + var syncRunID sql.NullInt64 + var validationNotes sql.NullString + + err := rows.Scan( + &ledger.ID, + &ledger.OrderID, + &syncRunID, + &ledger.Provider, + &ledger.FetchedAt, + &ledger.LedgerState, + &ledger.LedgerVersion, + &ledger.LedgerJSON, + &ledger.TotalCharged, + &ledger.ChargeCount, + &ledger.PaymentMethodTypes, + &ledger.HasRefunds, + &ledger.IsValid, + &validationNotes, + ) + if err != nil { + return nil, err + } + + if syncRunID.Valid { + ledger.SyncRunID = syncRunID.Int64 + } + if validationNotes.Valid { + ledger.ValidationNotes = validationNotes.String + } + + ledgers = append(ledgers, ledger) + } + + if err := rows.Err(); err != nil { + return nil, err + } + + return &LedgerListResult{ + Ledgers: ledgers, + TotalCount: totalCount, + Limit: filters.Limit, + Offset: filters.Offset, + }, nil +} + +// UpdateChargeMatch updates a ledger charge's match status +func (s *Storage) UpdateChargeMatch(chargeID int64, transactionID string, confidence float64, splitCount int) error { + query := ` + UPDATE ledger_charges + SET monarch_transaction_id = ?, + is_matched = 1, + match_confidence = ?, + matched_at = CURRENT_TIMESTAMP, + split_count = ? + WHERE id = ? 
+ ` + + _, err := s.db.Exec(query, transactionID, confidence, splitCount, chargeID) + return err +} + +// GetUnmatchedCharges returns charges that haven't been matched to Monarch transactions +func (s *Storage) GetUnmatchedCharges(provider string, limit int) ([]LedgerCharge, error) { + if limit <= 0 { + limit = 100 + } + + var query string + var args []interface{} + + if provider != "" { + query = ` + SELECT c.id, c.order_ledger_id, c.order_id, c.sync_run_id, c.charge_sequence, + c.charge_amount, c.charge_type, c.payment_method, c.card_type, c.card_last_four, + c.monarch_transaction_id, c.is_matched, c.match_confidence, c.matched_at, c.split_count + FROM ledger_charges c + JOIN order_ledgers l ON c.order_ledger_id = l.id + WHERE c.is_matched = 0 + AND c.charge_type = 'payment' + AND l.provider = ? + ORDER BY l.fetched_at DESC + LIMIT ? + ` + args = []interface{}{provider, limit} + } else { + query = ` + SELECT c.id, c.order_ledger_id, c.order_id, c.sync_run_id, c.charge_sequence, + c.charge_amount, c.charge_type, c.payment_method, c.card_type, c.card_last_four, + c.monarch_transaction_id, c.is_matched, c.match_confidence, c.matched_at, c.split_count + FROM ledger_charges c + JOIN order_ledgers l ON c.order_ledger_id = l.id + WHERE c.is_matched = 0 + AND c.charge_type = 'payment' + ORDER BY l.fetched_at DESC + LIMIT ? + ` + args = []interface{}{limit} + } + + rows, err := s.db.Query(query, args...) + if err != nil { + return nil, err + } + defer func() { _ = rows.Close() }() + + var charges []LedgerCharge + for rows.Next() { + var charge LedgerCharge + var syncRunID sql.NullInt64 + var txID sql.NullString + var cardType, cardLastFour sql.NullString + var matchedAt sql.NullTime + + err := rows.Scan( + &charge.ID, + &charge.OrderLedgerID, + &charge.OrderID, + &syncRunID, + &charge.ChargeSequence, + &charge.ChargeAmount, + &charge.ChargeType, + &charge.PaymentMethod, + &cardType, + &cardLastFour, + &txID, + &charge.IsMatched, + &charge.MatchConfidence, + &matchedAt, + &charge.SplitCount, + ) + if err != nil { + return nil, err + } + + if syncRunID.Valid { + charge.SyncRunID = syncRunID.Int64 + } + if txID.Valid { + charge.MonarchTransactionID = txID.String + } + if cardType.Valid { + charge.CardType = cardType.String + } + if cardLastFour.Valid { + charge.CardLastFour = cardLastFour.String + } + if matchedAt.Valid { + charge.MatchedAt = matchedAt.Time + } + + charges = append(charges, charge) + } + + return charges, rows.Err() +} + +// getChargesForLedger retrieves all charges for a ledger +func (s *Storage) getChargesForLedger(ledgerID int64) ([]LedgerCharge, error) { + query := ` + SELECT id, order_ledger_id, order_id, sync_run_id, charge_sequence, + charge_amount, charge_type, payment_method, card_type, card_last_four, + monarch_transaction_id, is_matched, match_confidence, matched_at, split_count + FROM ledger_charges + WHERE order_ledger_id = ? 
+ ORDER BY charge_sequence + ` + + rows, err := s.db.Query(query, ledgerID) + if err != nil { + return nil, err + } + defer func() { _ = rows.Close() }() + + var charges []LedgerCharge + for rows.Next() { + var charge LedgerCharge + var syncRunID sql.NullInt64 + var txID sql.NullString + var cardType, cardLastFour sql.NullString + var matchedAt sql.NullTime + + err := rows.Scan( + &charge.ID, + &charge.OrderLedgerID, + &charge.OrderID, + &syncRunID, + &charge.ChargeSequence, + &charge.ChargeAmount, + &charge.ChargeType, + &charge.PaymentMethod, + &cardType, + &cardLastFour, + &txID, + &charge.IsMatched, + &charge.MatchConfidence, + &matchedAt, + &charge.SplitCount, + ) + if err != nil { + return nil, err + } + + if syncRunID.Valid { + charge.SyncRunID = syncRunID.Int64 + } + if txID.Valid { + charge.MonarchTransactionID = txID.String + } + if cardType.Valid { + charge.CardType = cardType.String + } + if cardLastFour.Valid { + charge.CardLastFour = cardLastFour.String + } + if matchedAt.Valid { + charge.MatchedAt = matchedAt.Time + } + + charges = append(charges, charge) + } + + return charges, rows.Err() +} + +// Helper functions for nullable values +func nullInt64(v int64) interface{} { + if v == 0 { + return nil + } + return v +} + +func nullString(v string) interface{} { + if v == "" { + return nil + } + return v +} + +func nullTime(t time.Time) interface{} { + if t.IsZero() { + return nil + } + return t +} From e9b938e02c626557d960e8bd4f693df83a355a40 Mon Sep 17 00:00:00 2001 From: Erick Shaffer Date: Wed, 24 Dec 2025 09:39:52 -0700 Subject: [PATCH 09/10] feat: Add sync service for API-triggered sync jobs Enables starting, cancelling, and monitoring sync jobs via the REST API. Includes job tracking, progress reporting, and concurrent provider handling. --- internal/application/service/sync_service.go | 175 +++++++++++- .../application/service/sync_service_test.go | 262 ++++++++++++++++++ internal/application/sync/handlers/amazon.go | 28 ++ internal/cli/serve.go | 9 + 4 files changed, 473 insertions(+), 1 deletion(-) diff --git a/internal/application/service/sync_service.go b/internal/application/service/sync_service.go index c647e4f..57d5dd0 100644 --- a/internal/application/service/sync_service.go +++ b/internal/application/service/sync_service.go @@ -26,6 +26,18 @@ const ( StatusCancelled SyncStatus = "cancelled" ) +// Job staleness thresholds +const ( + // DefaultJobStaleThreshold is how long a job can go without progress updates + // before being considered stale. Jobs that don't update progress for this + // duration are assumed to be hung or crashed. + DefaultJobStaleThreshold = 30 * time.Minute + + // DefaultJobMaxDuration is the maximum time a job can run before being + // forcefully marked as failed. This prevents runaway jobs. + DefaultJobMaxDuration = 2 * time.Hour +) + // SyncRequest holds parameters for starting a sync. type SyncRequest struct { Provider string // "walmart", "costco", "amazon" @@ -79,6 +91,10 @@ type SyncService struct { // Provider-level locking (only one sync per provider at a time) providerLocks map[string]*sync.Mutex locksMutex sync.Mutex + + // Background cleanup + cleanupStop chan struct{} + cleanupDone chan struct{} } // NewSyncService creates a new sync service. @@ -364,7 +380,7 @@ func (s *SyncService) generateJobID(provider string) string { return fmt.Sprintf("%s-%d", provider, time.Now().UnixNano()) } -// CleanupOldJobs removes jobs older than the specified duration. +// CleanupOldJobs removes completed jobs older than the specified duration. 
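+// Jobs that are still running are left in place; stuck jobs are instead handled by
+// MarkStaleJobsAsFailed below.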
func (s *SyncService) CleanupOldJobs(maxAge time.Duration) int { s.jobsMutex.Lock() defer s.jobsMutex.Unlock() @@ -388,3 +404,160 @@ func (s *SyncService) CleanupOldJobs(maxAge time.Duration) int { return removed } + +// MarkStaleJobsAsFailed finds jobs that appear to be stuck and marks them as failed. +// A job is considered stale if: +// 1. It has been running longer than maxDuration, OR +// 2. Its Progress.LastUpdate is older than staleThreshold +// +// This handles cases where: +// - The goroutine panicked and never updated the job status +// - The job is genuinely stuck (infinite loop, deadlock, etc.) +// - The server restarted and orphaned in-memory job state +func (s *SyncService) MarkStaleJobsAsFailed(staleThreshold, maxDuration time.Duration) int { + s.jobsMutex.Lock() + defer s.jobsMutex.Unlock() + + now := time.Now() + marked := 0 + + for id, job := range s.jobs { + // Only check running or pending jobs + if job.Status != StatusRunning && job.Status != StatusPending { + continue + } + + isStale := false + reason := "" + + // Check if job has exceeded max duration + if now.Sub(job.StartedAt) > maxDuration { + isStale = true + reason = fmt.Sprintf("exceeded max duration of %v (started %v ago)", maxDuration, now.Sub(job.StartedAt).Round(time.Second)) + } + + // Check if progress hasn't been updated recently + if !isStale && now.Sub(job.Progress.LastUpdate) > staleThreshold { + isStale = true + reason = fmt.Sprintf("no progress update for %v (threshold: %v)", now.Sub(job.Progress.LastUpdate).Round(time.Second), staleThreshold) + } + + if isStale { + // Cancel the context if it exists (in case goroutine is still running) + if job.cancelFunc != nil { + job.cancelFunc() + } + + // Mark as failed + job.Status = StatusFailed + job.CompletedAt = &now + job.Error = fmt.Errorf("job marked as stale: %s", reason) + job.Progress.CurrentPhase = "failed" + job.Progress.LastUpdate = now + + // Release the provider lock + s.releaseProviderLockUnsafe(job.Provider) + + s.logger.Warn("marked stale job as failed", + "job_id", id, + "provider", job.Provider, + "reason", reason, + "started_at", job.StartedAt, + "last_update", job.Progress.LastUpdate, + ) + + marked++ + } + } + + return marked +} + +// releaseProviderLockUnsafe releases a provider lock without acquiring locksMutex. +// MUST only be called while holding jobsMutex to avoid races. +func (s *SyncService) releaseProviderLockUnsafe(provider string) { + s.locksMutex.Lock() + defer s.locksMutex.Unlock() + + if lock, exists := s.providerLocks[provider]; exists { + // TryLock then Unlock ensures we don't panic if already unlocked + if lock.TryLock() { + lock.Unlock() + } else { + // Lock is held, so unlock it + lock.Unlock() + } + } +} + +// IsJobStale checks if a specific job is considered stale. +func (s *SyncService) IsJobStale(jobID string, staleThreshold, maxDuration time.Duration) bool { + s.jobsMutex.RLock() + defer s.jobsMutex.RUnlock() + + job, exists := s.jobs[jobID] + if !exists { + return false + } + + if job.Status != StatusRunning && job.Status != StatusPending { + return false + } + + now := time.Now() + return now.Sub(job.StartedAt) > maxDuration || now.Sub(job.Progress.LastUpdate) > staleThreshold +} + +// StartBackgroundCleanup starts a background goroutine that periodically: +// 1. Marks stale jobs as failed +// 2. Cleans up old completed jobs +// +// The cleanup runs every checkInterval. Call StopBackgroundCleanup to stop it. 
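+//
+// A minimal usage sketch (serve.go in this patch series starts it after constructing
+// the service and stops it on shutdown):
+//
+//    syncService.StartBackgroundCleanup(5 * time.Minute)
+//    defer syncService.StopBackgroundCleanup()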
+func (s *SyncService) StartBackgroundCleanup(checkInterval time.Duration) { + s.cleanupStop = make(chan struct{}) + s.cleanupDone = make(chan struct{}) + + go func() { + defer close(s.cleanupDone) + + ticker := time.NewTicker(checkInterval) + defer ticker.Stop() + + s.logger.Info("background job cleanup started", + "check_interval", checkInterval, + "stale_threshold", DefaultJobStaleThreshold, + "max_duration", DefaultJobMaxDuration, + ) + + for { + select { + case <-s.cleanupStop: + s.logger.Info("background job cleanup stopped") + return + case <-ticker.C: + // Mark stale jobs as failed + staleMarked := s.MarkStaleJobsAsFailed(DefaultJobStaleThreshold, DefaultJobMaxDuration) + if staleMarked > 0 { + s.logger.Info("marked stale jobs as failed", "count", staleMarked) + } + + // Clean up old completed jobs (keep for 24 hours) + cleaned := s.CleanupOldJobs(24 * time.Hour) + if cleaned > 0 { + s.logger.Debug("cleaned up old jobs", "count", cleaned) + } + } + } + }() +} + +// StopBackgroundCleanup stops the background cleanup goroutine. +// This method blocks until the cleanup goroutine has fully stopped. +func (s *SyncService) StopBackgroundCleanup() { + if s.cleanupStop == nil { + return + } + + close(s.cleanupStop) + <-s.cleanupDone +} diff --git a/internal/application/service/sync_service_test.go b/internal/application/service/sync_service_test.go index d1f901e..a47192a 100644 --- a/internal/application/service/sync_service_test.go +++ b/internal/application/service/sync_service_test.go @@ -1,7 +1,11 @@ package service import ( + "context" + "log/slog" + "os" "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -111,3 +115,261 @@ func TestSyncJob_Initial(t *testing.T) { assert.Nil(t, job.Result) assert.Nil(t, job.Error) } + +// Helper to create a test logger +func testLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) +} + +func TestSyncService_IsJobStale_NotFound(t *testing.T) { + svc := NewSyncService(nil, nil, nil, testLogger(), nil) + + isStale := svc.IsJobStale("non-existent", 30*time.Minute, 2*time.Hour) + + assert.False(t, isStale) +} + +func TestSyncService_IsJobStale_CompletedJobNotStale(t *testing.T) { + svc := NewSyncService(nil, nil, nil, testLogger(), nil) + + // Manually add a completed job + svc.jobsMutex.Lock() + svc.jobs["completed-job"] = &SyncJob{ + ID: "completed-job", + Provider: "walmart", + Status: StatusCompleted, + StartedAt: time.Now().Add(-3 * time.Hour), // Old but completed + Progress: SyncProgress{LastUpdate: time.Now().Add(-2 * time.Hour)}, + } + svc.jobsMutex.Unlock() + + // Completed jobs should never be considered stale + isStale := svc.IsJobStale("completed-job", 30*time.Minute, 2*time.Hour) + + assert.False(t, isStale) +} + +func TestSyncService_IsJobStale_RunningJob_StaleByProgress(t *testing.T) { + svc := NewSyncService(nil, nil, nil, testLogger(), nil) + + // Add a running job with old progress update + svc.jobsMutex.Lock() + svc.jobs["stale-job"] = &SyncJob{ + ID: "stale-job", + Provider: "walmart", + Status: StatusRunning, + StartedAt: time.Now().Add(-10 * time.Minute), // Started 10 min ago + Progress: SyncProgress{LastUpdate: time.Now().Add(-35 * time.Minute)}, // No update for 35 min + } + svc.jobsMutex.Unlock() + + // Should be stale because progress hasn't updated in 35 minutes (> 30 min threshold) + isStale := svc.IsJobStale("stale-job", 30*time.Minute, 2*time.Hour) + + assert.True(t, isStale) +} + +func TestSyncService_IsJobStale_RunningJob_StaleByDuration(t 
*testing.T) { + svc := NewSyncService(nil, nil, nil, testLogger(), nil) + + // Add a running job that's been running too long + svc.jobsMutex.Lock() + svc.jobs["long-job"] = &SyncJob{ + ID: "long-job", + Provider: "walmart", + Status: StatusRunning, + StartedAt: time.Now().Add(-3 * time.Hour), // Started 3 hours ago + Progress: SyncProgress{LastUpdate: time.Now()}, // Recent progress + } + svc.jobsMutex.Unlock() + + // Should be stale because it's been running longer than 2 hours max + isStale := svc.IsJobStale("long-job", 30*time.Minute, 2*time.Hour) + + assert.True(t, isStale) +} + +func TestSyncService_IsJobStale_RunningJob_NotStale(t *testing.T) { + svc := NewSyncService(nil, nil, nil, testLogger(), nil) + + // Add a healthy running job + svc.jobsMutex.Lock() + svc.jobs["healthy-job"] = &SyncJob{ + ID: "healthy-job", + Provider: "walmart", + Status: StatusRunning, + StartedAt: time.Now().Add(-10 * time.Minute), // Started 10 min ago + Progress: SyncProgress{LastUpdate: time.Now().Add(-5 * time.Minute)}, // Updated 5 min ago + } + svc.jobsMutex.Unlock() + + // Should NOT be stale - running for short time, recently updated + isStale := svc.IsJobStale("healthy-job", 30*time.Minute, 2*time.Hour) + + assert.False(t, isStale) +} + +func TestSyncService_MarkStaleJobsAsFailed_MarksStaleJobs(t *testing.T) { + svc := NewSyncService(nil, nil, nil, testLogger(), nil) + + // Add a stale job + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + svc.jobsMutex.Lock() + svc.jobs["stale-job"] = &SyncJob{ + ID: "stale-job", + Provider: "walmart", + Status: StatusRunning, + StartedAt: time.Now().Add(-3 * time.Hour), + Progress: SyncProgress{LastUpdate: time.Now().Add(-35 * time.Minute)}, + cancelFunc: cancel, + } + svc.jobsMutex.Unlock() + + // Mark stale jobs + marked := svc.MarkStaleJobsAsFailed(30*time.Minute, 2*time.Hour) + + assert.Equal(t, 1, marked) + + // Verify job was marked as failed + job, err := svc.GetSyncJob("stale-job") + assert.NoError(t, err) + assert.Equal(t, StatusFailed, job.Status) + assert.NotNil(t, job.CompletedAt) + assert.NotNil(t, job.Error) + assert.Contains(t, job.Error.Error(), "stale") + + // Verify context was cancelled + select { + case <-ctx.Done(): + // Expected + default: + t.Error("context should have been cancelled") + } +} + +func TestSyncService_MarkStaleJobsAsFailed_SkipsHealthyJobs(t *testing.T) { + svc := NewSyncService(nil, nil, nil, testLogger(), nil) + + // Add a healthy job + svc.jobsMutex.Lock() + svc.jobs["healthy-job"] = &SyncJob{ + ID: "healthy-job", + Provider: "walmart", + Status: StatusRunning, + StartedAt: time.Now().Add(-10 * time.Minute), + Progress: SyncProgress{LastUpdate: time.Now().Add(-5 * time.Minute)}, + } + svc.jobsMutex.Unlock() + + // Try to mark stale jobs + marked := svc.MarkStaleJobsAsFailed(30*time.Minute, 2*time.Hour) + + assert.Equal(t, 0, marked) + + // Verify job is still running + job, err := svc.GetSyncJob("healthy-job") + assert.NoError(t, err) + assert.Equal(t, StatusRunning, job.Status) +} + +func TestSyncService_MarkStaleJobsAsFailed_SkipsCompletedJobs(t *testing.T) { + svc := NewSyncService(nil, nil, nil, testLogger(), nil) + + // Add a completed job that would appear "stale" if we checked it + completedTime := time.Now().Add(-1 * time.Hour) + svc.jobsMutex.Lock() + svc.jobs["completed-job"] = &SyncJob{ + ID: "completed-job", + Provider: "walmart", + Status: StatusCompleted, + StartedAt: time.Now().Add(-3 * time.Hour), + CompletedAt: &completedTime, + Progress: SyncProgress{LastUpdate: 
completedTime}, + } + svc.jobsMutex.Unlock() + + // Try to mark stale jobs + marked := svc.MarkStaleJobsAsFailed(30*time.Minute, 2*time.Hour) + + assert.Equal(t, 0, marked) + + // Verify job is still completed (not changed to failed) + job, err := svc.GetSyncJob("completed-job") + assert.NoError(t, err) + assert.Equal(t, StatusCompleted, job.Status) +} + +func TestSyncService_CleanupOldJobs_RemovesOldCompletedJobs(t *testing.T) { + svc := NewSyncService(nil, nil, nil, testLogger(), nil) + + // Add an old completed job + oldTime := time.Now().Add(-25 * time.Hour) + svc.jobsMutex.Lock() + svc.jobs["old-job"] = &SyncJob{ + ID: "old-job", + Provider: "walmart", + Status: StatusCompleted, + CompletedAt: &oldTime, + } + svc.jobsMutex.Unlock() + + // Cleanup jobs older than 24 hours + removed := svc.CleanupOldJobs(24 * time.Hour) + + assert.Equal(t, 1, removed) + + // Verify job was removed + _, err := svc.GetSyncJob("old-job") + assert.Error(t, err) + assert.Contains(t, err.Error(), "not found") +} + +func TestSyncService_CleanupOldJobs_KeepsRecentCompletedJobs(t *testing.T) { + svc := NewSyncService(nil, nil, nil, testLogger(), nil) + + // Add a recently completed job + recentTime := time.Now().Add(-1 * time.Hour) + svc.jobsMutex.Lock() + svc.jobs["recent-job"] = &SyncJob{ + ID: "recent-job", + Provider: "walmart", + Status: StatusCompleted, + CompletedAt: &recentTime, + } + svc.jobsMutex.Unlock() + + // Cleanup jobs older than 24 hours + removed := svc.CleanupOldJobs(24 * time.Hour) + + assert.Equal(t, 0, removed) + + // Verify job still exists + _, err := svc.GetSyncJob("recent-job") + assert.NoError(t, err) +} + +func TestSyncService_CleanupOldJobs_KeepsRunningJobs(t *testing.T) { + svc := NewSyncService(nil, nil, nil, testLogger(), nil) + + // Add an old running job (shouldn't be removed by cleanup) + svc.jobsMutex.Lock() + svc.jobs["running-job"] = &SyncJob{ + ID: "running-job", + Provider: "walmart", + Status: StatusRunning, + StartedAt: time.Now().Add(-25 * time.Hour), + } + svc.jobsMutex.Unlock() + + // Cleanup old jobs + removed := svc.CleanupOldJobs(24 * time.Hour) + + // Running jobs should NOT be removed + assert.Equal(t, 0, removed) + + // Verify job still exists + _, err := svc.GetSyncJob("running-job") + assert.NoError(t, err) +} diff --git a/internal/application/sync/handlers/amazon.go b/internal/application/sync/handlers/amazon.go index 0d0a28a..c2bd9c9 100644 --- a/internal/application/sync/handlers/amazon.go +++ b/internal/application/sync/handlers/amazon.go @@ -47,6 +47,34 @@ type MonarchClient interface { UpdateSplits(ctx context.Context, id string, splits []*monarch.TransactionSplit) error } +// LedgerData represents ledger data that can be saved +type LedgerData struct { + OrderID string + Provider string + RawJSON string + PaymentMethods []PaymentMethodData + TotalCharged float64 + ChargeCount int + PaymentMethodTypes string + HasRefunds bool + IsValid bool + ValidationNotes string +} + +// PaymentMethodData represents a single payment method's charges +type PaymentMethodData struct { + PaymentType string + CardType string + CardLastFour string + FinalCharges []float64 + TotalCharged float64 +} + +// LedgerStorage provides access to ledger persistence +type LedgerStorage interface { + SaveLedger(ledger *LedgerData, syncRunID int64) error +} + // ProcessResult holds the result of processing an order type ProcessResult struct { Processed bool diff --git a/internal/cli/serve.go b/internal/cli/serve.go index f2758ef..2f0b70b 100644 --- a/internal/cli/serve.go +++ 
b/internal/cli/serve.go @@ -71,6 +71,10 @@ func RunServe(cfg *config.Config, flags *ServeFlags) error { // Create sync service syncService = service.NewSyncService(cfg, serviceClients, store, logger, providerFactory) + + // Start background cleanup for stale jobs (checks every 5 minutes) + syncService.StartBackgroundCleanup(5 * time.Minute) + logger.Info("sync service initialized", "providers", []string{"walmart", "costco", "amazon"}) } @@ -92,6 +96,11 @@ func RunServe(cfg *config.Config, flags *ServeFlags) error { <-quit logger.Info("received shutdown signal") + // Stop background cleanup if sync service is running + if syncService != nil { + syncService.StopBackgroundCleanup() + } + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() From d506e308faf95187614d82249f9fbd61d57710ba Mon Sep 17 00:00:00 2001 From: Erick Shaffer Date: Thu, 1 Jan 2026 20:14:35 -0700 Subject: [PATCH 10/10] feat: Add pending status for Amazon orders awaiting shipment Orders that haven't shipped yet (no bank charges) are now tracked as "pending" instead of "failed". This provides clearer status reporting and ensures these orders are retried on subsequent syncs. Changes: - Add ErrPaymentPending sentinel error for orders not yet charged - Amazon handler detects pending orders and returns skip with reason - Orchestrator records pending status (not error) and logs at INFO level - UI already supports pending status with amber badge; added filter option The pending status is recorded in the database but doesn't block retries since IsProcessed() only checks for status='success'. --- internal/adapters/providers/amazon/order.go | 16 ++++++- .../adapters/providers/amazon/order_test.go | 36 +++++++++++++- .../providers/amazon/provider_test.go | 8 ++-- internal/application/sync/handlers/amazon.go | 47 +++++++++++++++++-- internal/application/sync/orchestrator.go | 32 ++++++++++++- internal/application/sync/recording.go | 24 ++++++++++ web/src/app/(app)/orders/page.tsx | 1 + 7 files changed, 150 insertions(+), 14 deletions(-) diff --git a/internal/adapters/providers/amazon/order.go b/internal/adapters/providers/amazon/order.go index 6f01e63..e19542d 100644 --- a/internal/adapters/providers/amazon/order.go +++ b/internal/adapters/providers/amazon/order.go @@ -1,6 +1,7 @@ package amazon import ( + "errors" "fmt" "log/slog" "time" @@ -8,6 +9,9 @@ import ( "github.com/eshaffer321/monarchmoney-sync-backend/internal/adapters/providers" ) +// ErrPaymentPending indicates an order has no bank charges yet because it hasn't shipped +var ErrPaymentPending = errors.New("payment pending: order has not been charged yet (awaiting shipment)") + // Order wraps a ParsedOrder to implement the providers.Order interface type Order struct { parsedOrder *ParsedOrder @@ -83,10 +87,12 @@ func (o *Order) GetRawData() interface{} { // Filters out non-bank transactions like gift cards, points, etc. func (o *Order) GetFinalCharges() ([]float64, error) { if len(o.parsedOrder.Transactions) == 0 { - return nil, fmt.Errorf("no transactions found for order") + // No transactions at all - order hasn't been charged yet (awaiting shipment) + return nil, ErrPaymentPending } var bankCharges []float64 + var hasNonBankPayments bool for _, tx := range o.parsedOrder.Transactions { // Skip refunds if tx.Type == "refund" { @@ -107,6 +113,7 @@ func (o *Order) GetFinalCharges() ([]float64, error) { // Real bank charges have Last4 populated (card ending digits) // Points, gift cards, etc. 
have empty Last4 if tx.Last4 == "" { + hasNonBankPayments = true if o.logger != nil { o.logger.Debug("Skipping non-bank transaction", "order_id", o.GetID(), @@ -128,7 +135,12 @@ func (o *Order) GetFinalCharges() ([]float64, error) { } if len(bankCharges) == 0 { - return nil, fmt.Errorf("no bank charges found (order may be paid with gift cards/points only)") + if hasNonBankPayments { + // Order was paid entirely with gift cards/points - no bank transaction to match + return nil, fmt.Errorf("no bank charges found (order paid entirely with gift cards/points)") + } + // No bank charges and no non-bank payments processed yet - still pending + return nil, ErrPaymentPending } return bankCharges, nil diff --git a/internal/adapters/providers/amazon/order_test.go b/internal/adapters/providers/amazon/order_test.go index d62ccb7..dda79d4 100644 --- a/internal/adapters/providers/amazon/order_test.go +++ b/internal/adapters/providers/amazon/order_test.go @@ -72,7 +72,41 @@ func TestOrder_GetFinalCharges_OnlyGiftCard(t *testing.T) { charges, err := order.GetFinalCharges() assert.Error(t, err, "Should return error when no bank charges found") assert.Nil(t, charges) - assert.Contains(t, err.Error(), "no bank charges found") + assert.Contains(t, err.Error(), "paid entirely with gift cards/points") +} + +func TestOrder_GetFinalCharges_NoTransactions_ReturnsPending(t *testing.T) { + // Test order with no transactions (not yet shipped/charged) + parsedOrder := &ParsedOrder{ + ID: "test-pending-order", + Date: time.Now(), + Total: 50.00, + Transactions: []*ParsedTransaction{}, // Empty - not charged yet + } + + order := NewOrder(parsedOrder, nil) + + charges, err := order.GetFinalCharges() + assert.Error(t, err, "Should return error when no transactions") + assert.Nil(t, charges) + assert.ErrorIs(t, err, ErrPaymentPending, "Should return ErrPaymentPending for orders not yet charged") +} + +func TestOrder_GetFinalCharges_NilTransactions_ReturnsPending(t *testing.T) { + // Test order with nil transactions slice + parsedOrder := &ParsedOrder{ + ID: "test-pending-order-nil", + Date: time.Now(), + Total: 50.00, + Transactions: nil, // Nil - not charged yet + } + + order := NewOrder(parsedOrder, nil) + + charges, err := order.GetFinalCharges() + assert.Error(t, err, "Should return error when no transactions") + assert.Nil(t, charges) + assert.ErrorIs(t, err, ErrPaymentPending, "Should return ErrPaymentPending for orders not yet charged") } func TestOrder_GetFinalCharges_SkipsRefunds(t *testing.T) { diff --git a/internal/adapters/providers/amazon/provider_test.go b/internal/adapters/providers/amazon/provider_test.go index df1db06..dc8ed9a 100644 --- a/internal/adapters/providers/amazon/provider_test.go +++ b/internal/adapters/providers/amazon/provider_test.go @@ -188,7 +188,7 @@ func TestNewProvider_NilLogger(t *testing.T) { assert.NotNil(t, provider.logger) } -// TestOrder_GetFinalCharges_NoTransactions tests GetFinalCharges returns error without transactions +// TestOrder_GetFinalCharges_NoTransactions tests GetFinalCharges returns ErrPaymentPending without transactions func TestOrder_GetFinalCharges_NoTransactions(t *testing.T) { parsedOrder := &ParsedOrder{ ID: "114-0000000-0000000", @@ -201,10 +201,10 @@ func TestOrder_GetFinalCharges_NoTransactions(t *testing.T) { assert.Error(t, err) assert.Nil(t, charges) - assert.Contains(t, err.Error(), "no transactions found") + assert.ErrorIs(t, err, ErrPaymentPending, "Should return ErrPaymentPending for orders without transactions") } -// 
TestOrder_IsMultiDelivery_NoTransactions tests IsMultiDelivery returns error without transactions +// TestOrder_IsMultiDelivery_NoTransactions tests IsMultiDelivery returns ErrPaymentPending without transactions func TestOrder_IsMultiDelivery_NoTransactions(t *testing.T) { parsedOrder := &ParsedOrder{ ID: "114-0000000-0000000", @@ -217,7 +217,7 @@ func TestOrder_IsMultiDelivery_NoTransactions(t *testing.T) { assert.Error(t, err) assert.False(t, isMulti) - assert.Contains(t, err.Error(), "no transactions found") + assert.ErrorIs(t, err, ErrPaymentPending, "Should return ErrPaymentPending for orders without transactions") } // TestCalculateLookbackDays tests lookback days calculation diff --git a/internal/application/sync/handlers/amazon.go b/internal/application/sync/handlers/amazon.go index c2bd9c9..32276af 100644 --- a/internal/application/sync/handlers/amazon.go +++ b/internal/application/sync/handlers/amazon.go @@ -3,9 +3,11 @@ package handlers import ( "context" + "errors" "fmt" "log/slog" "math" + "time" "github.com/eshaffer321/monarchmoney-go/pkg/monarch" "github.com/eshaffer321/monarchmoney-sync-backend/internal/adapters/providers" @@ -41,6 +43,30 @@ type CategorySplitter interface { GetSingleCategoryInfo(ctx context.Context, order providers.Order, categories []categorizer.Category) (string, string, error) } +// CategorySplitterWithDetails extends CategorySplitter with split details +type CategorySplitterWithDetails interface { + CategorySplitter + GetSplitDetails() []SplitDetail +} + +// SplitDetail represents detailed information about a split including items +type SplitDetail struct { + CategoryID string `json:"category_id"` + CategoryName string `json:"category_name"` + Amount float64 `json:"amount"` + Items []SplitDetailItem `json:"items"` + Notes string `json:"notes,omitempty"` +} + +// SplitDetailItem represents an item within a split +type SplitDetailItem struct { + Name string `json:"name"` + Quantity float64 `json:"quantity"` + UnitPrice float64 `json:"unit_price"` + TotalPrice float64 `json:"total_price"` + Category string `json:"category,omitempty"` +} + // MonarchClient provides access to Monarch Money API type MonarchClient interface { UpdateTransaction(ctx context.Context, id string, params *monarch.UpdateTransactionParams) error @@ -67,6 +93,7 @@ type PaymentMethodData struct { CardType string CardLastFour string FinalCharges []float64 + ChargedDates []time.Time // Date/time of each charge (parallel to FinalCharges) TotalCharged float64 } @@ -77,11 +104,12 @@ type LedgerStorage interface { // ProcessResult holds the result of processing an order type ProcessResult struct { - Processed bool - Skipped bool - SkipReason string - Allocations *allocator.Result - Splits []*monarch.TransactionSplit + Processed bool + Skipped bool + SkipReason string + Allocations *allocator.Result + Splits []*monarch.TransactionSplit + SplitDetails []SplitDetail // Detailed split info including items (only populated after successful Monarch API call) } // AmazonHandler processes Amazon orders with pro-rata allocation @@ -125,6 +153,15 @@ func (h *AmazonHandler) ProcessOrder( // Step 1: Get bank charges bankCharges, err := order.GetFinalCharges() if err != nil { + // Check if this is a pending payment (order not yet shipped/charged) + if errors.Is(err, amazonprovider.ErrPaymentPending) { + h.logInfo("Order payment pending (not yet shipped)", + "order_id", order.GetID(), + "order_total", order.GetTotal()) + result.Skipped = true + result.SkipReason = "payment pending" + return result, nil + } 
return nil, fmt.Errorf("failed to get bank charges: %w", err) } diff --git a/internal/application/sync/orchestrator.go b/internal/application/sync/orchestrator.go index bc9fc87..bcc3a4d 100644 --- a/internal/application/sync/orchestrator.go +++ b/internal/application/sync/orchestrator.go @@ -19,15 +19,18 @@ func (o *Orchestrator) handleResult(order providers.Order, result *handlers.Proc return false, false, err } if result.Skipped { - o.logger.Warn("Order skipped", "order_id", order.GetID(), "reason", result.SkipReason) // Don't treat "payment pending" as an error - it's expected for new orders if result.SkipReason == "payment pending" { + o.logger.Info("Order pending (awaiting shipment/charge)", "order_id", order.GetID()) + o.recordPending(order, result.SkipReason) return false, true, nil } // Don't treat "already has splits" as an error - just skip silently if result.SkipReason == "transaction already has splits" { + o.logger.Debug("Order skipped (already has splits)", "order_id", order.GetID()) return false, true, nil } + o.logger.Warn("Order skipped", "order_id", order.GetID(), "reason", result.SkipReason) o.recordError(order, result.SkipReason) return false, false, fmt.Errorf("skipped: %s", result.SkipReason) } @@ -87,6 +90,13 @@ func (o *Orchestrator) processOrder( return false, true, nil } +// reportProgress calls the progress callback if set +func (o *Orchestrator) reportProgress(opts Options, update ProgressUpdate) { + if opts.ProgressCallback != nil { + opts.ProgressCallback(update) + } +} + // Run executes the sync process for the configured provider func (o *Orchestrator) Run(ctx context.Context, opts Options) (*Result, error) { result := &Result{ @@ -101,6 +111,9 @@ func (o *Orchestrator) Run(ctx context.Context, opts Options) (*Result, error) { "force", opts.Force, ) + // Report initial progress + o.reportProgress(opts, ProgressUpdate{Phase: "fetching_orders"}) + // 1. Fetch orders from provider orders, err := o.fetchOrders(ctx, opts) if err != nil { @@ -138,6 +151,13 @@ func (o *Orchestrator) Run(ctx context.Context, opts Options) (*Result, error) { } } + // Report progress: starting order processing + totalOrders := len(orders) + o.reportProgress(opts, ProgressUpdate{ + Phase: "processing_orders", + TotalOrders: totalOrders, + }) + // 5. Process orders usedTransactionIDs := make(map[string]bool) @@ -160,7 +180,6 @@ func (o *Orchestrator) Run(ctx context.Context, opts Options) (*Result, error) { order.GetDate().Format("2006-01-02"), order.GetTotal(), err)) - continue } if processed { @@ -169,6 +188,15 @@ func (o *Orchestrator) Run(ctx context.Context, opts Options) (*Result, error) { if skipped { result.SkippedCount++ } + + // Report progress after each order + o.reportProgress(opts, ProgressUpdate{ + Phase: "processing_orders", + TotalOrders: totalOrders, + ProcessedOrders: result.ProcessedCount, + SkippedOrders: result.SkippedCount, + ErroredOrders: result.ErrorCount, + }) } // 6. 
Complete sync run diff --git a/internal/application/sync/recording.go b/internal/application/sync/recording.go index 2405532..ac3f792 100644 --- a/internal/application/sync/recording.go +++ b/internal/application/sync/recording.go @@ -70,6 +70,30 @@ func (o *Orchestrator) recordError(order providers.Order, errorMsg string) { } } +// recordPending records an order that is pending (not yet charged/shipped) +// This allows tracking without blocking retries on future syncs +func (o *Orchestrator) recordPending(order providers.Order, reason string) { + if o.storage != nil { + record := &storage.ProcessingRecord{ + OrderID: order.GetID(), + Provider: order.GetProviderName(), + OrderDate: order.GetDate(), + OrderTotal: order.GetTotal(), + OrderSubtotal: order.GetSubtotal(), + OrderTax: order.GetTax(), + OrderTip: order.GetTip(), + ItemCount: len(order.GetItems()), + ProcessedAt: time.Now(), + Status: "pending", + ErrorMessage: reason, + Items: convertOrderItems(order.GetItems()), + } + if err := o.storage.SaveRecord(record); err != nil { + o.logger.Error("Failed to save pending record", "order_id", order.GetID(), "error", err) + } + } +} + // recordSuccess records a successful processing to storage func (o *Orchestrator) recordSuccess(order providers.Order, transaction *monarch.Transaction, splits []*monarch.TransactionSplit, confidence float64, dryRun bool) { o.recordSuccessWithMultiDelivery(order, transaction, splits, confidence, dryRun, nil) diff --git a/web/src/app/(app)/orders/page.tsx b/web/src/app/(app)/orders/page.tsx index fd95998..584ce87 100644 --- a/web/src/app/(app)/orders/page.tsx +++ b/web/src/app/(app)/orders/page.tsx @@ -139,6 +139,7 @@ export default async function OrdersPage({ searchParams }: PageProps) { +
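
Note (illustrative sketch, not part of the patch): the pending-status flow in this commit hinges on a sentinel error checked with errors.Is rather than on matching error strings. The standalone Go sketch below approximates that flow under stated assumptions; fakeOrder, Record, processOrder, and isProcessed are hypothetical stand-ins for the repository's Order interface, storage.ProcessingRecord, the Amazon handler/orchestrator branch, and IsProcessed(), and are not the actual APIs.

package main

import (
	"errors"
	"fmt"
)

// ErrPaymentPending mirrors the sentinel added in the Amazon provider: the order
// exists but has no bank charges yet because it has not shipped.
var ErrPaymentPending = errors.New("payment pending: order has not been charged yet (awaiting shipment)")

// fakeOrder is a hypothetical minimal order; charges stands in for bank
// transactions that carry a card Last4.
type fakeOrder struct {
	id      string
	charges []float64 // empty means the order has not been charged yet
}

func (o fakeOrder) GetFinalCharges() ([]float64, error) {
	if len(o.charges) == 0 {
		return nil, ErrPaymentPending
	}
	return o.charges, nil
}

// Record is a stand-in for the processing record written by recordPending/recordError.
type Record struct {
	OrderID string
	Status  string // "success", "pending", or "error"
	Message string
}

// processOrder shows the branch described in the commit message: a pending
// payment is recorded as a skip at INFO level, not treated as a failure.
func processOrder(o fakeOrder) Record {
	if _, err := o.GetFinalCharges(); err != nil {
		if errors.Is(err, ErrPaymentPending) {
			return Record{OrderID: o.id, Status: "pending", Message: "payment pending"}
		}
		return Record{OrderID: o.id, Status: "error", Message: err.Error()}
	}
	return Record{OrderID: o.id, Status: "success"}
}

// isProcessed mirrors the described IsProcessed() contract: only "success"
// blocks a retry, so pending orders are re-evaluated on the next sync.
func isProcessed(r Record) bool {
	return r.Status == "success"
}

func main() {
	pending := processOrder(fakeOrder{id: "114-0000000-0000000"})
	fmt.Println(pending.Status, "- retried on next sync:", !isProcessed(pending))

	shipped := processOrder(fakeOrder{id: "114-1111111-1111111", charges: []float64{42.17}})
	fmt.Println(shipped.Status, "- retried on next sync:", !isProcessed(shipped))
}

The design choice this sketch highlights: because only status='success' counts as processed, a pending record is purely informational, and the same order is fetched and re-evaluated on a later sync once the bank charge appears.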