diff --git a/internal/adapters/providers/amazon/order.go b/internal/adapters/providers/amazon/order.go index 6f01e63..e19542d 100644 --- a/internal/adapters/providers/amazon/order.go +++ b/internal/adapters/providers/amazon/order.go @@ -1,6 +1,7 @@ package amazon import ( + "errors" "fmt" "log/slog" "time" @@ -8,6 +9,9 @@ import ( "github.com/eshaffer321/monarchmoney-sync-backend/internal/adapters/providers" ) +// ErrPaymentPending indicates an order has no bank charges yet because it hasn't shipped +var ErrPaymentPending = errors.New("payment pending: order has not been charged yet (awaiting shipment)") + // Order wraps a ParsedOrder to implement the providers.Order interface type Order struct { parsedOrder *ParsedOrder @@ -83,10 +87,12 @@ func (o *Order) GetRawData() interface{} { // Filters out non-bank transactions like gift cards, points, etc. func (o *Order) GetFinalCharges() ([]float64, error) { if len(o.parsedOrder.Transactions) == 0 { - return nil, fmt.Errorf("no transactions found for order") + // No transactions at all - order hasn't been charged yet (awaiting shipment) + return nil, ErrPaymentPending } var bankCharges []float64 + var hasNonBankPayments bool for _, tx := range o.parsedOrder.Transactions { // Skip refunds if tx.Type == "refund" { @@ -107,6 +113,7 @@ func (o *Order) GetFinalCharges() ([]float64, error) { // Real bank charges have Last4 populated (card ending digits) // Points, gift cards, etc. have empty Last4 if tx.Last4 == "" { + hasNonBankPayments = true if o.logger != nil { o.logger.Debug("Skipping non-bank transaction", "order_id", o.GetID(), @@ -128,7 +135,12 @@ func (o *Order) GetFinalCharges() ([]float64, error) { } if len(bankCharges) == 0 { - return nil, fmt.Errorf("no bank charges found (order may be paid with gift cards/points only)") + if hasNonBankPayments { + // Order was paid entirely with gift cards/points - no bank transaction to match + return nil, fmt.Errorf("no bank charges found (order paid entirely with gift cards/points)") + } + // No bank charges and no non-bank payments processed yet - still pending + return nil, ErrPaymentPending } return bankCharges, nil diff --git a/internal/adapters/providers/amazon/order_test.go b/internal/adapters/providers/amazon/order_test.go index d62ccb7..dda79d4 100644 --- a/internal/adapters/providers/amazon/order_test.go +++ b/internal/adapters/providers/amazon/order_test.go @@ -72,7 +72,41 @@ func TestOrder_GetFinalCharges_OnlyGiftCard(t *testing.T) { charges, err := order.GetFinalCharges() assert.Error(t, err, "Should return error when no bank charges found") assert.Nil(t, charges) - assert.Contains(t, err.Error(), "no bank charges found") + assert.Contains(t, err.Error(), "paid entirely with gift cards/points") +} + +func TestOrder_GetFinalCharges_NoTransactions_ReturnsPending(t *testing.T) { + // Test order with no transactions (not yet shipped/charged) + parsedOrder := &ParsedOrder{ + ID: "test-pending-order", + Date: time.Now(), + Total: 50.00, + Transactions: []*ParsedTransaction{}, // Empty - not charged yet + } + + order := NewOrder(parsedOrder, nil) + + charges, err := order.GetFinalCharges() + assert.Error(t, err, "Should return error when no transactions") + assert.Nil(t, charges) + assert.ErrorIs(t, err, ErrPaymentPending, "Should return ErrPaymentPending for orders not yet charged") +} + +func TestOrder_GetFinalCharges_NilTransactions_ReturnsPending(t *testing.T) { + // Test order with nil transactions slice + parsedOrder := &ParsedOrder{ + ID: "test-pending-order-nil", + Date: time.Now(), + Total: 50.00, + Transactions: nil, // Nil - not charged yet + } + + order := NewOrder(parsedOrder, nil) + + charges, err := order.GetFinalCharges() + assert.Error(t, err, "Should return error when no transactions") + assert.Nil(t, charges) + assert.ErrorIs(t, err, ErrPaymentPending, "Should return ErrPaymentPending for orders not yet charged") } func TestOrder_GetFinalCharges_SkipsRefunds(t *testing.T) { diff --git a/internal/adapters/providers/amazon/provider_test.go b/internal/adapters/providers/amazon/provider_test.go index df1db06..dc8ed9a 100644 --- a/internal/adapters/providers/amazon/provider_test.go +++ b/internal/adapters/providers/amazon/provider_test.go @@ -188,7 +188,7 @@ func TestNewProvider_NilLogger(t *testing.T) { assert.NotNil(t, provider.logger) } -// TestOrder_GetFinalCharges_NoTransactions tests GetFinalCharges returns error without transactions +// TestOrder_GetFinalCharges_NoTransactions tests GetFinalCharges returns ErrPaymentPending without transactions func TestOrder_GetFinalCharges_NoTransactions(t *testing.T) { parsedOrder := &ParsedOrder{ ID: "114-0000000-0000000", @@ -201,10 +201,10 @@ func TestOrder_GetFinalCharges_NoTransactions(t *testing.T) { assert.Error(t, err) assert.Nil(t, charges) - assert.Contains(t, err.Error(), "no transactions found") + assert.ErrorIs(t, err, ErrPaymentPending, "Should return ErrPaymentPending for orders without transactions") } -// TestOrder_IsMultiDelivery_NoTransactions tests IsMultiDelivery returns error without transactions +// TestOrder_IsMultiDelivery_NoTransactions tests IsMultiDelivery returns ErrPaymentPending without transactions func TestOrder_IsMultiDelivery_NoTransactions(t *testing.T) { parsedOrder := &ParsedOrder{ ID: "114-0000000-0000000", @@ -217,7 +217,7 @@ func TestOrder_IsMultiDelivery_NoTransactions(t *testing.T) { assert.Error(t, err) assert.False(t, isMulti) - assert.Contains(t, err.Error(), "no transactions found") + assert.ErrorIs(t, err, ErrPaymentPending, "Should return ErrPaymentPending for orders without transactions") } // TestCalculateLookbackDays tests lookback days calculation diff --git a/internal/adapters/providers/walmart/order.go b/internal/adapters/providers/walmart/order.go index 4fa71c0..b3a8c4d 100644 --- a/internal/adapters/providers/walmart/order.go +++ b/internal/adapters/providers/walmart/order.go @@ -274,3 +274,9 @@ func (o *Order) IsMultiDelivery() (bool, error) { } return len(charges) > 1, nil } + +// GetRawLedger returns the cached ledger data for persistence +// Returns nil if ledger hasn't been fetched yet +func (o *Order) GetRawLedger() *walmartclient.OrderLedger { + return o.ledgerCache +} diff --git a/internal/api/dto/responses.go b/internal/api/dto/responses.go index 08f811e..88be2ec 100644 --- a/internal/api/dto/responses.go +++ b/internal/api/dto/responses.go @@ -122,3 +122,44 @@ func NewHealthResponse() HealthResponse { Timestamp: time.Now().UTC().Format(time.RFC3339), } } + +// LedgerResponse represents a ledger snapshot in API responses. +type LedgerResponse struct { + ID int64 `json:"id"` + OrderID string `json:"order_id"` + SyncRunID int64 `json:"sync_run_id,omitempty"` + Provider string `json:"provider"` + FetchedAt string `json:"fetched_at"` + LedgerState string `json:"ledger_state"` + LedgerVersion int `json:"ledger_version"` + TotalCharged float64 `json:"total_charged"` + ChargeCount int `json:"charge_count"` + PaymentMethodTypes string `json:"payment_method_types"` + HasRefunds bool `json:"has_refunds"` + IsValid bool `json:"is_valid"` + ValidationNotes string `json:"validation_notes,omitempty"` + Charges []ChargeResponse `json:"charges,omitempty"` +} + +// ChargeResponse represents a single charge within a ledger. +type ChargeResponse struct { + ID int64 `json:"id"` + ChargeSequence int `json:"charge_sequence"` + ChargeAmount float64 `json:"charge_amount"` + ChargeType string `json:"charge_type"` + PaymentMethod string `json:"payment_method"` + CardType string `json:"card_type,omitempty"` + CardLastFour string `json:"card_last_four,omitempty"` + MonarchTransactionID string `json:"monarch_transaction_id,omitempty"` + IsMatched bool `json:"is_matched"` + MatchConfidence float64 `json:"match_confidence,omitempty"` + SplitCount int `json:"split_count,omitempty"` +} + +// LedgerListResponse is returned when listing ledgers. +type LedgerListResponse struct { + Ledgers []LedgerResponse `json:"ledgers"` + TotalCount int `json:"total_count"` + Limit int `json:"limit"` + Offset int `json:"offset"` +} diff --git a/internal/api/handlers/ledgers.go b/internal/api/handlers/ledgers.go new file mode 100644 index 0000000..a3410c3 --- /dev/null +++ b/internal/api/handlers/ledgers.go @@ -0,0 +1,175 @@ +package handlers + +import ( + "net/http" + "strconv" + + "github.com/go-chi/chi/v5" + + "github.com/eshaffer321/monarchmoney-sync-backend/internal/api/dto" + "github.com/eshaffer321/monarchmoney-sync-backend/internal/infrastructure/storage" +) + +// LedgersHandler handles ledger-related HTTP requests. +type LedgersHandler struct { + *Base +} + +// NewLedgersHandler creates a new ledgers handler. +func NewLedgersHandler(repo storage.Repository) *LedgersHandler { + return &LedgersHandler{ + Base: NewBase(repo), + } +} + +// List handles GET /api/ledgers - returns paginated list of ledgers. +func (h *LedgersHandler) List(w http.ResponseWriter, r *http.Request) { + filters := storage.LedgerFilters{ + OrderID: r.URL.Query().Get("order_id"), + Provider: r.URL.Query().Get("provider"), + Limit: ParseIntParam(r, "limit", 50), + Offset: ParseIntParam(r, "offset", 0), + } + + // Parse state filter + if state := r.URL.Query().Get("state"); state != "" { + filters.State = storage.LedgerState(state) + } + + result, err := h.repo.ListLedgers(filters) + if err != nil { + h.WriteError(w, http.StatusInternalServerError, dto.InternalError()) + return + } + + response := dto.LedgerListResponse{ + Ledgers: make([]dto.LedgerResponse, 0, len(result.Ledgers)), + TotalCount: result.TotalCount, + Limit: result.Limit, + Offset: result.Offset, + } + + for _, ledger := range result.Ledgers { + response.Ledgers = append(response.Ledgers, toLedgerResponse(ledger)) + } + + h.WriteJSON(w, http.StatusOK, response) +} + +// Get handles GET /api/ledgers/{id} - returns a single ledger by ID. +func (h *LedgersHandler) Get(w http.ResponseWriter, r *http.Request) { + idStr := chi.URLParam(r, "id") + if idStr == "" { + h.WriteError(w, http.StatusBadRequest, dto.BadRequestError("ledger ID is required")) + return + } + + id, err := strconv.ParseInt(idStr, 10, 64) + if err != nil { + h.WriteError(w, http.StatusBadRequest, dto.BadRequestError("invalid ledger ID")) + return + } + + ledger, err := h.repo.GetLedgerByID(id) + if err != nil { + h.WriteError(w, http.StatusInternalServerError, dto.InternalError()) + return + } + + if ledger == nil { + h.WriteError(w, http.StatusNotFound, dto.NotFoundError("ledger")) + return + } + + response := toLedgerResponse(ledger) + h.WriteJSON(w, http.StatusOK, response) +} + +// GetByOrderID handles GET /api/orders/{orderID}/ledger - returns the latest ledger for an order. +func (h *LedgersHandler) GetByOrderID(w http.ResponseWriter, r *http.Request) { + orderID := chi.URLParam(r, "orderID") + if orderID == "" { + h.WriteError(w, http.StatusBadRequest, dto.BadRequestError("order ID is required")) + return + } + + ledger, err := h.repo.GetLatestLedger(orderID) + if err != nil { + h.WriteError(w, http.StatusInternalServerError, dto.InternalError()) + return + } + + if ledger == nil { + h.WriteError(w, http.StatusNotFound, dto.NotFoundError("ledger")) + return + } + + response := toLedgerResponse(ledger) + h.WriteJSON(w, http.StatusOK, response) +} + +// GetHistoryByOrderID handles GET /api/orders/{orderID}/ledgers - returns all ledgers for an order. +func (h *LedgersHandler) GetHistoryByOrderID(w http.ResponseWriter, r *http.Request) { + orderID := chi.URLParam(r, "orderID") + if orderID == "" { + h.WriteError(w, http.StatusBadRequest, dto.BadRequestError("order ID is required")) + return + } + + ledgers, err := h.repo.GetLedgerHistory(orderID) + if err != nil { + h.WriteError(w, http.StatusInternalServerError, dto.InternalError()) + return + } + + response := dto.LedgerListResponse{ + Ledgers: make([]dto.LedgerResponse, 0, len(ledgers)), + TotalCount: len(ledgers), + Limit: len(ledgers), + Offset: 0, + } + + for _, ledger := range ledgers { + response.Ledgers = append(response.Ledgers, toLedgerResponse(ledger)) + } + + h.WriteJSON(w, http.StatusOK, response) +} + +// toLedgerResponse converts a storage OrderLedger to an API response. +func toLedgerResponse(ledger *storage.OrderLedger) dto.LedgerResponse { + response := dto.LedgerResponse{ + ID: ledger.ID, + OrderID: ledger.OrderID, + SyncRunID: ledger.SyncRunID, + Provider: ledger.Provider, + FetchedAt: ledger.FetchedAt.Format("2006-01-02T15:04:05Z"), + LedgerState: string(ledger.LedgerState), + LedgerVersion: ledger.LedgerVersion, + TotalCharged: ledger.TotalCharged, + ChargeCount: ledger.ChargeCount, + PaymentMethodTypes: ledger.PaymentMethodTypes, + HasRefunds: ledger.HasRefunds, + IsValid: ledger.IsValid, + ValidationNotes: ledger.ValidationNotes, + Charges: make([]dto.ChargeResponse, 0, len(ledger.Charges)), + } + + for _, charge := range ledger.Charges { + response.Charges = append(response.Charges, dto.ChargeResponse{ + ID: charge.ID, + ChargeSequence: charge.ChargeSequence, + ChargeAmount: charge.ChargeAmount, + ChargeType: charge.ChargeType, + PaymentMethod: charge.PaymentMethod, + CardType: charge.CardType, + CardLastFour: charge.CardLastFour, + MonarchTransactionID: charge.MonarchTransactionID, + IsMatched: charge.IsMatched, + MatchConfidence: charge.MatchConfidence, + SplitCount: charge.SplitCount, + }) + } + + return response +} diff --git a/internal/api/server.go b/internal/api/server.go index f2b5f13..85c151d 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -100,6 +100,13 @@ func (s *Server) setupRoutes() { statsHandler := handlers.NewStatsHandler(s.repo) r.Get("/stats", statsHandler.Get) + // Ledgers + ledgersHandler := handlers.NewLedgersHandler(s.repo) + r.Get("/ledgers", ledgersHandler.List) + r.Get("/ledgers/{id}", ledgersHandler.Get) + r.Get("/orders/{orderID}/ledger", ledgersHandler.GetByOrderID) + r.Get("/orders/{orderID}/ledgers", ledgersHandler.GetHistoryByOrderID) + // Sync operations (live sync jobs) if s.syncService != nil { syncHandler := handlers.NewSyncHandler(s.syncService) diff --git a/internal/application/service/sync_service.go b/internal/application/service/sync_service.go index c647e4f..57d5dd0 100644 --- a/internal/application/service/sync_service.go +++ b/internal/application/service/sync_service.go @@ -26,6 +26,18 @@ const ( StatusCancelled SyncStatus = "cancelled" ) +// Job staleness thresholds +const ( + // DefaultJobStaleThreshold is how long a job can go without progress updates + // before being considered stale. Jobs that don't update progress for this + // duration are assumed to be hung or crashed. + DefaultJobStaleThreshold = 30 * time.Minute + + // DefaultJobMaxDuration is the maximum time a job can run before being + // forcefully marked as failed. This prevents runaway jobs. + DefaultJobMaxDuration = 2 * time.Hour +) + // SyncRequest holds parameters for starting a sync. type SyncRequest struct { Provider string // "walmart", "costco", "amazon" @@ -79,6 +91,10 @@ type SyncService struct { // Provider-level locking (only one sync per provider at a time) providerLocks map[string]*sync.Mutex locksMutex sync.Mutex + + // Background cleanup + cleanupStop chan struct{} + cleanupDone chan struct{} } // NewSyncService creates a new sync service. @@ -364,7 +380,7 @@ func (s *SyncService) generateJobID(provider string) string { return fmt.Sprintf("%s-%d", provider, time.Now().UnixNano()) } -// CleanupOldJobs removes jobs older than the specified duration. +// CleanupOldJobs removes completed jobs older than the specified duration. func (s *SyncService) CleanupOldJobs(maxAge time.Duration) int { s.jobsMutex.Lock() defer s.jobsMutex.Unlock() @@ -388,3 +404,160 @@ func (s *SyncService) CleanupOldJobs(maxAge time.Duration) int { return removed } + +// MarkStaleJobsAsFailed finds jobs that appear to be stuck and marks them as failed. +// A job is considered stale if: +// 1. It has been running longer than maxDuration, OR +// 2. Its Progress.LastUpdate is older than staleThreshold +// +// This handles cases where: +// - The goroutine panicked and never updated the job status +// - The job is genuinely stuck (infinite loop, deadlock, etc.) +// - The server restarted and orphaned in-memory job state +func (s *SyncService) MarkStaleJobsAsFailed(staleThreshold, maxDuration time.Duration) int { + s.jobsMutex.Lock() + defer s.jobsMutex.Unlock() + + now := time.Now() + marked := 0 + + for id, job := range s.jobs { + // Only check running or pending jobs + if job.Status != StatusRunning && job.Status != StatusPending { + continue + } + + isStale := false + reason := "" + + // Check if job has exceeded max duration + if now.Sub(job.StartedAt) > maxDuration { + isStale = true + reason = fmt.Sprintf("exceeded max duration of %v (started %v ago)", maxDuration, now.Sub(job.StartedAt).Round(time.Second)) + } + + // Check if progress hasn't been updated recently + if !isStale && now.Sub(job.Progress.LastUpdate) > staleThreshold { + isStale = true + reason = fmt.Sprintf("no progress update for %v (threshold: %v)", now.Sub(job.Progress.LastUpdate).Round(time.Second), staleThreshold) + } + + if isStale { + // Cancel the context if it exists (in case goroutine is still running) + if job.cancelFunc != nil { + job.cancelFunc() + } + + // Mark as failed + job.Status = StatusFailed + job.CompletedAt = &now + job.Error = fmt.Errorf("job marked as stale: %s", reason) + job.Progress.CurrentPhase = "failed" + job.Progress.LastUpdate = now + + // Release the provider lock + s.releaseProviderLockUnsafe(job.Provider) + + s.logger.Warn("marked stale job as failed", + "job_id", id, + "provider", job.Provider, + "reason", reason, + "started_at", job.StartedAt, + "last_update", job.Progress.LastUpdate, + ) + + marked++ + } + } + + return marked +} + +// releaseProviderLockUnsafe releases a provider lock without acquiring locksMutex. +// MUST only be called while holding jobsMutex to avoid races. +func (s *SyncService) releaseProviderLockUnsafe(provider string) { + s.locksMutex.Lock() + defer s.locksMutex.Unlock() + + if lock, exists := s.providerLocks[provider]; exists { + // TryLock then Unlock ensures we don't panic if already unlocked + if lock.TryLock() { + lock.Unlock() + } else { + // Lock is held, so unlock it + lock.Unlock() + } + } +} + +// IsJobStale checks if a specific job is considered stale. +func (s *SyncService) IsJobStale(jobID string, staleThreshold, maxDuration time.Duration) bool { + s.jobsMutex.RLock() + defer s.jobsMutex.RUnlock() + + job, exists := s.jobs[jobID] + if !exists { + return false + } + + if job.Status != StatusRunning && job.Status != StatusPending { + return false + } + + now := time.Now() + return now.Sub(job.StartedAt) > maxDuration || now.Sub(job.Progress.LastUpdate) > staleThreshold +} + +// StartBackgroundCleanup starts a background goroutine that periodically: +// 1. Marks stale jobs as failed +// 2. Cleans up old completed jobs +// +// The cleanup runs every checkInterval. Call StopBackgroundCleanup to stop it. +func (s *SyncService) StartBackgroundCleanup(checkInterval time.Duration) { + s.cleanupStop = make(chan struct{}) + s.cleanupDone = make(chan struct{}) + + go func() { + defer close(s.cleanupDone) + + ticker := time.NewTicker(checkInterval) + defer ticker.Stop() + + s.logger.Info("background job cleanup started", + "check_interval", checkInterval, + "stale_threshold", DefaultJobStaleThreshold, + "max_duration", DefaultJobMaxDuration, + ) + + for { + select { + case <-s.cleanupStop: + s.logger.Info("background job cleanup stopped") + return + case <-ticker.C: + // Mark stale jobs as failed + staleMarked := s.MarkStaleJobsAsFailed(DefaultJobStaleThreshold, DefaultJobMaxDuration) + if staleMarked > 0 { + s.logger.Info("marked stale jobs as failed", "count", staleMarked) + } + + // Clean up old completed jobs (keep for 24 hours) + cleaned := s.CleanupOldJobs(24 * time.Hour) + if cleaned > 0 { + s.logger.Debug("cleaned up old jobs", "count", cleaned) + } + } + } + }() +} + +// StopBackgroundCleanup stops the background cleanup goroutine. +// This method blocks until the cleanup goroutine has fully stopped. +func (s *SyncService) StopBackgroundCleanup() { + if s.cleanupStop == nil { + return + } + + close(s.cleanupStop) + <-s.cleanupDone +} diff --git a/internal/application/service/sync_service_test.go b/internal/application/service/sync_service_test.go index d1f901e..a47192a 100644 --- a/internal/application/service/sync_service_test.go +++ b/internal/application/service/sync_service_test.go @@ -1,7 +1,11 @@ package service import ( + "context" + "log/slog" + "os" "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -111,3 +115,261 @@ func TestSyncJob_Initial(t *testing.T) { assert.Nil(t, job.Result) assert.Nil(t, job.Error) } + +// Helper to create a test logger +func testLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) +} + +func TestSyncService_IsJobStale_NotFound(t *testing.T) { + svc := NewSyncService(nil, nil, nil, testLogger(), nil) + + isStale := svc.IsJobStale("non-existent", 30*time.Minute, 2*time.Hour) + + assert.False(t, isStale) +} + +func TestSyncService_IsJobStale_CompletedJobNotStale(t *testing.T) { + svc := NewSyncService(nil, nil, nil, testLogger(), nil) + + // Manually add a completed job + svc.jobsMutex.Lock() + svc.jobs["completed-job"] = &SyncJob{ + ID: "completed-job", + Provider: "walmart", + Status: StatusCompleted, + StartedAt: time.Now().Add(-3 * time.Hour), // Old but completed + Progress: SyncProgress{LastUpdate: time.Now().Add(-2 * time.Hour)}, + } + svc.jobsMutex.Unlock() + + // Completed jobs should never be considered stale + isStale := svc.IsJobStale("completed-job", 30*time.Minute, 2*time.Hour) + + assert.False(t, isStale) +} + +func TestSyncService_IsJobStale_RunningJob_StaleByProgress(t *testing.T) { + svc := NewSyncService(nil, nil, nil, testLogger(), nil) + + // Add a running job with old progress update + svc.jobsMutex.Lock() + svc.jobs["stale-job"] = &SyncJob{ + ID: "stale-job", + Provider: "walmart", + Status: StatusRunning, + StartedAt: time.Now().Add(-10 * time.Minute), // Started 10 min ago + Progress: SyncProgress{LastUpdate: time.Now().Add(-35 * time.Minute)}, // No update for 35 min + } + svc.jobsMutex.Unlock() + + // Should be stale because progress hasn't updated in 35 minutes (> 30 min threshold) + isStale := svc.IsJobStale("stale-job", 30*time.Minute, 2*time.Hour) + + assert.True(t, isStale) +} + +func TestSyncService_IsJobStale_RunningJob_StaleByDuration(t *testing.T) { + svc := NewSyncService(nil, nil, nil, testLogger(), nil) + + // Add a running job that's been running too long + svc.jobsMutex.Lock() + svc.jobs["long-job"] = &SyncJob{ + ID: "long-job", + Provider: "walmart", + Status: StatusRunning, + StartedAt: time.Now().Add(-3 * time.Hour), // Started 3 hours ago + Progress: SyncProgress{LastUpdate: time.Now()}, // Recent progress + } + svc.jobsMutex.Unlock() + + // Should be stale because it's been running longer than 2 hours max + isStale := svc.IsJobStale("long-job", 30*time.Minute, 2*time.Hour) + + assert.True(t, isStale) +} + +func TestSyncService_IsJobStale_RunningJob_NotStale(t *testing.T) { + svc := NewSyncService(nil, nil, nil, testLogger(), nil) + + // Add a healthy running job + svc.jobsMutex.Lock() + svc.jobs["healthy-job"] = &SyncJob{ + ID: "healthy-job", + Provider: "walmart", + Status: StatusRunning, + StartedAt: time.Now().Add(-10 * time.Minute), // Started 10 min ago + Progress: SyncProgress{LastUpdate: time.Now().Add(-5 * time.Minute)}, // Updated 5 min ago + } + svc.jobsMutex.Unlock() + + // Should NOT be stale - running for short time, recently updated + isStale := svc.IsJobStale("healthy-job", 30*time.Minute, 2*time.Hour) + + assert.False(t, isStale) +} + +func TestSyncService_MarkStaleJobsAsFailed_MarksStaleJobs(t *testing.T) { + svc := NewSyncService(nil, nil, nil, testLogger(), nil) + + // Add a stale job + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + svc.jobsMutex.Lock() + svc.jobs["stale-job"] = &SyncJob{ + ID: "stale-job", + Provider: "walmart", + Status: StatusRunning, + StartedAt: time.Now().Add(-3 * time.Hour), + Progress: SyncProgress{LastUpdate: time.Now().Add(-35 * time.Minute)}, + cancelFunc: cancel, + } + svc.jobsMutex.Unlock() + + // Mark stale jobs + marked := svc.MarkStaleJobsAsFailed(30*time.Minute, 2*time.Hour) + + assert.Equal(t, 1, marked) + + // Verify job was marked as failed + job, err := svc.GetSyncJob("stale-job") + assert.NoError(t, err) + assert.Equal(t, StatusFailed, job.Status) + assert.NotNil(t, job.CompletedAt) + assert.NotNil(t, job.Error) + assert.Contains(t, job.Error.Error(), "stale") + + // Verify context was cancelled + select { + case <-ctx.Done(): + // Expected + default: + t.Error("context should have been cancelled") + } +} + +func TestSyncService_MarkStaleJobsAsFailed_SkipsHealthyJobs(t *testing.T) { + svc := NewSyncService(nil, nil, nil, testLogger(), nil) + + // Add a healthy job + svc.jobsMutex.Lock() + svc.jobs["healthy-job"] = &SyncJob{ + ID: "healthy-job", + Provider: "walmart", + Status: StatusRunning, + StartedAt: time.Now().Add(-10 * time.Minute), + Progress: SyncProgress{LastUpdate: time.Now().Add(-5 * time.Minute)}, + } + svc.jobsMutex.Unlock() + + // Try to mark stale jobs + marked := svc.MarkStaleJobsAsFailed(30*time.Minute, 2*time.Hour) + + assert.Equal(t, 0, marked) + + // Verify job is still running + job, err := svc.GetSyncJob("healthy-job") + assert.NoError(t, err) + assert.Equal(t, StatusRunning, job.Status) +} + +func TestSyncService_MarkStaleJobsAsFailed_SkipsCompletedJobs(t *testing.T) { + svc := NewSyncService(nil, nil, nil, testLogger(), nil) + + // Add a completed job that would appear "stale" if we checked it + completedTime := time.Now().Add(-1 * time.Hour) + svc.jobsMutex.Lock() + svc.jobs["completed-job"] = &SyncJob{ + ID: "completed-job", + Provider: "walmart", + Status: StatusCompleted, + StartedAt: time.Now().Add(-3 * time.Hour), + CompletedAt: &completedTime, + Progress: SyncProgress{LastUpdate: completedTime}, + } + svc.jobsMutex.Unlock() + + // Try to mark stale jobs + marked := svc.MarkStaleJobsAsFailed(30*time.Minute, 2*time.Hour) + + assert.Equal(t, 0, marked) + + // Verify job is still completed (not changed to failed) + job, err := svc.GetSyncJob("completed-job") + assert.NoError(t, err) + assert.Equal(t, StatusCompleted, job.Status) +} + +func TestSyncService_CleanupOldJobs_RemovesOldCompletedJobs(t *testing.T) { + svc := NewSyncService(nil, nil, nil, testLogger(), nil) + + // Add an old completed job + oldTime := time.Now().Add(-25 * time.Hour) + svc.jobsMutex.Lock() + svc.jobs["old-job"] = &SyncJob{ + ID: "old-job", + Provider: "walmart", + Status: StatusCompleted, + CompletedAt: &oldTime, + } + svc.jobsMutex.Unlock() + + // Cleanup jobs older than 24 hours + removed := svc.CleanupOldJobs(24 * time.Hour) + + assert.Equal(t, 1, removed) + + // Verify job was removed + _, err := svc.GetSyncJob("old-job") + assert.Error(t, err) + assert.Contains(t, err.Error(), "not found") +} + +func TestSyncService_CleanupOldJobs_KeepsRecentCompletedJobs(t *testing.T) { + svc := NewSyncService(nil, nil, nil, testLogger(), nil) + + // Add a recently completed job + recentTime := time.Now().Add(-1 * time.Hour) + svc.jobsMutex.Lock() + svc.jobs["recent-job"] = &SyncJob{ + ID: "recent-job", + Provider: "walmart", + Status: StatusCompleted, + CompletedAt: &recentTime, + } + svc.jobsMutex.Unlock() + + // Cleanup jobs older than 24 hours + removed := svc.CleanupOldJobs(24 * time.Hour) + + assert.Equal(t, 0, removed) + + // Verify job still exists + _, err := svc.GetSyncJob("recent-job") + assert.NoError(t, err) +} + +func TestSyncService_CleanupOldJobs_KeepsRunningJobs(t *testing.T) { + svc := NewSyncService(nil, nil, nil, testLogger(), nil) + + // Add an old running job (shouldn't be removed by cleanup) + svc.jobsMutex.Lock() + svc.jobs["running-job"] = &SyncJob{ + ID: "running-job", + Provider: "walmart", + Status: StatusRunning, + StartedAt: time.Now().Add(-25 * time.Hour), + } + svc.jobsMutex.Unlock() + + // Cleanup old jobs + removed := svc.CleanupOldJobs(24 * time.Hour) + + // Running jobs should NOT be removed + assert.Equal(t, 0, removed) + + // Verify job still exists + _, err := svc.GetSyncJob("running-job") + assert.NoError(t, err) +} diff --git a/internal/application/sync/handlers/amazon.go b/internal/application/sync/handlers/amazon.go index 0d0a28a..32276af 100644 --- a/internal/application/sync/handlers/amazon.go +++ b/internal/application/sync/handlers/amazon.go @@ -3,9 +3,11 @@ package handlers import ( "context" + "errors" "fmt" "log/slog" "math" + "time" "github.com/eshaffer321/monarchmoney-go/pkg/monarch" "github.com/eshaffer321/monarchmoney-sync-backend/internal/adapters/providers" @@ -41,19 +43,73 @@ type CategorySplitter interface { GetSingleCategoryInfo(ctx context.Context, order providers.Order, categories []categorizer.Category) (string, string, error) } +// CategorySplitterWithDetails extends CategorySplitter with split details +type CategorySplitterWithDetails interface { + CategorySplitter + GetSplitDetails() []SplitDetail +} + +// SplitDetail represents detailed information about a split including items +type SplitDetail struct { + CategoryID string `json:"category_id"` + CategoryName string `json:"category_name"` + Amount float64 `json:"amount"` + Items []SplitDetailItem `json:"items"` + Notes string `json:"notes,omitempty"` +} + +// SplitDetailItem represents an item within a split +type SplitDetailItem struct { + Name string `json:"name"` + Quantity float64 `json:"quantity"` + UnitPrice float64 `json:"unit_price"` + TotalPrice float64 `json:"total_price"` + Category string `json:"category,omitempty"` +} + // MonarchClient provides access to Monarch Money API type MonarchClient interface { UpdateTransaction(ctx context.Context, id string, params *monarch.UpdateTransactionParams) error UpdateSplits(ctx context.Context, id string, splits []*monarch.TransactionSplit) error } +// LedgerData represents ledger data that can be saved +type LedgerData struct { + OrderID string + Provider string + RawJSON string + PaymentMethods []PaymentMethodData + TotalCharged float64 + ChargeCount int + PaymentMethodTypes string + HasRefunds bool + IsValid bool + ValidationNotes string +} + +// PaymentMethodData represents a single payment method's charges +type PaymentMethodData struct { + PaymentType string + CardType string + CardLastFour string + FinalCharges []float64 + ChargedDates []time.Time // Date/time of each charge (parallel to FinalCharges) + TotalCharged float64 +} + +// LedgerStorage provides access to ledger persistence +type LedgerStorage interface { + SaveLedger(ledger *LedgerData, syncRunID int64) error +} + // ProcessResult holds the result of processing an order type ProcessResult struct { - Processed bool - Skipped bool - SkipReason string - Allocations *allocator.Result - Splits []*monarch.TransactionSplit + Processed bool + Skipped bool + SkipReason string + Allocations *allocator.Result + Splits []*monarch.TransactionSplit + SplitDetails []SplitDetail // Detailed split info including items (only populated after successful Monarch API call) } // AmazonHandler processes Amazon orders with pro-rata allocation @@ -97,6 +153,15 @@ func (h *AmazonHandler) ProcessOrder( // Step 1: Get bank charges bankCharges, err := order.GetFinalCharges() if err != nil { + // Check if this is a pending payment (order not yet shipped/charged) + if errors.Is(err, amazonprovider.ErrPaymentPending) { + h.logInfo("Order payment pending (not yet shipped)", + "order_id", order.GetID(), + "order_total", order.GetTotal()) + result.Skipped = true + result.SkipReason = "payment pending" + return result, nil + } return nil, fmt.Errorf("failed to get bank charges: %w", err) } diff --git a/internal/application/sync/handlers/walmart.go b/internal/application/sync/handlers/walmart.go index 7ede29c..4c64189 100644 --- a/internal/application/sync/handlers/walmart.go +++ b/internal/application/sync/handlers/walmart.go @@ -3,6 +3,7 @@ package handlers import ( "context" + "encoding/json" "fmt" "log/slog" "math" @@ -22,13 +23,21 @@ type WalmartOrder interface { IsMultiDelivery() (bool, error) } +// WalmartOrderWithLedger extends WalmartOrder with ledger access for persistence +type WalmartOrderWithLedger interface { + WalmartOrder + GetRawLedger() interface{} // Returns *walmartclient.OrderLedger but using interface{} to avoid import +} + // WalmartHandler processes Walmart orders with multi-delivery and gift card support type WalmartHandler struct { - matcher *matcher.Matcher - consolidator TransactionConsolidator - splitter CategorySplitter - monarch MonarchClient - logger *slog.Logger + matcher *matcher.Matcher + consolidator TransactionConsolidator + splitter CategorySplitter + monarch MonarchClient + ledgerStorage LedgerStorage + syncRunID int64 + logger *slog.Logger } // NewWalmartHandler creates a new Walmart order handler @@ -48,6 +57,12 @@ func NewWalmartHandler( } } +// SetLedgerStorage sets the ledger storage for persisting ledger data +func (h *WalmartHandler) SetLedgerStorage(storage LedgerStorage, syncRunID int64) { + h.ledgerStorage = storage + h.syncRunID = syncRunID +} + // ProcessOrder processes a Walmart order func (h *WalmartHandler) ProcessOrder( ctx context.Context, @@ -82,6 +97,9 @@ func (h *WalmartHandler) ProcessOrder( "charges", bankCharges, "charge_count", len(bankCharges)) + // Save ledger data if storage is configured + h.saveLedgerIfAvailable(order) + // Step 2: Handle based on number of charges if len(bankCharges) > 1 { // Multi-delivery order @@ -362,3 +380,107 @@ func (h *WalmartHandler) logWarn(msg string, args ...any) { h.logger.Warn(msg, args...) } } + +// saveLedgerIfAvailable extracts and saves ledger data if storage is configured +func (h *WalmartHandler) saveLedgerIfAvailable(order WalmartOrder) { + // Skip if no storage configured + if h.ledgerStorage == nil { + return + } + + // Try to get the raw ledger from the concrete type + walmartOrder, ok := order.(*walmartprovider.Order) + if !ok { + h.logDebug("Cannot save ledger - order is not a Walmart provider order") + return + } + + rawLedger := walmartOrder.GetRawLedger() + if rawLedger == nil { + h.logDebug("Cannot save ledger - no ledger data available") + return + } + + // Convert the raw ledger to LedgerData + ledgerData := h.convertToLedgerData(order.GetID(), rawLedger) + + // Save it + if err := h.ledgerStorage.SaveLedger(ledgerData, h.syncRunID); err != nil { + h.logWarn("Failed to save ledger data", + "order_id", order.GetID(), + "error", err) + } else { + h.logDebug("Saved ledger data", + "order_id", order.GetID(), + "charge_count", ledgerData.ChargeCount) + } +} + +// convertToLedgerData converts raw Walmart ledger to the handler's LedgerData format +func (h *WalmartHandler) convertToLedgerData(orderID string, rawLedger interface{}) *LedgerData { + ledgerData := &LedgerData{ + OrderID: orderID, + Provider: "walmart", + IsValid: true, + } + + // Use reflection-free approach with type assertion + // The rawLedger is *walmartclient.OrderLedger + type walmartLedger struct { + OrderID string + PaymentMethods []struct { + PaymentType string + CardType string + LastFour string + FinalCharges []float64 + TotalCharged float64 + } + } + + // Marshal to JSON and back to extract the data + // This avoids importing the walmart client in handlers + import_json, _ := json.Marshal(rawLedger) + ledgerData.RawJSON = string(import_json) + + // Parse for payment method extraction + var parsed walmartLedger + if err := json.Unmarshal(import_json, &parsed); err != nil { + h.logWarn("Failed to parse ledger JSON", "error", err) + return ledgerData + } + + // Collect payment method types and charges + var paymentTypes []string + totalCharged := 0.0 + chargeCount := 0 + hasRefunds := false + + for _, pm := range parsed.PaymentMethods { + paymentTypes = append(paymentTypes, pm.PaymentType) + totalCharged += pm.TotalCharged + + pmData := PaymentMethodData{ + PaymentType: pm.PaymentType, + CardType: pm.CardType, + CardLastFour: pm.LastFour, + FinalCharges: pm.FinalCharges, + TotalCharged: pm.TotalCharged, + } + ledgerData.PaymentMethods = append(ledgerData.PaymentMethods, pmData) + + for _, charge := range pm.FinalCharges { + if charge > 0 { + chargeCount++ + } else if charge < 0 { + hasRefunds = true + } + } + } + + ledgerData.TotalCharged = totalCharged + ledgerData.ChargeCount = chargeCount + ledgerData.PaymentMethodTypes = strings.Join(paymentTypes, ",") + ledgerData.HasRefunds = hasRefunds + + return ledgerData +} diff --git a/internal/application/sync/orchestrator.go b/internal/application/sync/orchestrator.go index 533b103..bcc3a4d 100644 --- a/internal/application/sync/orchestrator.go +++ b/internal/application/sync/orchestrator.go @@ -19,15 +19,18 @@ func (o *Orchestrator) handleResult(order providers.Order, result *handlers.Proc return false, false, err } if result.Skipped { - o.logger.Warn("Order skipped", "order_id", order.GetID(), "reason", result.SkipReason) // Don't treat "payment pending" as an error - it's expected for new orders if result.SkipReason == "payment pending" { + o.logger.Info("Order pending (awaiting shipment/charge)", "order_id", order.GetID()) + o.recordPending(order, result.SkipReason) return false, true, nil } // Don't treat "already has splits" as an error - just skip silently if result.SkipReason == "transaction already has splits" { + o.logger.Debug("Order skipped (already has splits)", "order_id", order.GetID()) return false, true, nil } + o.logger.Warn("Order skipped", "order_id", order.GetID(), "reason", result.SkipReason) o.recordError(order, result.SkipReason) return false, false, fmt.Errorf("skipped: %s", result.SkipReason) } @@ -87,6 +90,13 @@ func (o *Orchestrator) processOrder( return false, true, nil } +// reportProgress calls the progress callback if set +func (o *Orchestrator) reportProgress(opts Options, update ProgressUpdate) { + if opts.ProgressCallback != nil { + opts.ProgressCallback(update) + } +} + // Run executes the sync process for the configured provider func (o *Orchestrator) Run(ctx context.Context, opts Options) (*Result, error) { result := &Result{ @@ -101,6 +111,9 @@ func (o *Orchestrator) Run(ctx context.Context, opts Options) (*Result, error) { "force", opts.Force, ) + // Report initial progress + o.reportProgress(opts, ProgressUpdate{Phase: "fetching_orders"}) + // 1. Fetch orders from provider orders, err := o.fetchOrders(ctx, opts) if err != nil { @@ -132,8 +145,19 @@ func (o *Orchestrator) Run(ctx context.Context, opts Options) (*Result, error) { if o.consolidator != nil { o.consolidator.SetRunID(o.runID) } + // Set up ledger storage for Walmart handler + if o.walmartHandler != nil { + o.walmartHandler.SetLedgerStorage(&ledgerStorageAdapter{repo: o.storage}, o.runID) + } } + // Report progress: starting order processing + totalOrders := len(orders) + o.reportProgress(opts, ProgressUpdate{ + Phase: "processing_orders", + TotalOrders: totalOrders, + }) + // 5. Process orders usedTransactionIDs := make(map[string]bool) @@ -156,7 +180,6 @@ func (o *Orchestrator) Run(ctx context.Context, opts Options) (*Result, error) { order.GetDate().Format("2006-01-02"), order.GetTotal(), err)) - continue } if processed { @@ -165,6 +188,15 @@ func (o *Orchestrator) Run(ctx context.Context, opts Options) (*Result, error) { if skipped { result.SkippedCount++ } + + // Report progress after each order + o.reportProgress(opts, ProgressUpdate{ + Phase: "processing_orders", + TotalOrders: totalOrders, + ProcessedOrders: result.ProcessedCount, + SkippedOrders: result.SkippedCount, + ErroredOrders: result.ErrorCount, + }) } // 6. Complete sync run diff --git a/internal/application/sync/recording.go b/internal/application/sync/recording.go index 2405532..ac3f792 100644 --- a/internal/application/sync/recording.go +++ b/internal/application/sync/recording.go @@ -70,6 +70,30 @@ func (o *Orchestrator) recordError(order providers.Order, errorMsg string) { } } +// recordPending records an order that is pending (not yet charged/shipped) +// This allows tracking without blocking retries on future syncs +func (o *Orchestrator) recordPending(order providers.Order, reason string) { + if o.storage != nil { + record := &storage.ProcessingRecord{ + OrderID: order.GetID(), + Provider: order.GetProviderName(), + OrderDate: order.GetDate(), + OrderTotal: order.GetTotal(), + OrderSubtotal: order.GetSubtotal(), + OrderTax: order.GetTax(), + OrderTip: order.GetTip(), + ItemCount: len(order.GetItems()), + ProcessedAt: time.Now(), + Status: "pending", + ErrorMessage: reason, + Items: convertOrderItems(order.GetItems()), + } + if err := o.storage.SaveRecord(record); err != nil { + o.logger.Error("Failed to save pending record", "order_id", order.GetID(), "error", err) + } + } +} + // recordSuccess records a successful processing to storage func (o *Orchestrator) recordSuccess(order providers.Order, transaction *monarch.Transaction, splits []*monarch.TransactionSplit, confidence float64, dryRun bool) { o.recordSuccessWithMultiDelivery(order, transaction, splits, confidence, dryRun, nil) diff --git a/internal/application/sync/types.go b/internal/application/sync/types.go index f380fff..bd7b1de 100644 --- a/internal/application/sync/types.go +++ b/internal/application/sync/types.go @@ -164,3 +164,62 @@ func (a *monarchAdapter) UpdateTransaction(ctx context.Context, id string, param func (a *monarchAdapter) UpdateSplits(ctx context.Context, id string, splits []*monarch.TransactionSplit) error { return a.client.Transactions.UpdateSplits(ctx, id, splits) } + +// ledgerStorageAdapter wraps storage.Repository to implement handlers.LedgerStorage +type ledgerStorageAdapter struct { + repo storage.Repository +} + +func (a *ledgerStorageAdapter) SaveLedger(ledger *handlers.LedgerData, syncRunID int64) error { + if a.repo == nil { + return nil + } + + // Convert handlers.LedgerData to storage.OrderLedger + orderLedger := &storage.OrderLedger{ + OrderID: ledger.OrderID, + SyncRunID: syncRunID, + Provider: ledger.Provider, + LedgerJSON: ledger.RawJSON, + TotalCharged: ledger.TotalCharged, + ChargeCount: ledger.ChargeCount, + PaymentMethodTypes: ledger.PaymentMethodTypes, + HasRefunds: ledger.HasRefunds, + IsValid: ledger.IsValid, + ValidationNotes: ledger.ValidationNotes, + } + + // Determine ledger state + if ledger.ChargeCount == 0 { + orderLedger.LedgerState = storage.LedgerStatePending + } else if ledger.HasRefunds { + orderLedger.LedgerState = storage.LedgerStatePartialRefund + } else { + orderLedger.LedgerState = storage.LedgerStateCharged + } + + // Convert payment methods to charges + chargeSeq := 0 + for _, pm := range ledger.PaymentMethods { + for _, charge := range pm.FinalCharges { + chargeSeq++ + chargeType := "payment" + if charge < 0 { + chargeType = "refund" + } + ledgerCharge := storage.LedgerCharge{ + OrderID: ledger.OrderID, + SyncRunID: syncRunID, + ChargeSequence: chargeSeq, + ChargeAmount: charge, + ChargeType: chargeType, + PaymentMethod: pm.PaymentType, + CardType: pm.CardType, + CardLastFour: pm.CardLastFour, + } + orderLedger.Charges = append(orderLedger.Charges, ledgerCharge) + } + } + + return a.repo.SaveLedger(orderLedger) +} diff --git a/internal/cli/serve.go b/internal/cli/serve.go index f2758ef..2f0b70b 100644 --- a/internal/cli/serve.go +++ b/internal/cli/serve.go @@ -71,6 +71,10 @@ func RunServe(cfg *config.Config, flags *ServeFlags) error { // Create sync service syncService = service.NewSyncService(cfg, serviceClients, store, logger, providerFactory) + + // Start background cleanup for stale jobs (checks every 5 minutes) + syncService.StartBackgroundCleanup(5 * time.Minute) + logger.Info("sync service initialized", "providers", []string{"walmart", "costco", "amazon"}) } @@ -92,6 +96,11 @@ func RunServe(cfg *config.Config, flags *ServeFlags) error { <-quit logger.Info("received shutdown signal") + // Stop background cleanup if sync service is running + if syncService != nil { + syncService.StopBackgroundCleanup() + } + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() diff --git a/internal/infrastructure/storage/interfaces.go b/internal/infrastructure/storage/interfaces.go index 7edce58..8e936d0 100644 --- a/internal/infrastructure/storage/interfaces.go +++ b/internal/infrastructure/storage/interfaces.go @@ -7,6 +7,7 @@ type Repository interface { OrderRepository SyncRunRepository APICallRepository + LedgerRepository Close() error } @@ -102,3 +103,27 @@ type APICallRepository interface { // GetAPICallsByRunID retrieves all API calls for a specific sync run GetAPICallsByRunID(runID int64) ([]APICall, error) } + +// LedgerRepository handles order ledger storage and history +type LedgerRepository interface { + // SaveLedger saves a ledger snapshot with its charges + SaveLedger(ledger *OrderLedger) error + + // GetLatestLedger retrieves the most recent ledger for an order + GetLatestLedger(orderID string) (*OrderLedger, error) + + // GetLedgerHistory retrieves all ledger snapshots for an order (newest first) + GetLedgerHistory(orderID string) ([]*OrderLedger, error) + + // GetLedgerByID retrieves a specific ledger by ID + GetLedgerByID(id int64) (*OrderLedger, error) + + // ListLedgers returns ledgers matching the given filters with pagination + ListLedgers(filters LedgerFilters) (*LedgerListResult, error) + + // UpdateChargeMatch updates a ledger charge's match status + UpdateChargeMatch(chargeID int64, transactionID string, confidence float64, splitCount int) error + + // GetUnmatchedCharges returns charges that haven't been matched to Monarch transactions + GetUnmatchedCharges(provider string, limit int) ([]LedgerCharge, error) +} diff --git a/internal/infrastructure/storage/ledger_test.go b/internal/infrastructure/storage/ledger_test.go new file mode 100644 index 0000000..5621672 --- /dev/null +++ b/internal/infrastructure/storage/ledger_test.go @@ -0,0 +1,536 @@ +package storage + +import ( + "os" + "testing" + "time" +) + +func TestStorage_SaveLedger(t *testing.T) { + // Create temp DB + tmpFile, err := os.CreateTemp("", "ledger_test_*.db") + if err != nil { + t.Fatal(err) + } + defer os.Remove(tmpFile.Name()) + tmpFile.Close() + + s, err := NewStorage(tmpFile.Name()) + if err != nil { + t.Fatalf("failed to create storage: %v", err) + } + defer s.Close() + + // Create a ledger (SyncRunID 0 means NULL, no foreign key constraint) + ledger := &OrderLedger{ + OrderID: "test-order-123", + SyncRunID: 0, + Provider: "walmart", + FetchedAt: time.Now(), + LedgerState: LedgerStateCharged, + LedgerJSON: `{"test": "data"}`, + TotalCharged: 99.99, + ChargeCount: 1, + PaymentMethodTypes: "CREDITCARD", + HasRefunds: false, + IsValid: true, + ValidationNotes: "", + Charges: []LedgerCharge{ + { + ChargeSequence: 1, + ChargeAmount: 99.99, + ChargeType: "payment", + PaymentMethod: "CREDITCARD", + CardType: "VISA", + CardLastFour: "1234", + }, + }, + } + + // Save the ledger + err = s.SaveLedger(ledger) + if err != nil { + t.Fatalf("failed to save ledger: %v", err) + } + + // Verify ID was assigned + if ledger.ID == 0 { + t.Error("expected ledger ID to be assigned") + } + + // Retrieve it + retrieved, err := s.GetLatestLedger("test-order-123") + if err != nil { + t.Fatalf("failed to get latest ledger: %v", err) + } + + if retrieved == nil { + t.Fatal("expected to retrieve ledger, got nil") + } + + if retrieved.OrderID != "test-order-123" { + t.Errorf("expected order_id 'test-order-123', got %q", retrieved.OrderID) + } + + if retrieved.Provider != "walmart" { + t.Errorf("expected provider 'walmart', got %q", retrieved.Provider) + } + + if retrieved.LedgerState != LedgerStateCharged { + t.Errorf("expected state 'charged', got %q", retrieved.LedgerState) + } + + if retrieved.TotalCharged != 99.99 { + t.Errorf("expected total_charged 99.99, got %f", retrieved.TotalCharged) + } + + if retrieved.LedgerVersion != 1 { + t.Errorf("expected version 1, got %d", retrieved.LedgerVersion) + } + + // Check charges were saved + if len(retrieved.Charges) != 1 { + t.Errorf("expected 1 charge, got %d", len(retrieved.Charges)) + } else { + charge := retrieved.Charges[0] + if charge.ChargeAmount != 99.99 { + t.Errorf("expected charge amount 99.99, got %f", charge.ChargeAmount) + } + if charge.PaymentMethod != "CREDITCARD" { + t.Errorf("expected payment method 'CREDITCARD', got %q", charge.PaymentMethod) + } + if charge.CardLastFour != "1234" { + t.Errorf("expected card last four '1234', got %q", charge.CardLastFour) + } + } +} + +func TestStorage_LedgerVersioning(t *testing.T) { + // Create temp DB + tmpFile, err := os.CreateTemp("", "ledger_version_test_*.db") + if err != nil { + t.Fatal(err) + } + defer os.Remove(tmpFile.Name()) + tmpFile.Close() + + s, err := NewStorage(tmpFile.Name()) + if err != nil { + t.Fatalf("failed to create storage: %v", err) + } + defer s.Close() + + // Save first ledger (pending state) + ledger1 := &OrderLedger{ + OrderID: "order-456", + Provider: "walmart", + FetchedAt: time.Now().Add(-24 * time.Hour), + LedgerState: LedgerStatePending, + LedgerJSON: `{"state": "pending"}`, + } + if err := s.SaveLedger(ledger1); err != nil { + t.Fatalf("failed to save ledger 1: %v", err) + } + + // Save second ledger (charged state) + ledger2 := &OrderLedger{ + OrderID: "order-456", + Provider: "walmart", + FetchedAt: time.Now(), + LedgerState: LedgerStateCharged, + LedgerJSON: `{"state": "charged"}`, + TotalCharged: 50.00, + ChargeCount: 1, + Charges: []LedgerCharge{ + { + ChargeSequence: 1, + ChargeAmount: 50.00, + ChargeType: "payment", + PaymentMethod: "CREDITCARD", + }, + }, + } + if err := s.SaveLedger(ledger2); err != nil { + t.Fatalf("failed to save ledger 2: %v", err) + } + + // Version should be 2 now + if ledger2.LedgerVersion != 2 { + t.Errorf("expected version 2, got %d", ledger2.LedgerVersion) + } + + // Get latest should return the highest version (version 2) + latest, err := s.GetLatestLedger("order-456") + if err != nil { + t.Fatalf("failed to get latest: %v", err) + } + if latest.LedgerVersion != 2 { + t.Errorf("expected version 2 (latest), got %d", latest.LedgerVersion) + } + if latest.LedgerState != LedgerStateCharged { + t.Errorf("expected state 'charged', got %q", latest.LedgerState) + } + + // Get history should return both + history, err := s.GetLedgerHistory("order-456") + if err != nil { + t.Fatalf("failed to get history: %v", err) + } + if len(history) != 2 { + t.Fatalf("expected 2 history entries, got %d", len(history)) + } + // Verify both versions are present (order might vary due to same timestamp) + foundV1, foundV2 := false, false + for _, h := range history { + if h.LedgerVersion == 1 && h.LedgerState == LedgerStatePending { + foundV1 = true + } + if h.LedgerVersion == 2 && h.LedgerState == LedgerStateCharged { + foundV2 = true + } + } + if !foundV1 { + t.Error("expected to find version 1 with pending state") + } + if !foundV2 { + t.Error("expected to find version 2 with charged state") + } +} + +func TestStorage_GetLedgerByID(t *testing.T) { + // Create temp DB + tmpFile, err := os.CreateTemp("", "ledger_byid_test_*.db") + if err != nil { + t.Fatal(err) + } + defer os.Remove(tmpFile.Name()) + tmpFile.Close() + + s, err := NewStorage(tmpFile.Name()) + if err != nil { + t.Fatalf("failed to create storage: %v", err) + } + defer s.Close() + + // Save a ledger + ledger := &OrderLedger{ + OrderID: "order-789", + Provider: "costco", + FetchedAt: time.Now(), + LedgerState: LedgerStateCharged, + LedgerJSON: `{}`, + } + if err := s.SaveLedger(ledger); err != nil { + t.Fatalf("failed to save: %v", err) + } + + // Retrieve by ID + retrieved, err := s.GetLedgerByID(ledger.ID) + if err != nil { + t.Fatalf("failed to get by ID: %v", err) + } + if retrieved == nil { + t.Fatal("expected ledger, got nil") + } + if retrieved.OrderID != "order-789" { + t.Errorf("expected order_id 'order-789', got %q", retrieved.OrderID) + } + + // Non-existent ID should return nil + missing, err := s.GetLedgerByID(9999) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if missing != nil { + t.Error("expected nil for non-existent ID") + } +} + +func TestStorage_ListLedgers(t *testing.T) { + // Create temp DB + tmpFile, err := os.CreateTemp("", "ledger_list_test_*.db") + if err != nil { + t.Fatal(err) + } + defer os.Remove(tmpFile.Name()) + tmpFile.Close() + + s, err := NewStorage(tmpFile.Name()) + if err != nil { + t.Fatalf("failed to create storage: %v", err) + } + defer s.Close() + + // Save multiple ledgers + for _, data := range []struct { + orderID string + provider string + state LedgerState + }{ + {"order-1", "walmart", LedgerStateCharged}, + {"order-2", "walmart", LedgerStatePending}, + {"order-3", "costco", LedgerStateCharged}, + {"order-4", "amazon", LedgerStateRefunded}, + } { + ledger := &OrderLedger{ + OrderID: data.orderID, + Provider: data.provider, + FetchedAt: time.Now(), + LedgerState: data.state, + LedgerJSON: `{}`, + } + if err := s.SaveLedger(ledger); err != nil { + t.Fatalf("failed to save ledger: %v", err) + } + } + + // List all + result, err := s.ListLedgers(LedgerFilters{}) + if err != nil { + t.Fatalf("failed to list: %v", err) + } + if result.TotalCount != 4 { + t.Errorf("expected total count 4, got %d", result.TotalCount) + } + + // Filter by provider + result, err = s.ListLedgers(LedgerFilters{Provider: "walmart"}) + if err != nil { + t.Fatalf("failed to list: %v", err) + } + if result.TotalCount != 2 { + t.Errorf("expected 2 walmart ledgers, got %d", result.TotalCount) + } + + // Filter by state + result, err = s.ListLedgers(LedgerFilters{State: LedgerStateCharged}) + if err != nil { + t.Fatalf("failed to list: %v", err) + } + if result.TotalCount != 2 { + t.Errorf("expected 2 charged ledgers, got %d", result.TotalCount) + } + + // Pagination + result, err = s.ListLedgers(LedgerFilters{Limit: 2, Offset: 0}) + if err != nil { + t.Fatalf("failed to list: %v", err) + } + if len(result.Ledgers) != 2 { + t.Errorf("expected 2 ledgers with limit, got %d", len(result.Ledgers)) + } + if result.TotalCount != 4 { + t.Errorf("expected total count 4, got %d", result.TotalCount) + } +} + +func TestStorage_UpdateChargeMatch(t *testing.T) { + // Create temp DB + tmpFile, err := os.CreateTemp("", "ledger_match_test_*.db") + if err != nil { + t.Fatal(err) + } + defer os.Remove(tmpFile.Name()) + tmpFile.Close() + + s, err := NewStorage(tmpFile.Name()) + if err != nil { + t.Fatalf("failed to create storage: %v", err) + } + defer s.Close() + + // Save a ledger with charges + ledger := &OrderLedger{ + OrderID: "match-order", + Provider: "walmart", + FetchedAt: time.Now(), + LedgerState: LedgerStateCharged, + LedgerJSON: `{}`, + Charges: []LedgerCharge{ + { + ChargeSequence: 1, + ChargeAmount: 100.00, + ChargeType: "payment", + PaymentMethod: "CREDITCARD", + }, + }, + } + if err := s.SaveLedger(ledger); err != nil { + t.Fatalf("failed to save: %v", err) + } + + // Get the charge ID + retrieved, _ := s.GetLatestLedger("match-order") + chargeID := retrieved.Charges[0].ID + + // Update match + err = s.UpdateChargeMatch(chargeID, "monarch-tx-123", 0.95, 3) + if err != nil { + t.Fatalf("failed to update match: %v", err) + } + + // Verify update + retrieved, _ = s.GetLatestLedger("match-order") + charge := retrieved.Charges[0] + if !charge.IsMatched { + t.Error("expected charge to be matched") + } + if charge.MonarchTransactionID != "monarch-tx-123" { + t.Errorf("expected monarch tx 'monarch-tx-123', got %q", charge.MonarchTransactionID) + } + if charge.MatchConfidence != 0.95 { + t.Errorf("expected confidence 0.95, got %f", charge.MatchConfidence) + } + if charge.SplitCount != 3 { + t.Errorf("expected split count 3, got %d", charge.SplitCount) + } +} + +func TestStorage_GetUnmatchedCharges(t *testing.T) { + // Create temp DB + tmpFile, err := os.CreateTemp("", "ledger_unmatched_test_*.db") + if err != nil { + t.Fatal(err) + } + defer os.Remove(tmpFile.Name()) + tmpFile.Close() + + s, err := NewStorage(tmpFile.Name()) + if err != nil { + t.Fatalf("failed to create storage: %v", err) + } + defer s.Close() + + // Save ledgers with mixed matched/unmatched charges + ledger1 := &OrderLedger{ + OrderID: "unmatched-1", + Provider: "walmart", + FetchedAt: time.Now(), + LedgerState: LedgerStateCharged, + LedgerJSON: `{}`, + Charges: []LedgerCharge{ + {ChargeSequence: 1, ChargeAmount: 50.00, ChargeType: "payment", PaymentMethod: "CREDITCARD"}, + }, + } + if err := s.SaveLedger(ledger1); err != nil { + t.Fatalf("failed to save: %v", err) + } + + ledger2 := &OrderLedger{ + OrderID: "unmatched-2", + Provider: "costco", + FetchedAt: time.Now(), + LedgerState: LedgerStateCharged, + LedgerJSON: `{}`, + Charges: []LedgerCharge{ + {ChargeSequence: 1, ChargeAmount: 75.00, ChargeType: "payment", PaymentMethod: "CREDITCARD"}, + }, + } + if err := s.SaveLedger(ledger2); err != nil { + t.Fatalf("failed to save: %v", err) + } + + // Mark one as matched + retrieved, _ := s.GetLatestLedger("unmatched-1") + s.UpdateChargeMatch(retrieved.Charges[0].ID, "tx-matched", 1.0, 1) + + // Get unmatched for all providers + unmatched, err := s.GetUnmatchedCharges("", 50) + if err != nil { + t.Fatalf("failed to get unmatched: %v", err) + } + if len(unmatched) != 1 { + t.Errorf("expected 1 unmatched charge, got %d", len(unmatched)) + } + if len(unmatched) > 0 && unmatched[0].OrderID != "unmatched-2" { + t.Errorf("expected order 'unmatched-2', got %q", unmatched[0].OrderID) + } + + // Get unmatched for specific provider + unmatched, err = s.GetUnmatchedCharges("walmart", 50) + if err != nil { + t.Fatalf("failed to get unmatched: %v", err) + } + if len(unmatched) != 0 { + t.Errorf("expected 0 unmatched walmart charges, got %d", len(unmatched)) + } +} + +func TestMockRepository_Ledger(t *testing.T) { + mock := NewMockRepository() + + // Test SaveLedger + ledger := &OrderLedger{ + OrderID: "mock-order-1", + Provider: "walmart", + FetchedAt: time.Now(), + LedgerState: LedgerStateCharged, + LedgerJSON: `{"test": true}`, + Charges: []LedgerCharge{ + {ChargeSequence: 1, ChargeAmount: 25.00, ChargeType: "payment"}, + }, + } + if err := mock.SaveLedger(ledger); err != nil { + t.Fatalf("failed to save: %v", err) + } + + if !mock.SaveLedgerCalled { + t.Error("expected SaveLedgerCalled to be true") + } + if mock.LastSavedLedger != ledger { + t.Error("expected LastSavedLedger to be set") + } + if ledger.ID == 0 { + t.Error("expected ID to be assigned") + } + if ledger.LedgerVersion != 1 { + t.Errorf("expected version 1, got %d", ledger.LedgerVersion) + } + + // Test GetLatestLedger + latest, err := mock.GetLatestLedger("mock-order-1") + if err != nil { + t.Fatalf("failed to get latest: %v", err) + } + if latest == nil { + t.Fatal("expected ledger, got nil") + } + if latest.OrderID != "mock-order-1" { + t.Errorf("expected order 'mock-order-1', got %q", latest.OrderID) + } + + // Test version incrementing + ledger2 := &OrderLedger{ + OrderID: "mock-order-1", + Provider: "walmart", + FetchedAt: time.Now(), + LedgerState: LedgerStateRefunded, + LedgerJSON: `{"refund": true}`, + } + mock.SaveLedger(ledger2) + if ledger2.LedgerVersion != 2 { + t.Errorf("expected version 2, got %d", ledger2.LedgerVersion) + } + + // Test GetLedgerHistory + history, _ := mock.GetLedgerHistory("mock-order-1") + if len(history) != 2 { + t.Errorf("expected 2 history entries, got %d", len(history)) + } + + // Test error injection + mock.SaveLedgerErr = os.ErrNotExist + if err := mock.SaveLedger(&OrderLedger{}); err != os.ErrNotExist { + t.Errorf("expected injected error, got %v", err) + } + + // Test Reset clears ledger data + mock.Reset() + if mock.SaveLedgerCalled { + t.Error("expected SaveLedgerCalled to be false after reset") + } + latest, _ = mock.GetLatestLedger("mock-order-1") + if latest != nil { + t.Error("expected nil after reset") + } +} diff --git a/internal/infrastructure/storage/migrations.go b/internal/infrastructure/storage/migrations.go index 0d318cb..a521e10 100644 --- a/internal/infrastructure/storage/migrations.go +++ b/internal/infrastructure/storage/migrations.go @@ -35,6 +35,11 @@ var allMigrations = []Migration{ Name: "backfill_null_values", Up: migration004BackfillNullValues, }, + { + Version: 5, + Name: "add_ledger_tables", + Up: migration005AddLedgerTables, + }, } // runMigrations executes all pending migrations @@ -291,3 +296,87 @@ func migration004BackfillNullValues(db *sql.Tx) error { return nil } + +// migration005AddLedgerTables creates tables for storing order ledger data and charges. +// This enables: +// - Tracking ledger state changes over time (payment_pending → charged → refunded) +// - Per-charge tracking for multi-delivery orders +// - Detecting when ledger changes require re-processing +// - Audit trail for debugging and refund matching +func migration005AddLedgerTables(db *sql.Tx) error { + queries := []string{ + // order_ledgers: Store ledger snapshots with history + `CREATE TABLE IF NOT EXISTS order_ledgers ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + order_id TEXT NOT NULL, + sync_run_id INTEGER, + provider TEXT NOT NULL, + fetched_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + ledger_state TEXT NOT NULL, + ledger_version INTEGER DEFAULT 1, + ledger_json TEXT NOT NULL, + total_charged REAL, + charge_count INTEGER, + payment_method_types TEXT, + has_refunds BOOLEAN DEFAULT 0, + is_valid BOOLEAN DEFAULT 1, + validation_notes TEXT, + FOREIGN KEY (sync_run_id) REFERENCES sync_runs(id) + )`, + + // Indexes for order_ledgers + `CREATE INDEX IF NOT EXISTS idx_order_ledgers_order_id + ON order_ledgers(order_id)`, + + `CREATE INDEX IF NOT EXISTS idx_order_ledgers_provider + ON order_ledgers(provider)`, + + `CREATE INDEX IF NOT EXISTS idx_order_ledgers_state + ON order_ledgers(ledger_state)`, + + `CREATE INDEX IF NOT EXISTS idx_order_ledgers_fetched + ON order_ledgers(fetched_at DESC)`, + + // ledger_charges: Normalized charge entries for querying + `CREATE TABLE IF NOT EXISTS ledger_charges ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + order_ledger_id INTEGER NOT NULL, + order_id TEXT NOT NULL, + sync_run_id INTEGER, + charge_sequence INTEGER NOT NULL, + charge_amount REAL NOT NULL, + charge_type TEXT, + payment_method TEXT, + card_type TEXT, + card_last_four TEXT, + monarch_transaction_id TEXT, + is_matched BOOLEAN DEFAULT 0, + match_confidence REAL, + matched_at TIMESTAMP, + split_count INTEGER, + FOREIGN KEY (order_ledger_id) REFERENCES order_ledgers(id), + FOREIGN KEY (sync_run_id) REFERENCES sync_runs(id) + )`, + + // Indexes for ledger_charges + `CREATE INDEX IF NOT EXISTS idx_ledger_charges_order_id + ON ledger_charges(order_id)`, + + `CREATE INDEX IF NOT EXISTS idx_ledger_charges_ledger_id + ON ledger_charges(order_ledger_id)`, + + `CREATE INDEX IF NOT EXISTS idx_ledger_charges_monarch_tx + ON ledger_charges(monarch_transaction_id)`, + + `CREATE INDEX IF NOT EXISTS idx_ledger_charges_unmatched + ON ledger_charges(is_matched) WHERE is_matched = 0`, + } + + for _, query := range queries { + if _, err := db.Exec(query); err != nil { + return fmt.Errorf("failed to create ledger tables: %w", err) + } + } + + return nil +} diff --git a/internal/infrastructure/storage/mock.go b/internal/infrastructure/storage/mock.go index 84e23f5..e77d19f 100644 --- a/internal/infrastructure/storage/mock.go +++ b/internal/infrastructure/storage/mock.go @@ -3,10 +3,14 @@ package storage // MockRepository is an in-memory implementation of Repository for testing. // It stores all data in maps and slices, making tests fast and isolated. type MockRepository struct { - records map[string]*ProcessingRecord - syncRuns map[int64]*mockSyncRun - apiCalls []APICall - nextRunID int64 + records map[string]*ProcessingRecord + syncRuns map[int64]*mockSyncRun + apiCalls []APICall + ledgers map[string][]*OrderLedger // Keyed by order_id + ledgerCharges map[int64][]LedgerCharge // Keyed by ledger_id + nextRunID int64 + nextLedgerID int64 + nextChargeID int64 // Hooks for test assertions SaveRecordCalled bool @@ -15,6 +19,8 @@ type MockRepository struct { IsProcessedCalled bool StartSyncRunCalled bool LogAPICallCalled bool + SaveLedgerCalled bool + LastSavedLedger *OrderLedger // Error injection for testing error paths SaveRecordErr error @@ -22,6 +28,7 @@ type MockRepository struct { StartSyncRunErr error CompleteSyncRunErr error LogAPICallErr error + SaveLedgerErr error } type mockSyncRun struct { @@ -39,10 +46,14 @@ type mockSyncRun struct { // NewMockRepository creates a new mock repository for testing func NewMockRepository() *MockRepository { return &MockRepository{ - records: make(map[string]*ProcessingRecord), - syncRuns: make(map[int64]*mockSyncRun), - apiCalls: make([]APICall, 0), - nextRunID: 1, + records: make(map[string]*ProcessingRecord), + syncRuns: make(map[int64]*mockSyncRun), + apiCalls: make([]APICall, 0), + ledgers: make(map[string][]*OrderLedger), + ledgerCharges: make(map[int64][]LedgerCharge), + nextRunID: 1, + nextLedgerID: 1, + nextChargeID: 1, } } @@ -386,16 +397,203 @@ func (m *MockRepository) Reset() { m.records = make(map[string]*ProcessingRecord) m.syncRuns = make(map[int64]*mockSyncRun) m.apiCalls = make([]APICall, 0) + m.ledgers = make(map[string][]*OrderLedger) + m.ledgerCharges = make(map[int64][]LedgerCharge) m.nextRunID = 1 + m.nextLedgerID = 1 + m.nextChargeID = 1 m.SaveRecordCalled = false m.LastSavedRecord = nil m.GetRecordCalled = false m.IsProcessedCalled = false m.StartSyncRunCalled = false m.LogAPICallCalled = false + m.SaveLedgerCalled = false + m.LastSavedLedger = nil m.SaveRecordErr = nil m.GetRecordErr = nil m.StartSyncRunErr = nil m.CompleteSyncRunErr = nil m.LogAPICallErr = nil + m.SaveLedgerErr = nil +} + +// ================================================================ +// LEDGER REPOSITORY METHODS +// ================================================================ + +// SaveLedger saves a ledger snapshot with its charges +func (m *MockRepository) SaveLedger(ledger *OrderLedger) error { + m.SaveLedgerCalled = true + m.LastSavedLedger = ledger + if m.SaveLedgerErr != nil { + return m.SaveLedgerErr + } + + // Assign ID + ledger.ID = m.nextLedgerID + m.nextLedgerID++ + + // Calculate version based on existing ledgers for this order + existingLedgers := m.ledgers[ledger.OrderID] + ledger.LedgerVersion = len(existingLedgers) + 1 + + // Deep copy the ledger + copied := *ledger + + // Process and store charges + var charges []LedgerCharge + for i := range ledger.Charges { + charge := ledger.Charges[i] + charge.ID = m.nextChargeID + m.nextChargeID++ + charge.OrderLedgerID = copied.ID + charge.OrderID = copied.OrderID + charge.SyncRunID = copied.SyncRunID + charges = append(charges, charge) + } + copied.Charges = charges + + // Store in ledgers map (append to history) + m.ledgers[ledger.OrderID] = append(m.ledgers[ledger.OrderID], &copied) + + // Store charges by ledger ID + m.ledgerCharges[copied.ID] = charges + + return nil +} + +// GetLatestLedger retrieves the most recent ledger for an order +func (m *MockRepository) GetLatestLedger(orderID string) (*OrderLedger, error) { + ledgers := m.ledgers[orderID] + if len(ledgers) == 0 { + return nil, nil + } + // Return the last one (most recent) + latest := ledgers[len(ledgers)-1] + // Attach charges + result := *latest + result.Charges = m.ledgerCharges[latest.ID] + return &result, nil +} + +// GetLedgerHistory retrieves all ledger snapshots for an order (newest first) +func (m *MockRepository) GetLedgerHistory(orderID string) ([]*OrderLedger, error) { + ledgers := m.ledgers[orderID] + if len(ledgers) == 0 { + return nil, nil + } + + // Return in reverse order (newest first) + result := make([]*OrderLedger, len(ledgers)) + for i, l := range ledgers { + copied := *l + copied.Charges = m.ledgerCharges[l.ID] + result[len(ledgers)-1-i] = &copied + } + return result, nil +} + +// GetLedgerByID retrieves a specific ledger by ID +func (m *MockRepository) GetLedgerByID(id int64) (*OrderLedger, error) { + for _, ledgers := range m.ledgers { + for _, l := range ledgers { + if l.ID == id { + result := *l + result.Charges = m.ledgerCharges[l.ID] + return &result, nil + } + } + } + return nil, nil +} + +// ListLedgers returns ledgers matching the given filters with pagination +func (m *MockRepository) ListLedgers(filters LedgerFilters) (*LedgerListResult, error) { + var matching []*OrderLedger + + for _, ledgers := range m.ledgers { + for _, l := range ledgers { + // Apply filters + if filters.OrderID != "" && l.OrderID != filters.OrderID { + continue + } + if filters.Provider != "" && l.Provider != filters.Provider { + continue + } + if filters.State != "" && l.LedgerState != filters.State { + continue + } + matching = append(matching, l) + } + } + + // Apply defaults + limit := filters.Limit + if limit == 0 { + limit = 50 + } + + // Apply pagination + total := len(matching) + start := filters.Offset + if start > total { + start = total + } + end := start + limit + if end > total { + end = total + } + + return &LedgerListResult{ + Ledgers: matching[start:end], + TotalCount: total, + Limit: limit, + Offset: filters.Offset, + }, nil +} + +// UpdateChargeMatch updates a ledger charge's match status +func (m *MockRepository) UpdateChargeMatch(chargeID int64, transactionID string, confidence float64, splitCount int) error { + for ledgerID, charges := range m.ledgerCharges { + for i, charge := range charges { + if charge.ID == chargeID { + m.ledgerCharges[ledgerID][i].MonarchTransactionID = transactionID + m.ledgerCharges[ledgerID][i].IsMatched = true + m.ledgerCharges[ledgerID][i].MatchConfidence = confidence + m.ledgerCharges[ledgerID][i].SplitCount = splitCount + return nil + } + } + } + return nil +} + +// GetUnmatchedCharges returns charges that haven't been matched to Monarch transactions +func (m *MockRepository) GetUnmatchedCharges(provider string, limit int) ([]LedgerCharge, error) { + if limit == 0 { + limit = 50 + } + + var result []LedgerCharge + for _, charges := range m.ledgerCharges { + for _, charge := range charges { + if charge.IsMatched { + continue + } + if charge.ChargeType != "payment" { + continue + } + // Check provider via the ledger + ledger, _ := m.GetLedgerByID(charge.OrderLedgerID) + if ledger != nil && provider != "" && ledger.Provider != provider { + continue + } + result = append(result, charge) + if len(result) >= limit { + return result, nil + } + } + } + return result, nil } diff --git a/internal/infrastructure/storage/models.go b/internal/infrastructure/storage/models.go index e80bacc..c3b7abd 100644 --- a/internal/infrastructure/storage/models.go +++ b/internal/infrastructure/storage/models.go @@ -121,3 +121,70 @@ type APICall struct { Error string DurationMs int64 } + +// LedgerState represents the current state of an order's ledger +type LedgerState string + +const ( + LedgerStatePending LedgerState = "payment_pending" + LedgerStateCharged LedgerState = "charged" + LedgerStateRefunded LedgerState = "refunded" + LedgerStatePartialRefund LedgerState = "partial_refund" +) + +// OrderLedger represents a snapshot of an order's ledger at a point in time +type OrderLedger struct { + ID int64 `json:"id"` + OrderID string `json:"order_id"` + SyncRunID int64 `json:"sync_run_id,omitempty"` + Provider string `json:"provider"` + FetchedAt time.Time `json:"fetched_at"` + LedgerState LedgerState `json:"ledger_state"` + LedgerVersion int `json:"ledger_version"` + LedgerJSON string `json:"ledger_json"` // Raw provider JSON + TotalCharged float64 `json:"total_charged"` // Sum of all charges + ChargeCount int `json:"charge_count"` // Number of charges + PaymentMethodTypes string `json:"payment_method_types"` // Comma-separated: "CREDITCARD,GIFTCARD" + HasRefunds bool `json:"has_refunds"` + IsValid bool `json:"is_valid"` + ValidationNotes string `json:"validation_notes,omitempty"` + + // Populated from ledger_charges table + Charges []LedgerCharge `json:"charges,omitempty"` +} + +// LedgerCharge represents a single charge within a ledger +type LedgerCharge struct { + ID int64 `json:"id"` + OrderLedgerID int64 `json:"order_ledger_id"` + OrderID string `json:"order_id"` + SyncRunID int64 `json:"sync_run_id,omitempty"` + ChargeSequence int `json:"charge_sequence"` // Order within ledger + ChargeAmount float64 `json:"charge_amount"` + ChargeType string `json:"charge_type"` // "payment", "refund" + PaymentMethod string `json:"payment_method"` // "CREDITCARD", "GIFTCARD" + CardType string `json:"card_type,omitempty"` // "VISA", "AMEX" + CardLastFour string `json:"card_last_four,omitempty"` + MonarchTransactionID string `json:"monarch_transaction_id,omitempty"` + IsMatched bool `json:"is_matched"` + MatchConfidence float64 `json:"match_confidence,omitempty"` + MatchedAt time.Time `json:"matched_at,omitempty"` + SplitCount int `json:"split_count,omitempty"` +} + +// LedgerFilters defines filters for querying ledgers +type LedgerFilters struct { + OrderID string // Filter by order ID + Provider string // Filter by provider + State LedgerState // Filter by ledger state + Limit int // Max results (0 = default 50) + Offset int // Pagination offset +} + +// LedgerListResult contains paginated ledger results +type LedgerListResult struct { + Ledgers []*OrderLedger `json:"ledgers"` + TotalCount int `json:"total_count"` + Limit int `json:"limit"` + Offset int `json:"offset"` +} diff --git a/internal/infrastructure/storage/sqlite.go b/internal/infrastructure/storage/sqlite.go index 9fa8f26..f0c2567 100644 --- a/internal/infrastructure/storage/sqlite.go +++ b/internal/infrastructure/storage/sqlite.go @@ -4,6 +4,7 @@ import ( "database/sql" "encoding/json" "fmt" + "time" _ "github.com/mattn/go-sqlite3" ) @@ -662,3 +663,573 @@ func (s *Storage) GetSyncRun(runID int64) (*SyncRun, error) { return &r, nil } + +// ================================================================ +// LEDGER REPOSITORY IMPLEMENTATION +// ================================================================ + +// SaveLedger saves a ledger snapshot with its charges in a transaction +func (s *Storage) SaveLedger(ledger *OrderLedger) error { + tx, err := s.db.Begin() + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) + } + defer func() { _ = tx.Rollback() }() + + // Determine the next ledger version for this order + var currentVersion int + err = tx.QueryRow(` + SELECT COALESCE(MAX(ledger_version), 0) + FROM order_ledgers + WHERE order_id = ? AND provider = ? + `, ledger.OrderID, ledger.Provider).Scan(¤tVersion) + if err != nil { + return fmt.Errorf("failed to get current version: %w", err) + } + ledger.LedgerVersion = currentVersion + 1 + + // Insert the ledger + result, err := tx.Exec(` + INSERT INTO order_ledgers + (order_id, sync_run_id, provider, ledger_state, ledger_version, + ledger_json, total_charged, charge_count, payment_method_types, + has_refunds, is_valid, validation_notes) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `, + ledger.OrderID, + nullInt64(ledger.SyncRunID), + ledger.Provider, + ledger.LedgerState, + ledger.LedgerVersion, + ledger.LedgerJSON, + ledger.TotalCharged, + ledger.ChargeCount, + ledger.PaymentMethodTypes, + ledger.HasRefunds, + ledger.IsValid, + ledger.ValidationNotes, + ) + if err != nil { + return fmt.Errorf("failed to insert ledger: %w", err) + } + + ledgerID, err := result.LastInsertId() + if err != nil { + return fmt.Errorf("failed to get ledger ID: %w", err) + } + ledger.ID = ledgerID + + // Insert charges + for i := range ledger.Charges { + charge := &ledger.Charges[i] + charge.OrderLedgerID = ledgerID + charge.OrderID = ledger.OrderID + charge.SyncRunID = ledger.SyncRunID + + result, err := tx.Exec(` + INSERT INTO ledger_charges + (order_ledger_id, order_id, sync_run_id, charge_sequence, + charge_amount, charge_type, payment_method, card_type, card_last_four, + monarch_transaction_id, is_matched, match_confidence, matched_at, split_count) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `, + charge.OrderLedgerID, + charge.OrderID, + nullInt64(charge.SyncRunID), + charge.ChargeSequence, + charge.ChargeAmount, + charge.ChargeType, + charge.PaymentMethod, + charge.CardType, + charge.CardLastFour, + nullString(charge.MonarchTransactionID), + charge.IsMatched, + charge.MatchConfidence, + nullTime(charge.MatchedAt), + charge.SplitCount, + ) + if err != nil { + return fmt.Errorf("failed to insert charge: %w", err) + } + + chargeID, _ := result.LastInsertId() + charge.ID = chargeID + } + + return tx.Commit() +} + +// GetLatestLedger retrieves the most recent ledger for an order +func (s *Storage) GetLatestLedger(orderID string) (*OrderLedger, error) { + query := ` + SELECT id, order_id, sync_run_id, provider, fetched_at, ledger_state, + ledger_version, ledger_json, total_charged, charge_count, + payment_method_types, has_refunds, is_valid, validation_notes + FROM order_ledgers + WHERE order_id = ? + ORDER BY ledger_version DESC + LIMIT 1 + ` + + ledger := &OrderLedger{} + var syncRunID sql.NullInt64 + var validationNotes sql.NullString + + err := s.db.QueryRow(query, orderID).Scan( + &ledger.ID, + &ledger.OrderID, + &syncRunID, + &ledger.Provider, + &ledger.FetchedAt, + &ledger.LedgerState, + &ledger.LedgerVersion, + &ledger.LedgerJSON, + &ledger.TotalCharged, + &ledger.ChargeCount, + &ledger.PaymentMethodTypes, + &ledger.HasRefunds, + &ledger.IsValid, + &validationNotes, + ) + if err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + return nil, err + } + + if syncRunID.Valid { + ledger.SyncRunID = syncRunID.Int64 + } + if validationNotes.Valid { + ledger.ValidationNotes = validationNotes.String + } + + // Load charges + charges, err := s.getChargesForLedger(ledger.ID) + if err != nil { + return nil, err + } + ledger.Charges = charges + + return ledger, nil +} + +// GetLedgerHistory retrieves all ledger snapshots for an order (newest first) +func (s *Storage) GetLedgerHistory(orderID string) ([]*OrderLedger, error) { + query := ` + SELECT id, order_id, sync_run_id, provider, fetched_at, ledger_state, + ledger_version, ledger_json, total_charged, charge_count, + payment_method_types, has_refunds, is_valid, validation_notes + FROM order_ledgers + WHERE order_id = ? + ORDER BY ledger_version DESC + ` + + rows, err := s.db.Query(query, orderID) + if err != nil { + return nil, err + } + defer func() { _ = rows.Close() }() + + var ledgers []*OrderLedger + for rows.Next() { + ledger := &OrderLedger{} + var syncRunID sql.NullInt64 + var validationNotes sql.NullString + + err := rows.Scan( + &ledger.ID, + &ledger.OrderID, + &syncRunID, + &ledger.Provider, + &ledger.FetchedAt, + &ledger.LedgerState, + &ledger.LedgerVersion, + &ledger.LedgerJSON, + &ledger.TotalCharged, + &ledger.ChargeCount, + &ledger.PaymentMethodTypes, + &ledger.HasRefunds, + &ledger.IsValid, + &validationNotes, + ) + if err != nil { + return nil, err + } + + if syncRunID.Valid { + ledger.SyncRunID = syncRunID.Int64 + } + if validationNotes.Valid { + ledger.ValidationNotes = validationNotes.String + } + + // Load charges for each ledger + charges, err := s.getChargesForLedger(ledger.ID) + if err != nil { + return nil, err + } + ledger.Charges = charges + + ledgers = append(ledgers, ledger) + } + + return ledgers, rows.Err() +} + +// GetLedgerByID retrieves a specific ledger by ID +func (s *Storage) GetLedgerByID(id int64) (*OrderLedger, error) { + query := ` + SELECT id, order_id, sync_run_id, provider, fetched_at, ledger_state, + ledger_version, ledger_json, total_charged, charge_count, + payment_method_types, has_refunds, is_valid, validation_notes + FROM order_ledgers + WHERE id = ? + ` + + ledger := &OrderLedger{} + var syncRunID sql.NullInt64 + var validationNotes sql.NullString + + err := s.db.QueryRow(query, id).Scan( + &ledger.ID, + &ledger.OrderID, + &syncRunID, + &ledger.Provider, + &ledger.FetchedAt, + &ledger.LedgerState, + &ledger.LedgerVersion, + &ledger.LedgerJSON, + &ledger.TotalCharged, + &ledger.ChargeCount, + &ledger.PaymentMethodTypes, + &ledger.HasRefunds, + &ledger.IsValid, + &validationNotes, + ) + if err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + return nil, err + } + + if syncRunID.Valid { + ledger.SyncRunID = syncRunID.Int64 + } + if validationNotes.Valid { + ledger.ValidationNotes = validationNotes.String + } + + // Load charges + charges, err := s.getChargesForLedger(ledger.ID) + if err != nil { + return nil, err + } + ledger.Charges = charges + + return ledger, nil +} + +// ListLedgers returns ledgers matching the given filters with pagination +func (s *Storage) ListLedgers(filters LedgerFilters) (*LedgerListResult, error) { + // Set defaults + if filters.Limit <= 0 { + filters.Limit = 50 + } + if filters.Limit > 500 { + filters.Limit = 500 + } + + // Build WHERE clause + where := "WHERE 1=1" + args := []interface{}{} + + if filters.OrderID != "" { + where += " AND order_id = ?" + args = append(args, filters.OrderID) + } + if filters.Provider != "" { + where += " AND provider = ?" + args = append(args, filters.Provider) + } + if filters.State != "" { + where += " AND ledger_state = ?" + args = append(args, filters.State) + } + + // Get total count + countQuery := fmt.Sprintf("SELECT COUNT(*) FROM order_ledgers %s", where) + var totalCount int + if err := s.db.QueryRow(countQuery, args...).Scan(&totalCount); err != nil { + return nil, err + } + + // Get paginated results + query := fmt.Sprintf(` + SELECT id, order_id, sync_run_id, provider, fetched_at, ledger_state, + ledger_version, ledger_json, total_charged, charge_count, + payment_method_types, has_refunds, is_valid, validation_notes + FROM order_ledgers + %s + ORDER BY fetched_at DESC + LIMIT ? OFFSET ? + `, where) + + args = append(args, filters.Limit, filters.Offset) + + rows, err := s.db.Query(query, args...) + if err != nil { + return nil, err + } + defer func() { _ = rows.Close() }() + + var ledgers []*OrderLedger + for rows.Next() { + ledger := &OrderLedger{} + var syncRunID sql.NullInt64 + var validationNotes sql.NullString + + err := rows.Scan( + &ledger.ID, + &ledger.OrderID, + &syncRunID, + &ledger.Provider, + &ledger.FetchedAt, + &ledger.LedgerState, + &ledger.LedgerVersion, + &ledger.LedgerJSON, + &ledger.TotalCharged, + &ledger.ChargeCount, + &ledger.PaymentMethodTypes, + &ledger.HasRefunds, + &ledger.IsValid, + &validationNotes, + ) + if err != nil { + return nil, err + } + + if syncRunID.Valid { + ledger.SyncRunID = syncRunID.Int64 + } + if validationNotes.Valid { + ledger.ValidationNotes = validationNotes.String + } + + ledgers = append(ledgers, ledger) + } + + if err := rows.Err(); err != nil { + return nil, err + } + + return &LedgerListResult{ + Ledgers: ledgers, + TotalCount: totalCount, + Limit: filters.Limit, + Offset: filters.Offset, + }, nil +} + +// UpdateChargeMatch updates a ledger charge's match status +func (s *Storage) UpdateChargeMatch(chargeID int64, transactionID string, confidence float64, splitCount int) error { + query := ` + UPDATE ledger_charges + SET monarch_transaction_id = ?, + is_matched = 1, + match_confidence = ?, + matched_at = CURRENT_TIMESTAMP, + split_count = ? + WHERE id = ? + ` + + _, err := s.db.Exec(query, transactionID, confidence, splitCount, chargeID) + return err +} + +// GetUnmatchedCharges returns charges that haven't been matched to Monarch transactions +func (s *Storage) GetUnmatchedCharges(provider string, limit int) ([]LedgerCharge, error) { + if limit <= 0 { + limit = 100 + } + + var query string + var args []interface{} + + if provider != "" { + query = ` + SELECT c.id, c.order_ledger_id, c.order_id, c.sync_run_id, c.charge_sequence, + c.charge_amount, c.charge_type, c.payment_method, c.card_type, c.card_last_four, + c.monarch_transaction_id, c.is_matched, c.match_confidence, c.matched_at, c.split_count + FROM ledger_charges c + JOIN order_ledgers l ON c.order_ledger_id = l.id + WHERE c.is_matched = 0 + AND c.charge_type = 'payment' + AND l.provider = ? + ORDER BY l.fetched_at DESC + LIMIT ? + ` + args = []interface{}{provider, limit} + } else { + query = ` + SELECT c.id, c.order_ledger_id, c.order_id, c.sync_run_id, c.charge_sequence, + c.charge_amount, c.charge_type, c.payment_method, c.card_type, c.card_last_four, + c.monarch_transaction_id, c.is_matched, c.match_confidence, c.matched_at, c.split_count + FROM ledger_charges c + JOIN order_ledgers l ON c.order_ledger_id = l.id + WHERE c.is_matched = 0 + AND c.charge_type = 'payment' + ORDER BY l.fetched_at DESC + LIMIT ? + ` + args = []interface{}{limit} + } + + rows, err := s.db.Query(query, args...) + if err != nil { + return nil, err + } + defer func() { _ = rows.Close() }() + + var charges []LedgerCharge + for rows.Next() { + var charge LedgerCharge + var syncRunID sql.NullInt64 + var txID sql.NullString + var cardType, cardLastFour sql.NullString + var matchedAt sql.NullTime + + err := rows.Scan( + &charge.ID, + &charge.OrderLedgerID, + &charge.OrderID, + &syncRunID, + &charge.ChargeSequence, + &charge.ChargeAmount, + &charge.ChargeType, + &charge.PaymentMethod, + &cardType, + &cardLastFour, + &txID, + &charge.IsMatched, + &charge.MatchConfidence, + &matchedAt, + &charge.SplitCount, + ) + if err != nil { + return nil, err + } + + if syncRunID.Valid { + charge.SyncRunID = syncRunID.Int64 + } + if txID.Valid { + charge.MonarchTransactionID = txID.String + } + if cardType.Valid { + charge.CardType = cardType.String + } + if cardLastFour.Valid { + charge.CardLastFour = cardLastFour.String + } + if matchedAt.Valid { + charge.MatchedAt = matchedAt.Time + } + + charges = append(charges, charge) + } + + return charges, rows.Err() +} + +// getChargesForLedger retrieves all charges for a ledger +func (s *Storage) getChargesForLedger(ledgerID int64) ([]LedgerCharge, error) { + query := ` + SELECT id, order_ledger_id, order_id, sync_run_id, charge_sequence, + charge_amount, charge_type, payment_method, card_type, card_last_four, + monarch_transaction_id, is_matched, match_confidence, matched_at, split_count + FROM ledger_charges + WHERE order_ledger_id = ? + ORDER BY charge_sequence + ` + + rows, err := s.db.Query(query, ledgerID) + if err != nil { + return nil, err + } + defer func() { _ = rows.Close() }() + + var charges []LedgerCharge + for rows.Next() { + var charge LedgerCharge + var syncRunID sql.NullInt64 + var txID sql.NullString + var cardType, cardLastFour sql.NullString + var matchedAt sql.NullTime + + err := rows.Scan( + &charge.ID, + &charge.OrderLedgerID, + &charge.OrderID, + &syncRunID, + &charge.ChargeSequence, + &charge.ChargeAmount, + &charge.ChargeType, + &charge.PaymentMethod, + &cardType, + &cardLastFour, + &txID, + &charge.IsMatched, + &charge.MatchConfidence, + &matchedAt, + &charge.SplitCount, + ) + if err != nil { + return nil, err + } + + if syncRunID.Valid { + charge.SyncRunID = syncRunID.Int64 + } + if txID.Valid { + charge.MonarchTransactionID = txID.String + } + if cardType.Valid { + charge.CardType = cardType.String + } + if cardLastFour.Valid { + charge.CardLastFour = cardLastFour.String + } + if matchedAt.Valid { + charge.MatchedAt = matchedAt.Time + } + + charges = append(charges, charge) + } + + return charges, rows.Err() +} + +// Helper functions for nullable values +func nullInt64(v int64) interface{} { + if v == 0 { + return nil + } + return v +} + +func nullString(v string) interface{} { + if v == "" { + return nil + } + return v +} + +func nullTime(t time.Time) interface{} { + if t.IsZero() { + return nil + } + return t +} diff --git a/web/e2e/sync.spec.ts b/web/e2e/sync.spec.ts new file mode 100644 index 0000000..66c2bb7 --- /dev/null +++ b/web/e2e/sync.spec.ts @@ -0,0 +1,80 @@ +import { test, expect } from '@playwright/test' + +test.describe('Sync Page', () => { + test.beforeEach(async ({ page }) => { + await page.goto('/sync') + // Wait for page to fully load + await page.waitForLoadState('networkidle') + }) + + test('should display the sync page with title', async ({ page }) => { + // The page has an h1 with text "Sync" + await expect(page.locator('h1')).toContainText('Sync') + }) + + test('should have provider dropdown with options', async ({ page }) => { + // Find the provider select/dropdown + const providerSelect = page.locator('select').first() + await expect(providerSelect).toBeVisible() + + // Check that Walmart is the default selection + await expect(providerSelect).toHaveValue('walmart') + }) + + test('should have lookback days input', async ({ page }) => { + // Find the number input for lookback days (has value 14 by default) + const lookbackInput = page.locator('input[type="number"]').first() + await expect(lookbackInput).toBeVisible() + await expect(lookbackInput).toHaveValue('14') + }) + + test('should have start sync button', async ({ page }) => { + const startButton = page.getByRole('button', { name: /start sync/i }) + await expect(startButton).toBeVisible() + }) + + test('should have sync configuration section', async ({ page }) => { + // Check for "Sync Configuration" text + await expect(page.getByText('Sync Configuration')).toBeVisible() + }) + + test('should have provider options', async ({ page }) => { + const providerSelect = page.locator('select').first() + + // Check all three options exist + await expect(providerSelect.locator('option[value="walmart"]')).toBeAttached() + await expect(providerSelect.locator('option[value="costco"]')).toBeAttached() + await expect(providerSelect.locator('option[value="amazon"]')).toBeAttached() + }) + + test('should be able to change provider', async ({ page }) => { + const providerSelect = page.locator('select').first() + + // Change to costco + await providerSelect.selectOption('costco') + await expect(providerSelect).toHaveValue('costco') + + // Change to amazon + await providerSelect.selectOption('amazon') + await expect(providerSelect).toHaveValue('amazon') + }) + + test('should have sync navigation in sidebar', async ({ page }) => { + // Check that Sync link exists in sidebar + const syncLink = page.locator('a[href="/sync"]') + await expect(syncLink).toBeVisible() + }) + + test('should navigate to sync page from home', async ({ page }) => { + // Go to home first + await page.goto('/') + await page.waitForLoadState('networkidle') + + // Click on Sync in navigation + await page.locator('a[href="/sync"]').first().click() + + // Should be on sync page + await expect(page).toHaveURL(/\/sync/) + await expect(page.locator('h1')).toContainText('Sync') + }) +}) diff --git a/web/next.config.mjs b/web/next.config.mjs index 1d61478..8c59a49 100644 --- a/web/next.config.mjs +++ b/web/next.config.mjs @@ -1,4 +1,27 @@ /** @type {import('next').NextConfig} */ -const nextConfig = {} +const nextConfig = { + async headers() { + return [ + { + source: '/:path*.svg', + headers: [ + { + key: 'Cache-Control', + value: 'public, max-age=31536000, immutable', + }, + ], + }, + { + source: '/favicon.svg', + headers: [ + { + key: 'Cache-Control', + value: 'public, max-age=31536000, immutable', + }, + ], + }, + ] + }, +} export default nextConfig diff --git a/web/public/favicon.svg b/web/public/favicon.svg new file mode 100644 index 0000000..7995db6 --- /dev/null +++ b/web/public/favicon.svg @@ -0,0 +1,18 @@ + + Retail Sync + Sync icon with rotating arrows and receipt symbol for retail order synchronization + + + + + + + + + + + + + diff --git a/web/public/teams/retail-sync.svg b/web/public/teams/retail-sync.svg new file mode 100644 index 0000000..efbee44 --- /dev/null +++ b/web/public/teams/retail-sync.svg @@ -0,0 +1,19 @@ + + Retail Sync + Icon showing sync arrows with receipt symbol, representing retail order synchronization + + + + + + + + + + + + + + diff --git a/web/src/app/(app)/application-layout.tsx b/web/src/app/(app)/application-layout.tsx index fde5f2c..e529faa 100644 --- a/web/src/app/(app)/application-layout.tsx +++ b/web/src/app/(app)/application-layout.tsx @@ -35,6 +35,7 @@ import { HomeIcon, QuestionMarkCircleIcon, ShoppingCartIcon, + CloudArrowUpIcon, } from '@heroicons/react/20/solid' import { usePathname } from 'next/navigation' @@ -87,8 +88,8 @@ export function ApplicationLayout({ children }: { children: React.ReactNode }) { - - Monarch Sync + + Retail Sync @@ -98,6 +99,10 @@ export function ApplicationLayout({ children }: { children: React.ReactNode }) { Dashboard + + + Sync + Orders diff --git a/web/src/app/(app)/orders/page.tsx b/web/src/app/(app)/orders/page.tsx index fd95998..584ce87 100644 --- a/web/src/app/(app)/orders/page.tsx +++ b/web/src/app/(app)/orders/page.tsx @@ -139,6 +139,7 @@ export default async function OrdersPage({ searchParams }: PageProps) { + diff --git a/web/src/app/(app)/settings/page.tsx b/web/src/app/(app)/settings/page.tsx index d991a66..4bc2535 100644 --- a/web/src/app/(app)/settings/page.tsx +++ b/web/src/app/(app)/settings/page.tsx @@ -64,7 +64,7 @@ export default function Settings() {
Quick Start - Monarch Sync is a CLI tool that syncs your retail purchases with Monarch Money. Here's how to get started. + Retail Sync is a CLI tool that syncs your retail purchases with Monarch Money. Here's how to get started. diff --git a/web/src/app/(app)/sync/page.tsx b/web/src/app/(app)/sync/page.tsx new file mode 100644 index 0000000..6cac259 --- /dev/null +++ b/web/src/app/(app)/sync/page.tsx @@ -0,0 +1,541 @@ +'use client' + +import { Badge } from '@/components/badge' +import { Button } from '@/components/button' +import { Checkbox, CheckboxField } from '@/components/checkbox' +import { Divider } from '@/components/divider' +import { Fieldset, Label, Legend } from '@/components/fieldset' +import { Heading, Subheading } from '@/components/heading' +import { Input } from '@/components/input' +import { Select } from '@/components/select' +import { Table, TableBody, TableCell, TableHead, TableHeader, TableRow } from '@/components/table' +import { Text } from '@/components/text' +import { + cancelSyncJob, + getActiveSyncJobs, + getSyncJobs, + startSync, + type SyncJob, + type StartSyncRequest, +} from '@/lib/api' +import { + ArrowPathIcon, + ExclamationTriangleIcon, + InformationCircleIcon, + PlayIcon, + ShieldCheckIcon, + XMarkIcon, +} from '@heroicons/react/16/solid' +import { useCallback, useEffect, useState } from 'react' + +// Helper text component for form fields +function HelpText({ children }: { children: React.ReactNode }) { + return

{children}

+} + +function formatDate(dateString: string): string { + const date = new Date(dateString) + return date.toLocaleString('en-US', { + month: 'short', + day: 'numeric', + year: 'numeric', + hour: '2-digit', + minute: '2-digit', + }) +} + +function formatRelativeTime(dateString: string): string { + const date = new Date(dateString) + const now = new Date() + const diffMs = now.getTime() - date.getTime() + const diffMins = Math.floor(diffMs / 60000) + const diffHours = Math.floor(diffMins / 60) + const diffDays = Math.floor(diffHours / 24) + + if (diffMins < 1) return 'just now' + if (diffMins < 60) return `${diffMins}m ago` + if (diffHours < 24) return `${diffHours}h ago` + return `${diffDays}d ago` +} + +function StatusBadge({ status }: { status: string }) { + const colorMap: Record = { + completed: 'green', + failed: 'red', + running: 'cyan', + pending: 'amber', + cancelled: 'zinc', + } + const color = colorMap[status] || 'zinc' + return ( + + {status} + + ) +} + +function ProviderBadge({ provider }: { provider: string }) { + const colorMap: Record = { + walmart: 'blue', + costco: 'red', + amazon: 'orange', + } + const color = colorMap[provider] || 'zinc' + return {provider} +} + +function ProgressBar({ current, total }: { current: number; total: number }) { + const percentage = total > 0 ? (current / total) * 100 : 0 + return ( +
+
0 ? 10 : 0)}%` }} + > + {total > 0 && `${current}/${total}`} +
+
+ ) +} + +// Mobile-friendly job card component +function JobCard({ job, onCancel }: { job: SyncJob; onCancel: (jobId: string) => void }) { + return ( +
+
+
+ + + {job.dry_run && ( + Dry + )} +
+ {job.status === 'running' && ( + + )} +
+
+ {job.job_id.substring(0, 8)} + {formatRelativeTime(job.started_at)} +
+ {job.status === 'running' && ( +
+ + + {job.progress.current_phase} + {job.progress.errored_orders > 0 && ` (${job.progress.errored_orders} errors)`} + +
+ )} + {job.status !== 'running' && job.result && ( +
+ + {job.result.orders_processed} / {job.result.orders_found} orders + {job.result.orders_errored > 0 && ` (${job.result.orders_errored} errors)`} + +
+ )} +
+ ) +} + +export default function SyncPage() { + const [jobs, setJobs] = useState([]) + const [loading, setLoading] = useState(false) + const [submitting, setSubmitting] = useState(false) + const [error, setError] = useState(null) + const [success, setSuccess] = useState(null) + const [lastUpdated, setLastUpdated] = useState(null) + const [showAdvanced, setShowAdvanced] = useState(false) + + // Form state - load provider from localStorage + const [provider, setProvider] = useState<'walmart' | 'costco' | 'amazon'>(() => { + if (typeof window !== 'undefined') { + const saved = localStorage.getItem('sync_provider') + if (saved === 'walmart' || saved === 'costco' || saved === 'amazon') { + return saved + } + } + return 'walmart' + }) + const [dryRun, setDryRun] = useState(true) + const [lookbackDays, setLookbackDays] = useState(14) + const [maxOrders, setMaxOrders] = useState(undefined) + const [verbose, setVerbose] = useState(false) + const [force, setForce] = useState(false) + const [orderId, setOrderId] = useState('') + + // Save provider preference to localStorage + useEffect(() => { + if (typeof window !== 'undefined') { + localStorage.setItem('sync_provider', provider) + } + }, [provider]) + + // Auto-dismiss success/error messages after 5 seconds + useEffect(() => { + if (success || error) { + const timer = setTimeout(() => { + setSuccess(null) + setError(null) + }, 5000) + return () => clearTimeout(timer) + } + }, [success, error]) + + // Auto-refresh active jobs + useEffect(() => { + loadJobs() + const interval = setInterval(() => { + loadActiveJobs() + }, 3000) // Poll every 3 seconds + return () => clearInterval(interval) + // eslint-disable-next-line react-hooks/exhaustive-deps + }, []) + + async function loadJobs() { + try { + setLoading(true) + const data = await getSyncJobs() + setJobs(data.jobs) + setLastUpdated(new Date()) + setError(null) + } catch (err) { + setError(err instanceof Error ? err.message : 'Failed to load sync jobs') + } finally { + setLoading(false) + } + } + + const loadActiveJobs = useCallback(async () => { + try { + const data = await getActiveSyncJobs() + setLastUpdated(new Date()) + // Update only active jobs to avoid flickering + if (data.jobs.length > 0) { + setJobs((prevJobs) => { + const activeJobIds = new Set(data.jobs.map((j) => j.job_id)) + const inactiveJobs = prevJobs.filter((j) => !activeJobIds.has(j.job_id)) + return [...data.jobs, ...inactiveJobs] + }) + } + } catch { + // Silently fail on polling errors to avoid noise + } + }, []) + + async function handleSubmit(e: React.FormEvent) { + e.preventDefault() + setSubmitting(true) + setError(null) + setSuccess(null) + + const request: StartSyncRequest = { + provider, + dry_run: dryRun, + lookback_days: lookbackDays, + max_orders: maxOrders, + verbose, + force, + order_id: orderId || undefined, + } + + try { + const response = await startSync(request) + setSuccess(response.message) + // Reload jobs to show the new one + await loadJobs() + // Reset form to defaults (keep provider) + setDryRun(true) + setForce(false) + setOrderId('') + setMaxOrders(undefined) + } catch (err) { + setError(err instanceof Error ? err.message : 'Failed to start sync') + } finally { + setSubmitting(false) + } + } + + async function handleCancel(jobId: string) { + const confirmed = window.confirm( + 'Are you sure you want to cancel this sync job? This action cannot be undone.' + ) + if (!confirmed) { + return + } + + try { + await cancelSyncJob(jobId) + setSuccess('Job cancelled successfully') + await loadJobs() + } catch (err) { + setError(err instanceof Error ? err.message : 'Failed to cancel job') + } + } + + return ( + <> + Sync + Start a new sync job to import orders from your providers. + + {error && ( +
+ +

{error}

+
+ )} + + {success && ( +
+ +

{success}

+
+ )} + +
+
+ Sync Configuration + + {/* Prominent Dry Run Toggle */} +
+ + setDryRun(checked)} /> + + +

+ {dryRun + ? 'Changes will NOT be saved to Monarch Money. Use this to preview what would happen.' + : 'Changes WILL be saved to Monarch Money. Transaction splits will be created.'} +

+
+ + {/* Main form grid - 4 columns on large screens */} +
+
+ + + Your order history will be fetched from this provider +
+ +
+ + setLookbackDays(parseInt(e.target.value) || 14)} + min={1} + max={365} + /> + Import orders from the past X days (1-365) +
+ +
+ + setMaxOrders(e.target.value ? parseInt(e.target.value) : undefined)} + min={1} + placeholder="No limit" + /> + Limit number of orders to process +
+ +
+ + setOrderId(e.target.value)} + placeholder="Specific order" + /> + Process only this order ID +
+
+ + {/* Advanced Options Toggle */} +
+ + + {showAdvanced && ( +
+ + setForce(checked)} /> + + + + + setVerbose(checked)} /> + + +
+ )} +
+ + + +
+ +
+
+
+ + + +
+
+ Sync Jobs + {lastUpdated && ( + + Updated {formatRelativeTime(lastUpdated.toISOString())} + + )} +
+ +
+ + {/* Desktop Table View */} +
+ + + + Job ID + Provider + Status + Progress + Started + Actions + + + + {jobs.map((job) => ( + + {job.job_id.substring(0, 8)} + + + + +
+ + {job.dry_run && ( + Dry + )} +
+
+ + {job.status === 'running' ? ( +
+ + + {job.progress.current_phase} + {job.progress.errored_orders > 0 && ` (${job.progress.errored_orders} errors)`} + +
+ ) : job.result ? ( + + {job.result.orders_processed} / {job.result.orders_found} + {job.result.orders_errored > 0 && ` (${job.result.orders_errored} errors)`} + + ) : ( + - + )} +
+ {formatDate(job.started_at)} + + {job.status === 'running' && ( + + )} + +
+ ))} +
+
+
+ + {/* Mobile Card View */} +
+ {jobs.map((job) => ( + + ))} +
+ + {jobs.length === 0 && !loading && ( +
+
+ +
+ No sync jobs found yet. + + Configure your sync settings above and click Start Sync. + +
+ )} + + ) +} diff --git a/web/src/app/layout.tsx b/web/src/app/layout.tsx index 7d34965..b807f70 100644 --- a/web/src/app/layout.tsx +++ b/web/src/app/layout.tsx @@ -4,10 +4,13 @@ import { ThemeProvider } from '@/lib/theme-context' export const metadata: Metadata = { title: { - template: '%s - Monarch Sync', - default: 'Monarch Sync', + template: '%s - Retail Sync', + default: 'Retail Sync', + }, + description: 'Sync your Walmart, Costco, and Amazon orders with Monarch Money. Third-party tool, not affiliated with Monarch Money Inc.', + icons: { + icon: '/favicon.svg', }, - description: 'Sync your Walmart, Costco, and Amazon orders with Monarch Money', } export default async function RootLayout({ children }: { children: React.ReactNode }) { diff --git a/web/src/lib/api/client.ts b/web/src/lib/api/client.ts index 7b3b971..76c811f 100644 --- a/web/src/lib/api/client.ts +++ b/web/src/lib/api/client.ts @@ -1,17 +1,42 @@ -import { Order, OrderFilters, OrderListResponse, SyncRun, SyncRunListResponse, HealthResponse, StatsResponse } from './types' - -const API_BASE_URL = process.env.NEXT_PUBLIC_API_URL || 'http://localhost:8085' - -async function fetchJSON(url: string): Promise { +import { + Order, + OrderFilters, + OrderListResponse, + SyncRun, + SyncRunListResponse, + HealthResponse, + StatsResponse, + StartSyncRequest, + StartSyncResponse, + SyncJob, + SyncJobListResponse, +} from './types' + +const API_BASE_URL = process.env.NEXT_PUBLIC_API_URL || 'http://localhost:8080' + +async function fetchJSON(url: string, options?: RequestInit): Promise { const response = await fetch(url, { headers: { 'Content-Type': 'application/json', }, cache: 'no-store', + ...options, }) if (!response.ok) { - throw new Error(`API error: ${response.status} ${response.statusText}`) + // Try to parse error message from response body + let errorMessage = `API error: ${response.status} ${response.statusText}` + try { + const errorBody = await response.json() + if (errorBody.error) { + errorMessage = errorBody.error + } else if (errorBody.message) { + errorMessage = errorBody.message + } + } catch { + // If we can't parse the body, use the default message + } + throw new Error(errorMessage) } return response.json() @@ -85,3 +110,29 @@ export async function getOrderStats(): Promise { totalAmount, } } + +// Sync Job API functions +export async function startSync(request: StartSyncRequest): Promise { + return fetchJSON(`${API_BASE_URL}/api/sync`, { + method: 'POST', + body: JSON.stringify(request), + }) +} + +export async function getSyncJobs(): Promise { + return fetchJSON(`${API_BASE_URL}/api/sync`) +} + +export async function getActiveSyncJobs(): Promise { + return fetchJSON(`${API_BASE_URL}/api/sync/active`) +} + +export async function getSyncJob(jobId: string): Promise { + return fetchJSON(`${API_BASE_URL}/api/sync/${encodeURIComponent(jobId)}`) +} + +export async function cancelSyncJob(jobId: string): Promise { + await fetchJSON(`${API_BASE_URL}/api/sync/${encodeURIComponent(jobId)}`, { + method: 'DELETE', + }) +} diff --git a/web/src/lib/api/types.ts b/web/src/lib/api/types.ts index 7f63df8..4c26ff3 100644 --- a/web/src/lib/api/types.ts +++ b/web/src/lib/api/types.ts @@ -93,3 +93,53 @@ export interface StatsResponse { total_splits: number provider_stats: ProviderStats[] } + +// Sync Job Types +export interface StartSyncRequest { + provider: 'walmart' | 'costco' | 'amazon' + dry_run?: boolean + lookback_days?: number + max_orders?: number + verbose?: boolean + order_id?: string + force?: boolean +} + +export interface StartSyncResponse { + job_id: string + message: string +} + +export interface SyncJobProgress { + current_phase: string + total_orders: number + processed_orders: number + skipped_orders: number + errored_orders: number +} + +export interface SyncJobResult { + orders_found: number + orders_processed: number + orders_skipped: number + orders_errored: number + dry_run: boolean +} + +export interface SyncJob { + job_id: string + provider: string + status: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled' + dry_run: boolean + progress: SyncJobProgress + request?: StartSyncRequest + started_at: string + completed_at?: string + result?: SyncJobResult + error?: string +} + +export interface SyncJobListResponse { + jobs: SyncJob[] + count: number +}