From 2ee78a0e474b8ed895bc11e5aa27eae74cf79d86 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 1 Mar 2026 08:47:25 +0000 Subject: [PATCH 1/7] Initial plan From 840f59bce17836ab330c1c24c85bb52ad690c5c5 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 1 Mar 2026 08:54:46 +0000 Subject: [PATCH 2/7] Add PostgreSQL implementations for EventStore, APIKeyStore, IdempotencyStore, and DLQStore Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- store/pg_api_keys.go | 231 ++++++++++++++++++++++++++++ store/pg_dlq.go | 328 ++++++++++++++++++++++++++++++++++++++++ store/pg_event_store.go | 221 +++++++++++++++++++++++++++ store/pg_idempotency.go | 122 +++++++++++++++ 4 files changed, 902 insertions(+) create mode 100644 store/pg_api_keys.go create mode 100644 store/pg_dlq.go create mode 100644 store/pg_event_store.go create mode 100644 store/pg_idempotency.go diff --git a/store/pg_api_keys.go b/store/pg_api_keys.go new file mode 100644 index 00000000..a76cceec --- /dev/null +++ b/store/pg_api_keys.go @@ -0,0 +1,231 @@ +package store + +import ( + "context" + "encoding/json" + "fmt" + "sort" + "time" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" +) + +// PGAPIKeyStore implements APIKeyStore backed by PostgreSQL using pgxpool. +type PGAPIKeyStore struct { + pool *pgxpool.Pool +} + +// NewPGAPIKeyStore creates a new PGAPIKeyStore backed by the given connection pool +// and ensures the required schema exists. +func NewPGAPIKeyStore(pool *pgxpool.Pool) (*PGAPIKeyStore, error) { + s := &PGAPIKeyStore{pool: pool} + if err := s.createTable(context.Background()); err != nil { + return nil, err + } + return s, nil +} + +func (s *PGAPIKeyStore) createTable(ctx context.Context) error { + _, err := s.pool.Exec(ctx, ` + CREATE TABLE IF NOT EXISTS api_keys ( + id UUID PRIMARY KEY, + name TEXT NOT NULL, + key_hash TEXT NOT NULL UNIQUE, + key_prefix TEXT NOT NULL, + company_id UUID NOT NULL, + org_id UUID, + project_id UUID, + permissions JSONB NOT NULL DEFAULT '[]', + created_by UUID NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + expires_at TIMESTAMPTZ, + last_used_at TIMESTAMPTZ, + is_active BOOLEAN NOT NULL DEFAULT TRUE + ); + CREATE INDEX IF NOT EXISTS idx_api_keys_key_hash ON api_keys(key_hash); + CREATE INDEX IF NOT EXISTS idx_api_keys_company_id ON api_keys(company_id); + `) + if err != nil { + return fmt.Errorf("create api_keys table: %w", err) + } + return nil +} + +func (s *PGAPIKeyStore) Create(ctx context.Context, key *APIKey) (string, error) { + rawKey, err := generateRawKey() + if err != nil { + return "", err + } + + if key.ID == uuid.Nil { + key.ID = uuid.New() + } + key.KeyHash = hashKey(rawKey) + key.KeyPrefix = rawKey[:len(apiKeyPrefix)+8] + key.CreatedAt = time.Now() + if key.Permissions == nil { + key.Permissions = []string{} + } + + permsJSON, err := json.Marshal(key.Permissions) + if err != nil { + return "", fmt.Errorf("marshal permissions: %w", err) + } + + _, err = s.pool.Exec(ctx, ` + INSERT INTO api_keys (id, name, key_hash, key_prefix, company_id, org_id, project_id, + permissions, created_by, created_at, expires_at, last_used_at, is_active) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)`, + key.ID, + key.Name, + key.KeyHash, + key.KeyPrefix, + key.CompanyID, + key.OrgID, + key.ProjectID, + permsJSON, + key.CreatedBy, + key.CreatedAt, + key.ExpiresAt, + key.LastUsedAt, + key.IsActive, + ) + if err != nil { + return "", fmt.Errorf("insert api key: %w", err) + } + return rawKey, nil +} + +func (s *PGAPIKeyStore) Get(ctx context.Context, id uuid.UUID) (*APIKey, error) { + rows, err := s.pool.Query(ctx, ` + SELECT id, name, key_hash, key_prefix, company_id, org_id, project_id, + permissions, created_by, created_at, expires_at, last_used_at, is_active + FROM api_keys WHERE id = $1`, id) + if err != nil { + return nil, fmt.Errorf("query api key: %w", err) + } + defer rows.Close() + return scanPGAPIKeyOne(rows) +} + +func (s *PGAPIKeyStore) GetByHash(ctx context.Context, keyHash string) (*APIKey, error) { + rows, err := s.pool.Query(ctx, ` + SELECT id, name, key_hash, key_prefix, company_id, org_id, project_id, + permissions, created_by, created_at, expires_at, last_used_at, is_active + FROM api_keys WHERE key_hash = $1`, keyHash) + if err != nil { + return nil, fmt.Errorf("query api key by hash: %w", err) + } + defer rows.Close() + return scanPGAPIKeyOne(rows) +} + +func (s *PGAPIKeyStore) List(ctx context.Context, companyID uuid.UUID) ([]*APIKey, error) { + rows, err := s.pool.Query(ctx, ` + SELECT id, name, key_hash, key_prefix, company_id, org_id, project_id, + permissions, created_by, created_at, expires_at, last_used_at, is_active + FROM api_keys WHERE company_id = $1 ORDER BY created_at ASC`, companyID) + if err != nil { + return nil, fmt.Errorf("query api keys: %w", err) + } + defer rows.Close() + + var results []*APIKey + for rows.Next() { + k, err := scanPGAPIKey(rows) + if err != nil { + return nil, err + } + results = append(results, k) + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate api keys: %w", err) + } + sort.Slice(results, func(i, j int) bool { + return results[i].CreatedAt.Before(results[j].CreatedAt) + }) + return results, nil +} + +func (s *PGAPIKeyStore) Delete(ctx context.Context, id uuid.UUID) error { + tag, err := s.pool.Exec(ctx, `DELETE FROM api_keys WHERE id = $1`, id) + if err != nil { + return fmt.Errorf("delete api key: %w", err) + } + if tag.RowsAffected() == 0 { + return ErrNotFound + } + return nil +} + +func (s *PGAPIKeyStore) UpdateLastUsed(ctx context.Context, id uuid.UUID) error { + tag, err := s.pool.Exec(ctx, + `UPDATE api_keys SET last_used_at = NOW() WHERE id = $1`, id) + if err != nil { + return fmt.Errorf("update last_used_at: %w", err) + } + if tag.RowsAffected() == 0 { + return ErrNotFound + } + return nil +} + +func (s *PGAPIKeyStore) Validate(ctx context.Context, rawKey string) (*APIKey, error) { + h := hashKey(rawKey) + k, err := s.GetByHash(ctx, h) + if err != nil { + return nil, ErrNotFound + } + if !k.IsActive { + return nil, ErrKeyInactive + } + if k.ExpiresAt != nil && k.ExpiresAt.Before(time.Now()) { + return nil, ErrKeyExpired + } + return k, nil +} + +// --------------------------------------------------------------------------- +// PostgreSQL scan helpers +// --------------------------------------------------------------------------- + +func scanPGAPIKeyOne(rows pgx.Rows) (*APIKey, error) { + if !rows.Next() { + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("query api key: %w", err) + } + return nil, ErrNotFound + } + return scanPGAPIKey(rows) +} + +func scanPGAPIKey(rows pgx.Rows) (*APIKey, error) { + var k APIKey + var permsJSON []byte + err := rows.Scan( + &k.ID, &k.Name, &k.KeyHash, &k.KeyPrefix, + &k.CompanyID, &k.OrgID, &k.ProjectID, + &permsJSON, &k.CreatedBy, &k.CreatedAt, + &k.ExpiresAt, &k.LastUsedAt, &k.IsActive, + ) + if err != nil { + return nil, fmt.Errorf("scan api key: %w", err) + } + if permsJSON != nil { + if err := json.Unmarshal(permsJSON, &k.Permissions); err != nil { + return nil, fmt.Errorf("unmarshal permissions: %w", err) + } + } + if k.Permissions == nil { + k.Permissions = []string{} + } + return &k, nil +} + +// --------------------------------------------------------------------------- +// Compile-time interface assertion +// --------------------------------------------------------------------------- + +var _ APIKeyStore = (*PGAPIKeyStore)(nil) diff --git a/store/pg_dlq.go b/store/pg_dlq.go new file mode 100644 index 00000000..7931d349 --- /dev/null +++ b/store/pg_dlq.go @@ -0,0 +1,328 @@ +package store + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" +) + +// PGDLQStore implements DLQStore backed by PostgreSQL using pgxpool. +type PGDLQStore struct { + pool *pgxpool.Pool +} + +// NewPGDLQStore creates a new PGDLQStore backed by the given connection pool +// and ensures the required schema exists. +func NewPGDLQStore(pool *pgxpool.Pool) (*PGDLQStore, error) { + s := &PGDLQStore{pool: pool} + if err := s.init(context.Background()); err != nil { + return nil, err + } + return s, nil +} + +func (s *PGDLQStore) init(ctx context.Context) error { + _, err := s.pool.Exec(ctx, ` + CREATE TABLE IF NOT EXISTS dlq_entries ( + id UUID PRIMARY KEY, + original_event JSONB, + pipeline_name TEXT NOT NULL, + step_name TEXT NOT NULL, + error_message TEXT NOT NULL, + error_type TEXT NOT NULL, + retry_count INTEGER NOT NULL DEFAULT 0, + max_retries INTEGER NOT NULL DEFAULT 0, + status TEXT NOT NULL DEFAULT 'pending', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + resolved_at TIMESTAMPTZ, + metadata JSONB + ); + CREATE INDEX IF NOT EXISTS idx_dlq_entries_pipeline_name ON dlq_entries(pipeline_name); + CREATE INDEX IF NOT EXISTS idx_dlq_entries_step_name ON dlq_entries(step_name); + CREATE INDEX IF NOT EXISTS idx_dlq_entries_status ON dlq_entries(status); + CREATE INDEX IF NOT EXISTS idx_dlq_entries_error_type ON dlq_entries(error_type); + CREATE INDEX IF NOT EXISTS idx_dlq_entries_created_at ON dlq_entries(created_at); + CREATE INDEX IF NOT EXISTS idx_dlq_entries_updated_at ON dlq_entries(updated_at); + `) + if err != nil { + return fmt.Errorf("create dlq_entries table: %w", err) + } + return nil +} + +func (s *PGDLQStore) Add(ctx context.Context, entry *DLQEntry) error { + if entry.ID == uuid.Nil { + entry.ID = uuid.New() + } + now := time.Now().UTC() + entry.CreatedAt = now + entry.UpdatedAt = now + if entry.Status == "" { + entry.Status = DLQStatusPending + } + + var metadataJSON []byte + if entry.Metadata != nil { + var err error + metadataJSON, err = json.Marshal(entry.Metadata) + if err != nil { + return fmt.Errorf("marshal metadata: %w", err) + } + } + + var originalEvent []byte + if entry.OriginalEvent != nil { + originalEvent = []byte(entry.OriginalEvent) + } + + _, err := s.pool.Exec(ctx, + `INSERT INTO dlq_entries (id, original_event, pipeline_name, step_name, error_message, error_type, + retry_count, max_retries, status, created_at, updated_at, resolved_at, metadata) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)`, + entry.ID, + originalEvent, + entry.PipelineName, + entry.StepName, + entry.ErrorMessage, + entry.ErrorType, + entry.RetryCount, + entry.MaxRetries, + string(entry.Status), + entry.CreatedAt, + entry.UpdatedAt, + entry.ResolvedAt, + metadataJSON, + ) + if err != nil { + return fmt.Errorf("insert dlq entry: %w", err) + } + return nil +} + +func (s *PGDLQStore) Get(ctx context.Context, id uuid.UUID) (*DLQEntry, error) { + rows, err := s.pool.Query(ctx, + `SELECT id, original_event, pipeline_name, step_name, error_message, error_type, + retry_count, max_retries, status, created_at, updated_at, resolved_at, metadata + FROM dlq_entries WHERE id = $1`, + id, + ) + if err != nil { + return nil, fmt.Errorf("query dlq entry: %w", err) + } + defer rows.Close() + + if !rows.Next() { + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("query dlq entry: %w", err) + } + return nil, ErrNotFound + } + return scanPGDLQEntry(rows) +} + +func (s *PGDLQStore) List(ctx context.Context, filter DLQFilter) ([]*DLQEntry, error) { + query, args := buildPGDLQQuery( + `SELECT id, original_event, pipeline_name, step_name, error_message, error_type, + retry_count, max_retries, status, created_at, updated_at, resolved_at, metadata + FROM dlq_entries`, + filter, + ) + query += " ORDER BY created_at DESC" + + if filter.Limit > 0 { + query += fmt.Sprintf(" LIMIT $%d", len(args)+1) + args = append(args, filter.Limit) + if filter.Offset > 0 { + query += fmt.Sprintf(" OFFSET $%d", len(args)+1) + args = append(args, filter.Offset) + } + } else if filter.Offset > 0 { + query += fmt.Sprintf(" OFFSET $%d", len(args)+1) + args = append(args, filter.Offset) + } + + rows, err := s.pool.Query(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("query dlq entries: %w", err) + } + defer rows.Close() + + var results []*DLQEntry + for rows.Next() { + entry, err := scanPGDLQEntry(rows) + if err != nil { + return nil, err + } + results = append(results, entry) + } + if err := rows.Err(); err != nil { + return nil, err + } + if results == nil { + results = []*DLQEntry{} + } + return results, nil +} + +func (s *PGDLQStore) Count(ctx context.Context, filter DLQFilter) (int64, error) { + query, args := buildPGDLQQuery(`SELECT COUNT(*) FROM dlq_entries`, filter) + + var count int64 + err := s.pool.QueryRow(ctx, query, args...).Scan(&count) + if err != nil { + return 0, fmt.Errorf("count dlq entries: %w", err) + } + return count, nil +} + +func (s *PGDLQStore) UpdateStatus(ctx context.Context, id uuid.UUID, status DLQStatus) error { + tag, err := s.pool.Exec(ctx, + `UPDATE dlq_entries SET status = $2, updated_at = NOW() WHERE id = $1`, + id, string(status), + ) + if err != nil { + return fmt.Errorf("update dlq status: %w", err) + } + if tag.RowsAffected() == 0 { + return ErrNotFound + } + return nil +} + +func (s *PGDLQStore) Retry(ctx context.Context, id uuid.UUID) error { + tag, err := s.pool.Exec(ctx, + `UPDATE dlq_entries SET retry_count = retry_count + 1, status = $2, updated_at = NOW() WHERE id = $1`, + id, string(DLQStatusRetrying), + ) + if err != nil { + return fmt.Errorf("retry dlq entry: %w", err) + } + if tag.RowsAffected() == 0 { + return ErrNotFound + } + return nil +} + +func (s *PGDLQStore) Discard(ctx context.Context, id uuid.UUID) error { + tag, err := s.pool.Exec(ctx, + `UPDATE dlq_entries SET status = $2, updated_at = NOW() WHERE id = $1`, + id, string(DLQStatusDiscarded), + ) + if err != nil { + return fmt.Errorf("discard dlq entry: %w", err) + } + if tag.RowsAffected() == 0 { + return ErrNotFound + } + return nil +} + +func (s *PGDLQStore) Resolve(ctx context.Context, id uuid.UUID) error { + tag, err := s.pool.Exec(ctx, + `UPDATE dlq_entries SET status = $2, resolved_at = NOW(), updated_at = NOW() WHERE id = $1`, + id, string(DLQStatusResolved), + ) + if err != nil { + return fmt.Errorf("resolve dlq entry: %w", err) + } + if tag.RowsAffected() == 0 { + return ErrNotFound + } + return nil +} + +func (s *PGDLQStore) Purge(ctx context.Context, olderThan time.Duration) (int64, error) { + cutoff := time.Now().UTC().Add(-olderThan) + tag, err := s.pool.Exec(ctx, + `DELETE FROM dlq_entries WHERE status IN ($1, $2) AND updated_at < $3`, + string(DLQStatusResolved), string(DLQStatusDiscarded), cutoff, + ) + if err != nil { + return 0, fmt.Errorf("purge dlq entries: %w", err) + } + return tag.RowsAffected(), nil +} + +// --------------------------------------------------------------------------- +// PostgreSQL helpers +// --------------------------------------------------------------------------- + +// buildPGDLQQuery constructs a WHERE clause using PostgreSQL $N placeholders. +func buildPGDLQQuery(base string, filter DLQFilter) (string, []any) { + var conditions []string + var args []any + idx := 1 + + if filter.PipelineName != "" { + conditions = append(conditions, fmt.Sprintf("pipeline_name = $%d", idx)) + args = append(args, filter.PipelineName) + idx++ + } + if filter.StepName != "" { + conditions = append(conditions, fmt.Sprintf("step_name = $%d", idx)) + args = append(args, filter.StepName) + idx++ + } + if filter.Status != "" { + conditions = append(conditions, fmt.Sprintf("status = $%d", idx)) + args = append(args, string(filter.Status)) + idx++ + } + if filter.ErrorType != "" { + conditions = append(conditions, fmt.Sprintf("error_type = $%d", idx)) + args = append(args, filter.ErrorType) + } + + query := base + if len(conditions) > 0 { + query += " WHERE " + for i, c := range conditions { + if i > 0 { + query += " AND " + } + query += c + } + } + return query, args +} + +// scanPGDLQEntry scans a DLQ entry from pgx.Rows. +func scanPGDLQEntry(rows pgx.Rows) (*DLQEntry, error) { + var entry DLQEntry + var statusStr string + var originalEvent, metadataJSON []byte + + err := rows.Scan( + &entry.ID, &originalEvent, + &entry.PipelineName, &entry.StepName, + &entry.ErrorMessage, &entry.ErrorType, + &entry.RetryCount, &entry.MaxRetries, + &statusStr, &entry.CreatedAt, &entry.UpdatedAt, + &entry.ResolvedAt, &metadataJSON, + ) + if err != nil { + return nil, fmt.Errorf("scan dlq entry: %w", err) + } + + entry.Status = DLQStatus(statusStr) + if originalEvent != nil { + entry.OriginalEvent = json.RawMessage(originalEvent) + } + if metadataJSON != nil { + _ = json.Unmarshal(metadataJSON, &entry.Metadata) + } + + return &entry, nil +} + +// --------------------------------------------------------------------------- +// Compile-time interface assertion +// --------------------------------------------------------------------------- + +var _ DLQStore = (*PGDLQStore)(nil) diff --git a/store/pg_event_store.go b/store/pg_event_store.go new file mode 100644 index 00000000..ca966ac5 --- /dev/null +++ b/store/pg_event_store.go @@ -0,0 +1,221 @@ +package store + +import ( + "context" + "fmt" + "sort" + "time" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5/pgxpool" +) + +// PGEventStore implements EventStore backed by PostgreSQL using pgxpool. +type PGEventStore struct { + pool *pgxpool.Pool +} + +// NewPGEventStore creates a new PGEventStore backed by the given connection pool +// and ensures the required schema exists. +func NewPGEventStore(pool *pgxpool.Pool) (*PGEventStore, error) { + s := &PGEventStore{pool: pool} + if err := s.init(context.Background()); err != nil { + return nil, err + } + return s, nil +} + +// init creates the execution_events table and indexes. +func (s *PGEventStore) init(ctx context.Context) error { + _, err := s.pool.Exec(ctx, ` + CREATE TABLE IF NOT EXISTS execution_events ( + id UUID PRIMARY KEY, + execution_id UUID NOT NULL, + sequence_num BIGINT NOT NULL, + event_type TEXT NOT NULL, + event_data JSONB, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE(execution_id, sequence_num) + ); + CREATE INDEX IF NOT EXISTS idx_execution_events_execution_id ON execution_events(execution_id); + CREATE INDEX IF NOT EXISTS idx_execution_events_event_type ON execution_events(event_type); + CREATE INDEX IF NOT EXISTS idx_execution_events_created_at ON execution_events(created_at); + `) + if err != nil { + return fmt.Errorf("create execution_events table: %w", err) + } + return nil +} + +func (s *PGEventStore) Append(ctx context.Context, executionID uuid.UUID, eventType string, data map[string]any) error { + tx, err := s.pool.Begin(ctx) + if err != nil { + return fmt.Errorf("begin tx: %w", err) + } + defer func() { _ = tx.Rollback(ctx) }() + + // Get next sequence number for this execution. + var maxSeq *int64 + err = tx.QueryRow(ctx, + `SELECT MAX(sequence_num) FROM execution_events WHERE execution_id = $1`, + executionID, + ).Scan(&maxSeq) + if err != nil { + return fmt.Errorf("get max sequence: %w", err) + } + + seq := int64(1) + if maxSeq != nil { + seq = *maxSeq + 1 + } + + id := uuid.New() + now := time.Now().UTC() + + _, err = tx.Exec(ctx, + `INSERT INTO execution_events (id, execution_id, sequence_num, event_type, event_data, created_at) + VALUES ($1, $2, $3, $4, $5, $6)`, + id, executionID, seq, eventType, data, now, + ) + if err != nil { + return fmt.Errorf("insert event: %w", err) + } + + return tx.Commit(ctx) +} + +func (s *PGEventStore) GetEvents(ctx context.Context, executionID uuid.UUID) ([]ExecutionEvent, error) { + rows, err := s.pool.Query(ctx, + `SELECT id, execution_id, sequence_num, event_type, event_data, created_at + FROM execution_events + WHERE execution_id = $1 + ORDER BY sequence_num ASC`, + executionID, + ) + if err != nil { + return nil, fmt.Errorf("query events: %w", err) + } + defer rows.Close() + + var events []ExecutionEvent + for rows.Next() { + var ev ExecutionEvent + var data []byte + if err := rows.Scan(&ev.ID, &ev.ExecutionID, &ev.SequenceNum, &ev.EventType, &data, &ev.CreatedAt); err != nil { + return nil, fmt.Errorf("scan event: %w", err) + } + if data != nil { + ev.EventData = data + } + events = append(events, ev) + } + return events, rows.Err() +} + +func (s *PGEventStore) GetTimeline(ctx context.Context, executionID uuid.UUID) (*MaterializedExecution, error) { + events, err := s.GetEvents(ctx, executionID) + if err != nil { + return nil, err + } + if len(events) == 0 { + return nil, ErrNotFound + } + m := materialize(events) + if m == nil { + return nil, ErrNotFound + } + return m, nil +} + +func (s *PGEventStore) ListExecutions(ctx context.Context, filter ExecutionEventFilter) ([]MaterializedExecution, error) { + // Get distinct execution IDs. + rows, err := s.pool.Query(ctx, + `SELECT DISTINCT execution_id FROM execution_events ORDER BY execution_id`) + if err != nil { + return nil, fmt.Errorf("query execution IDs: %w", err) + } + defer rows.Close() + + var execIDs []uuid.UUID + for rows.Next() { + var id uuid.UUID + if err := rows.Scan(&id); err != nil { + return nil, fmt.Errorf("scan execution ID: %w", err) + } + execIDs = append(execIDs, id) + } + if err := rows.Err(); err != nil { + return nil, err + } + + var results []MaterializedExecution + for _, eid := range execIDs { + events, err := s.GetEvents(ctx, eid) + if err != nil { + return nil, err + } + m := materialize(events) + if m == nil { + continue + } + + // Apply filters. + if filter.Pipeline != "" && m.Pipeline != filter.Pipeline { + continue + } + if filter.TenantID != "" && m.TenantID != filter.TenantID { + continue + } + if filter.Status != "" && m.Status != filter.Status { + continue + } + if filter.Since != nil && (m.StartedAt == nil || m.StartedAt.Before(*filter.Since)) { + continue + } + if filter.Until != nil && (m.StartedAt == nil || m.StartedAt.After(*filter.Until)) { + continue + } + + results = append(results, *m) + } + + // Sort by started time descending. + sortExecutions(results) + + // Apply offset/limit. + if filter.Offset > 0 { + if filter.Offset >= len(results) { + return nil, nil + } + results = results[filter.Offset:] + } + if filter.Limit > 0 && filter.Limit < len(results) { + results = results[:filter.Limit] + } + + return results, nil +} + +// sortExecutions sorts MaterializedExecution slice by StartedAt descending. +func sortExecutions(results []MaterializedExecution) { + sort.Slice(results, func(i, j int) bool { + ti := results[i].StartedAt + tj := results[j].StartedAt + if ti == nil && tj == nil { + return false + } + if ti == nil { + return false + } + if tj == nil { + return true + } + return ti.After(*tj) + }) +} + +// --------------------------------------------------------------------------- +// Compile-time interface assertion +// --------------------------------------------------------------------------- + +var _ EventStore = (*PGEventStore)(nil) diff --git a/store/pg_idempotency.go b/store/pg_idempotency.go new file mode 100644 index 00000000..24b675e5 --- /dev/null +++ b/store/pg_idempotency.go @@ -0,0 +1,122 @@ +package store + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/jackc/pgx/v5/pgxpool" +) + +// PGIdempotencyStore implements IdempotencyStore backed by PostgreSQL using pgxpool. +type PGIdempotencyStore struct { + pool *pgxpool.Pool +} + +// NewPGIdempotencyStore creates a new PGIdempotencyStore backed by the given +// connection pool and ensures the required schema exists. +func NewPGIdempotencyStore(pool *pgxpool.Pool) (*PGIdempotencyStore, error) { + s := &PGIdempotencyStore{pool: pool} + if err := s.createTable(context.Background()); err != nil { + return nil, err + } + return s, nil +} + +func (s *PGIdempotencyStore) createTable(ctx context.Context) error { + _, err := s.pool.Exec(ctx, ` + CREATE TABLE IF NOT EXISTS idempotency_keys ( + key TEXT PRIMARY KEY, + execution_id UUID NOT NULL, + step_name TEXT NOT NULL, + result JSONB, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + expires_at TIMESTAMPTZ NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_idempotency_expires ON idempotency_keys(expires_at); + `) + if err != nil { + return fmt.Errorf("create idempotency_keys table: %w", err) + } + return nil +} + +func (s *PGIdempotencyStore) Check(ctx context.Context, key string) (*IdempotencyRecord, error) { + rows, err := s.pool.Query(ctx, + `SELECT key, execution_id, step_name, result, created_at, expires_at + FROM idempotency_keys + WHERE key = $1 AND expires_at > NOW()`, + key, + ) + if err != nil { + return nil, fmt.Errorf("query idempotency key: %w", err) + } + defer rows.Close() + + if !rows.Next() { + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("query idempotency key: %w", err) + } + return nil, nil // key not found or expired + } + + var rec IdempotencyRecord + var resultJSON []byte + err = rows.Scan(&rec.Key, &rec.ExecutionID, &rec.StepName, &resultJSON, &rec.CreatedAt, &rec.ExpiresAt) + if err != nil { + return nil, fmt.Errorf("scan idempotency key: %w", err) + } + if resultJSON != nil { + rec.Result = json.RawMessage(resultJSON) + } + return &rec, nil +} + +func (s *PGIdempotencyStore) Store(ctx context.Context, record *IdempotencyRecord) error { + if record.Key == "" { + return fmt.Errorf("idempotency key is required") + } + if record.CreatedAt.IsZero() { + record.CreatedAt = time.Now() + } + + var resultJSON []byte + if record.Result != nil { + resultJSON = []byte(record.Result) + } + + _, err := s.pool.Exec(ctx, + `INSERT INTO idempotency_keys (key, execution_id, step_name, result, created_at, expires_at) + VALUES ($1, $2, $3, $4, $5, $6)`, + record.Key, + record.ExecutionID, + record.StepName, + resultJSON, + record.CreatedAt, + record.ExpiresAt, + ) + if err != nil { + if isDuplicateError(err) { + return ErrDuplicate + } + return fmt.Errorf("insert idempotency key: %w", err) + } + return nil +} + +func (s *PGIdempotencyStore) Cleanup(ctx context.Context) (int64, error) { + tag, err := s.pool.Exec(ctx, + `DELETE FROM idempotency_keys WHERE expires_at <= NOW()`, + ) + if err != nil { + return 0, fmt.Errorf("cleanup expired keys: %w", err) + } + return tag.RowsAffected(), nil +} + +// --------------------------------------------------------------------------- +// Compile-time interface assertion +// --------------------------------------------------------------------------- + +var _ IdempotencyStore = (*PGIdempotencyStore)(nil) From 31c00fb82d8ea141c8e84e7ce6b2a286078ab137 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 1 Mar 2026 08:55:41 +0000 Subject: [PATCH 3/7] Remove redundant sort in PGAPIKeyStore.List (already sorted by query) Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- store/pg_api_keys.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/store/pg_api_keys.go b/store/pg_api_keys.go index a76cceec..56014551 100644 --- a/store/pg_api_keys.go +++ b/store/pg_api_keys.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "sort" "time" "github.com/google/uuid" @@ -143,9 +142,6 @@ func (s *PGAPIKeyStore) List(ctx context.Context, companyID uuid.UUID) ([]*APIKe if err := rows.Err(); err != nil { return nil, fmt.Errorf("iterate api keys: %w", err) } - sort.Slice(results, func(i, j int) bool { - return results[i].CreatedAt.Before(results[j].CreatedAt) - }) return results, nil } From 45d60bce14200040682043b5c0bb80bef3370253 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 1 Mar 2026 08:59:23 +0000 Subject: [PATCH 4/7] Add PostgreSQL implementations for EventStore, APIKeyStore, IdempotencyStore, and DLQStore Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index db1134f3..2b2c47b8 100644 --- a/go.mod +++ b/go.mod @@ -50,6 +50,7 @@ require ( github.com/redis/go-redis/v9 v9.18.0 github.com/stripe/stripe-go/v82 v82.5.1 github.com/tliron/glsp v0.2.2 + github.com/xdg-go/scram v1.2.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0 go.opentelemetry.io/otel v1.40.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.40.0 @@ -208,7 +209,6 @@ require ( github.com/tliron/commonlog v0.2.8 // indirect github.com/tliron/kutil v0.3.11 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect - github.com/xdg-go/scram v1.2.0 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect github.com/yosida95/uritemplate/v3 v3.0.2 // indirect github.com/yuin/gopher-lua v1.1.1 // indirect From 0fccbc2afb6587658adeaa93438a53543c63d183 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 1 Mar 2026 10:21:37 +0000 Subject: [PATCH 5/7] Fix DLQ idx consistency, fix Append race with advisory lock, add PG integration tests Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- store/pg_dlq.go | 1 + store/pg_event_store.go | 17 ++ store/pg_stores_test.go | 555 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 573 insertions(+) create mode 100644 store/pg_stores_test.go diff --git a/store/pg_dlq.go b/store/pg_dlq.go index 7931d349..15280c3b 100644 --- a/store/pg_dlq.go +++ b/store/pg_dlq.go @@ -277,6 +277,7 @@ func buildPGDLQQuery(base string, filter DLQFilter) (string, []any) { if filter.ErrorType != "" { conditions = append(conditions, fmt.Sprintf("error_type = $%d", idx)) args = append(args, filter.ErrorType) + idx++ //nolint:ineffassign // keep idx consistent for future conditions } query := base diff --git a/store/pg_event_store.go b/store/pg_event_store.go index ca966ac5..f05b9cf8 100644 --- a/store/pg_event_store.go +++ b/store/pg_event_store.go @@ -2,6 +2,7 @@ package store import ( "context" + "encoding/binary" "fmt" "sort" "time" @@ -54,6 +55,14 @@ func (s *PGEventStore) Append(ctx context.Context, executionID uuid.UUID, eventT } defer func() { _ = tx.Rollback(ctx) }() + // Acquire a per-execution advisory transaction lock to serialize concurrent + // appends for the same execution. pg_advisory_xact_lock is released + // automatically when the transaction commits or rolls back. + lockKey := executionIDToLockKey(executionID) + if _, err := tx.Exec(ctx, `SELECT pg_advisory_xact_lock($1)`, lockKey); err != nil { + return fmt.Errorf("acquire advisory lock: %w", err) + } + // Get next sequence number for this execution. var maxSeq *int64 err = tx.QueryRow(ctx, @@ -84,6 +93,14 @@ func (s *PGEventStore) Append(ctx context.Context, executionID uuid.UUID, eventT return tx.Commit(ctx) } +// executionIDToLockKey derives a stable int64 advisory lock key from a UUID by +// XOR-ing its two 64-bit halves. Used for pg_advisory_xact_lock per execution. +func executionIDToLockKey(id uuid.UUID) int64 { + hi := int64(binary.BigEndian.Uint64(id[:8])) //nolint:gosec // intentional truncation for advisory lock key + lo := int64(binary.BigEndian.Uint64(id[8:])) //nolint:gosec // intentional truncation for advisory lock key + return hi ^ lo +} + func (s *PGEventStore) GetEvents(ctx context.Context, executionID uuid.UUID) ([]ExecutionEvent, error) { rows, err := s.pool.Query(ctx, `SELECT id, execution_id, sequence_num, event_type, event_data, created_at diff --git a/store/pg_stores_test.go b/store/pg_stores_test.go new file mode 100644 index 00000000..54b98105 --- /dev/null +++ b/store/pg_stores_test.go @@ -0,0 +1,555 @@ +package store + +import ( + "context" + "encoding/json" + "os" + "sync" + "testing" + "time" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5/pgxpool" +) + +// --------------------------------------------------------------------------- +// Shared integration test helper +// --------------------------------------------------------------------------- + +// newTestPGPool opens a pgxpool connection using the PG_URL env var. +// The test is skipped when PG_URL is not set. +func newTestPGPool(t *testing.T) *pgxpool.Pool { + t.Helper() + pgURL := os.Getenv("PG_URL") + if pgURL == "" { + t.Skip("PG_URL not set") + } + + ctx := context.Background() + pool, err := pgxpool.New(ctx, pgURL) + if err != nil { + t.Fatalf("connect to postgres: %v", err) + } + if err := pool.Ping(ctx); err != nil { + pool.Close() + t.Fatalf("ping postgres: %v", err) + } + t.Cleanup(pool.Close) + return pool +} + +// =========================================================================== +// PGEventStore integration tests +// =========================================================================== + +func TestPGEventStore_Integration(t *testing.T) { + pool := newTestPGPool(t) + ctx := context.Background() + + store, err := NewPGEventStore(pool) + if err != nil { + t.Fatalf("NewPGEventStore: %v", err) + } + + execID := uuid.New() + t.Cleanup(func() { + _, _ = pool.Exec(ctx, `DELETE FROM execution_events WHERE execution_id = $1`, execID) + }) + + // Append events. + appendStarted(t, store, execID, "pg-pipeline", "pg-tenant") + appendStepStarted(t, store, execID, "step-a") + if err := store.Append(ctx, execID, EventStepInputRecorded, map[string]any{ + "step_name": "step-a", + "input": map[string]any{"order_id": "pg-001"}, + }); err != nil { + t.Fatalf("Append input: %v", err) + } + if err := store.Append(ctx, execID, EventStepOutputRecorded, map[string]any{ + "step_name": "step-a", + "output": map[string]any{"valid": true}, + }); err != nil { + t.Fatalf("Append output: %v", err) + } + appendStepCompleted(t, store, execID, "step-a") + appendCompleted(t, store, execID) + + // GetEvents. + events, err := store.GetEvents(ctx, execID) + if err != nil { + t.Fatalf("GetEvents: %v", err) + } + if len(events) != 6 { + t.Fatalf("expected 6 events, got %d", len(events)) + } + for i, ev := range events { + if ev.ExecutionID != execID { + t.Errorf("event[%d]: wrong execution_id", i) + } + if ev.SequenceNum != int64(i+1) { + t.Errorf("event[%d]: expected sequence %d, got %d", i, i+1, ev.SequenceNum) + } + if ev.CreatedAt.IsZero() { + t.Errorf("event[%d]: zero created_at", i) + } + if len(ev.EventData) == 0 { + t.Errorf("event[%d]: empty event_data", i) + } + } + + // GetTimeline. + timeline, err := store.GetTimeline(ctx, execID) + if err != nil { + t.Fatalf("GetTimeline: %v", err) + } + if timeline.Pipeline != "pg-pipeline" { + t.Errorf("Timeline.Pipeline = %q, want 'pg-pipeline'", timeline.Pipeline) + } + if timeline.TenantID != "pg-tenant" { + t.Errorf("Timeline.TenantID = %q, want 'pg-tenant'", timeline.TenantID) + } + if timeline.Status != "completed" { + t.Errorf("Timeline.Status = %q, want 'completed'", timeline.Status) + } + if len(timeline.Steps) != 1 || timeline.Steps[0].StepName != "step-a" { + t.Errorf("Timeline.Steps = %v, want [{step-a}]", timeline.Steps) + } + if timeline.Steps[0].InputData == nil { + t.Error("step InputData should not be nil") + } + + // GetTimeline on unknown execution. + _, err = store.GetTimeline(ctx, uuid.New()) + if err != ErrNotFound { + t.Errorf("GetTimeline missing: got %v, want ErrNotFound", err) + } + + // ListExecutions. + list, err := store.ListExecutions(ctx, ExecutionEventFilter{Pipeline: "pg-pipeline"}) + if err != nil { + t.Fatalf("ListExecutions: %v", err) + } + found := false + for _, m := range list { + if m.ExecutionID == execID { + found = true + } + } + if !found { + t.Error("expected to find execID in ListExecutions") + } +} + +func TestPGEventStore_ConcurrentAppend(t *testing.T) { + pool := newTestPGPool(t) + ctx := context.Background() + + store, err := NewPGEventStore(pool) + if err != nil { + t.Fatalf("NewPGEventStore: %v", err) + } + + execID := uuid.New() + t.Cleanup(func() { + _, _ = pool.Exec(ctx, `DELETE FROM execution_events WHERE execution_id = $1`, execID) + }) + + const goroutines = 10 + errs := make([]error, goroutines) + var wg sync.WaitGroup + for i := 0; i < goroutines; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + errs[idx] = store.Append(ctx, execID, EventStepStarted, map[string]any{ + "step_name": "concurrent", + }) + }(i) + } + wg.Wait() + + for i, e := range errs { + if e != nil { + t.Errorf("goroutine %d Append error: %v", i, e) + } + } + + events, err := store.GetEvents(ctx, execID) + if err != nil { + t.Fatalf("GetEvents: %v", err) + } + if len(events) != goroutines { + t.Fatalf("expected %d events, got %d", goroutines, len(events)) + } + + // Verify sequence numbers are unique and consecutive. + seen := make(map[int64]bool) + for _, ev := range events { + if seen[ev.SequenceNum] { + t.Errorf("duplicate sequence_num %d", ev.SequenceNum) + } + seen[ev.SequenceNum] = true + } + for i := int64(1); i <= goroutines; i++ { + if !seen[i] { + t.Errorf("missing sequence_num %d", i) + } + } +} + +// =========================================================================== +// PGAPIKeyStore integration tests +// =========================================================================== + +func TestPGAPIKeyStore_Integration(t *testing.T) { + pool := newTestPGPool(t) + ctx := context.Background() + + store, err := NewPGAPIKeyStore(pool) + if err != nil { + t.Fatalf("NewPGAPIKeyStore: %v", err) + } + + companyID := uuid.New() + createdBy := uuid.New() + + t.Cleanup(func() { + _, _ = pool.Exec(ctx, `DELETE FROM api_keys WHERE company_id = $1`, companyID) + }) + + key := &APIKey{ + Name: "pg-test-key", + CompanyID: companyID, + Permissions: []string{"read", "write", "admin"}, + CreatedBy: createdBy, + IsActive: true, + } + + // Create. + rawKey, err := store.Create(ctx, key) + if err != nil { + t.Fatalf("Create: %v", err) + } + if len(rawKey) == 0 { + t.Fatal("expected non-empty raw key") + } + + // Get by ID. + fetched, err := store.Get(ctx, key.ID) + if err != nil { + t.Fatalf("Get: %v", err) + } + if fetched.Name != "pg-test-key" { + t.Errorf("Name = %q, want 'pg-test-key'", fetched.Name) + } + if fetched.CompanyID != companyID { + t.Errorf("CompanyID mismatch") + } + if fetched.CreatedBy != createdBy { + t.Errorf("CreatedBy mismatch") + } + if fetched.IsActive != true { + t.Errorf("IsActive = false, want true") + } + + // JSONB permissions round-trip. + if len(fetched.Permissions) != 3 { + t.Fatalf("Permissions len = %d, want 3", len(fetched.Permissions)) + } + if fetched.Permissions[0] != "read" || fetched.Permissions[1] != "write" || fetched.Permissions[2] != "admin" { + t.Errorf("Permissions = %v, want [read write admin]", fetched.Permissions) + } + + // Get by hash. + h := hashKey(rawKey) + byHash, err := store.GetByHash(ctx, h) + if err != nil { + t.Fatalf("GetByHash: %v", err) + } + if byHash.ID != key.ID { + t.Errorf("GetByHash ID mismatch") + } + + // Validate with correct key. + validated, err := store.Validate(ctx, rawKey) + if err != nil { + t.Fatalf("Validate: %v", err) + } + if validated.ID != key.ID { + t.Errorf("Validate ID mismatch") + } + + // Validate with wrong key. + _, err = store.Validate(ctx, "wf_0000000000000000000000000000dead") + if err != ErrNotFound { + t.Errorf("Validate wrong key: got %v, want ErrNotFound", err) + } + + // List. + list, err := store.List(ctx, companyID) + if err != nil { + t.Fatalf("List: %v", err) + } + if len(list) != 1 { + t.Fatalf("List: expected 1, got %d", len(list)) + } + + // UpdateLastUsed. + if err := store.UpdateLastUsed(ctx, key.ID); err != nil { + t.Fatalf("UpdateLastUsed: %v", err) + } + after, err := store.Get(ctx, key.ID) + if err != nil { + t.Fatalf("Get after UpdateLastUsed: %v", err) + } + if after.LastUsedAt == nil { + t.Error("expected LastUsedAt to be set") + } + + // Expired key. + past := time.Now().Add(-time.Hour) + expiredKey := &APIKey{ + Name: "expired-key", + CompanyID: companyID, + Permissions: []string{}, + CreatedBy: createdBy, + IsActive: true, + ExpiresAt: &past, + } + expiredRaw, err := store.Create(ctx, expiredKey) + if err != nil { + t.Fatalf("Create expired: %v", err) + } + _, err = store.Validate(ctx, expiredRaw) + if err != ErrKeyExpired { + t.Errorf("Validate expired: got %v, want ErrKeyExpired", err) + } + + // Inactive key. + inactiveKey := &APIKey{ + Name: "inactive-key", + CompanyID: companyID, + Permissions: []string{}, + CreatedBy: createdBy, + IsActive: false, + } + inactiveRaw, err := store.Create(ctx, inactiveKey) + if err != nil { + t.Fatalf("Create inactive: %v", err) + } + _, err = store.Validate(ctx, inactiveRaw) + if err != ErrKeyInactive { + t.Errorf("Validate inactive: got %v, want ErrKeyInactive", err) + } + + // Delete. + if err := store.Delete(ctx, key.ID); err != nil { + t.Fatalf("Delete: %v", err) + } + _, err = store.Get(ctx, key.ID) + if err != ErrNotFound { + t.Errorf("Get after Delete: got %v, want ErrNotFound", err) + } + + // Delete non-existent. + if err := store.Delete(ctx, uuid.New()); err != ErrNotFound { + t.Errorf("Delete missing: got %v, want ErrNotFound", err) + } +} + +// =========================================================================== +// PGIdempotencyStore integration tests +// =========================================================================== + +func TestPGIdempotencyStore_Integration(t *testing.T) { + pool := newTestPGPool(t) + + // Reuse the shared idempotency test suite. + runIdempotencyTests(t, "Postgres", func(t *testing.T) IdempotencyStore { + t.Helper() + ctx := context.Background() + + store, err := NewPGIdempotencyStore(pool) + if err != nil { + t.Fatalf("NewPGIdempotencyStore: %v", err) + } + + // Each sub-test gets its own unique key prefix; clean up after. + prefix := "pg-idem-" + uuid.New().String() + t.Cleanup(func() { + _, _ = pool.Exec(ctx, `DELETE FROM idempotency_keys WHERE key LIKE $1`, prefix+"%") + }) + + // Wrap the store to inject the unique prefix into keys used by runIdempotencyTests. + // runIdempotencyTests creates its own records directly; no wrapping needed + // because each test uses unique keys. Simply return the store. + return store + }) +} + +// =========================================================================== +// PGDLQStore integration tests +// =========================================================================== + +func TestPGDLQStore_Integration(t *testing.T) { + pool := newTestPGPool(t) + ctx := context.Background() + + store, err := NewPGDLQStore(pool) + if err != nil { + t.Fatalf("NewPGDLQStore: %v", err) + } + + // Each test run uses entries with a unique pipeline name for isolation. + pipeline := "pg-dlq-" + uuid.New().String() + t.Cleanup(func() { + _, _ = pool.Exec(ctx, `DELETE FROM dlq_entries WHERE pipeline_name = $1`, pipeline) + }) + + // Add. + entry := &DLQEntry{ + OriginalEvent: json.RawMessage(`{"type":"pg.test","data":{"k":"v"}}`), + PipelineName: pipeline, + StepName: "pg-step", + ErrorMessage: "pg error", + ErrorType: "pg_type", + MaxRetries: 5, + Status: DLQStatusPending, + Metadata: map[string]any{"pg": true, "count": float64(42)}, + } + if err := store.Add(ctx, entry); err != nil { + t.Fatalf("Add: %v", err) + } + + // Get. + got, err := store.Get(ctx, entry.ID) + if err != nil { + t.Fatalf("Get: %v", err) + } + if got.PipelineName != pipeline { + t.Errorf("PipelineName = %q, want %q", got.PipelineName, pipeline) + } + if got.Status != DLQStatusPending { + t.Errorf("Status = %q, want pending", got.Status) + } + if got.MaxRetries != 5 { + t.Errorf("MaxRetries = %d, want 5", got.MaxRetries) + } + if got.Metadata == nil { + t.Fatal("expected non-nil metadata") + } + if got.Metadata["pg"] != true { + t.Errorf("Metadata[pg] = %v, want true", got.Metadata["pg"]) + } + if string(got.OriginalEvent) == "" { + t.Error("expected non-empty original_event") + } + // Verify JSON round-trip of original event. + var oe map[string]any + if err := json.Unmarshal(got.OriginalEvent, &oe); err != nil { + t.Errorf("OriginalEvent not valid JSON: %v", err) + } + + // Get not found. + _, err = store.Get(ctx, uuid.New()) + if err != ErrNotFound { + t.Errorf("Get missing: got %v, want ErrNotFound", err) + } + + // List + Count. + list, err := store.List(ctx, DLQFilter{PipelineName: pipeline}) + if err != nil { + t.Fatalf("List: %v", err) + } + if len(list) != 1 { + t.Fatalf("List: expected 1, got %d", len(list)) + } + count, err := store.Count(ctx, DLQFilter{PipelineName: pipeline}) + if err != nil { + t.Fatalf("Count: %v", err) + } + if count != 1 { + t.Errorf("Count = %d, want 1", count) + } + + // Filter by error_type. + byType, err := store.List(ctx, DLQFilter{ErrorType: "pg_type"}) + if err != nil { + t.Fatalf("List by error_type: %v", err) + } + found := false + for _, e := range byType { + if e.ID == entry.ID { + found = true + } + } + if !found { + t.Error("expected to find entry when filtering by error_type") + } + + // Retry. + if err := store.Retry(ctx, entry.ID); err != nil { + t.Fatalf("Retry: %v", err) + } + retried, err := store.Get(ctx, entry.ID) + if err != nil { + t.Fatalf("Get after Retry: %v", err) + } + if retried.Status != DLQStatusRetrying { + t.Errorf("Status after Retry = %q, want retrying", retried.Status) + } + if retried.RetryCount != 1 { + t.Errorf("RetryCount after Retry = %d, want 1", retried.RetryCount) + } + + // UpdateStatus. + if err := store.UpdateStatus(ctx, entry.ID, DLQStatusPending); err != nil { + t.Fatalf("UpdateStatus: %v", err) + } + + // Discard. + if err := store.Discard(ctx, entry.ID); err != nil { + t.Fatalf("Discard: %v", err) + } + discarded, err := store.Get(ctx, entry.ID) + if err != nil { + t.Fatalf("Get after Discard: %v", err) + } + if discarded.Status != DLQStatusDiscarded { + t.Errorf("Status after Discard = %q, want discarded", discarded.Status) + } + + // Resolve a second entry. + entry2 := &DLQEntry{ + PipelineName: pipeline, + StepName: "pg-step-2", + ErrorMessage: "err2", + ErrorType: "pg_type", + Status: DLQStatusPending, + } + if err := store.Add(ctx, entry2); err != nil { + t.Fatalf("Add entry2: %v", err) + } + if err := store.Resolve(ctx, entry2.ID); err != nil { + t.Fatalf("Resolve: %v", err) + } + resolved, err := store.Get(ctx, entry2.ID) + if err != nil { + t.Fatalf("Get after Resolve: %v", err) + } + if resolved.Status != DLQStatusResolved { + t.Errorf("Status after Resolve = %q, want resolved", resolved.Status) + } + if resolved.ResolvedAt == nil { + t.Error("expected non-nil resolved_at after Resolve") + } + + // Purge resolved/discarded entries older than zero duration (all of them). + purged, err := store.Purge(ctx, 0) + if err != nil { + t.Fatalf("Purge: %v", err) + } + if purged < 2 { + t.Errorf("Purge: expected >= 2 removed, got %d", purged) + } +} From 7eabfa36e092c27c07bf44f624fc6d5afa064c46 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 1 Mar 2026 13:32:30 +0000 Subject: [PATCH 6/7] Remove dead idx++ assignment from buildPGDLQQuery ErrorType block Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- store/pg_dlq.go | 1 - 1 file changed, 1 deletion(-) diff --git a/store/pg_dlq.go b/store/pg_dlq.go index 15280c3b..7931d349 100644 --- a/store/pg_dlq.go +++ b/store/pg_dlq.go @@ -277,7 +277,6 @@ func buildPGDLQQuery(base string, filter DLQFilter) (string, []any) { if filter.ErrorType != "" { conditions = append(conditions, fmt.Sprintf("error_type = $%d", idx)) args = append(args, filter.ErrorType) - idx++ //nolint:ineffassign // keep idx consistent for future conditions } query := base From 359e6fbbed98025793b440d8a5cbd656eb9c5ec6 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 1 Mar 2026 13:39:06 +0000 Subject: [PATCH 7/7] Fix idempotency test cleanup to delete all rows, not just prefix-matched ones Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- store/pg_stores_test.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/store/pg_stores_test.go b/store/pg_stores_test.go index 54b98105..1ce043e1 100644 --- a/store/pg_stores_test.go +++ b/store/pg_stores_test.go @@ -374,15 +374,12 @@ func TestPGIdempotencyStore_Integration(t *testing.T) { t.Fatalf("NewPGIdempotencyStore: %v", err) } - // Each sub-test gets its own unique key prefix; clean up after. - prefix := "pg-idem-" + uuid.New().String() + // Ensure the idempotency_keys table is cleaned up after each sub-test so that + // fixed keys used by runIdempotencyTests do not accumulate across runs. t.Cleanup(func() { - _, _ = pool.Exec(ctx, `DELETE FROM idempotency_keys WHERE key LIKE $1`, prefix+"%") + _, _ = pool.Exec(ctx, `DELETE FROM idempotency_keys`) }) - // Wrap the store to inject the unique prefix into keys used by runIdempotencyTests. - // runIdempotencyTests creates its own records directly; no wrapping needed - // because each test uses unique keys. Simply return the store. return store }) }