Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 27 additions & 21 deletions chain/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"time"
)

// Appender writes hash-chained audit entries to Postgres.
Expand All @@ -21,33 +22,36 @@ func NewAppender(db *sql.DB) *Appender {

// Append opens its own transaction, appends one entry to ledger, and commits.
// metadata is stored as-is in audit_log.metadata (JSONB); pass nil if not needed.
// Returns (sequence, entryHash, error).
func (a *Appender) Append(ctx context.Context, ledger, eventType string, payload, metadata []byte, actor string) (int64, string, error) {
// Returns (sequence, entryHash, dbCreatedAt, error).
// createdAt is the server-side timestamp assigned by the DB (via NOW()), not the
// application clock, so it is accurate regardless of NTP skew across nodes.
func (a *Appender) Append(ctx context.Context, ledger, eventType string, payload, metadata []byte, actor string) (int64, string, time.Time, error) {
tx, err := a.db.BeginTx(ctx, nil)
if err != nil {
return 0, "", fmt.Errorf("chain.Append: begin tx: %w", err)
return 0, "", time.Time{}, fmt.Errorf("chain.Append: begin tx: %w", err)
}
seq, hash, err := a.AppendTx(ctx, tx, ledger, eventType, payload, metadata, actor)
seq, hash, createdAt, err := a.AppendTx(ctx, tx, ledger, eventType, payload, metadata, actor)
if err != nil {
_ = tx.Rollback()
return 0, "", err
return 0, "", time.Time{}, err
}
if err := tx.Commit(); err != nil {
return 0, "", fmt.Errorf("chain.Append: commit: %w", err)
return 0, "", time.Time{}, fmt.Errorf("chain.Append: commit: %w", err)
}
return seq, hash, nil
return seq, hash, createdAt, nil
}

// AppendTx appends one entry within the caller-supplied transaction tx.
// The caller is responsible for commit/rollback. This is the primitive used
// by BMW PR 11 Task 47 (step.bmw.audit_append_with_map) so that the audit
// entry and the business record land in a single atomic transaction.
// metadata is stored as-is in audit_log.metadata (JSONB); pass nil if not needed.
func (a *Appender) AppendTx(ctx context.Context, tx *sql.Tx, ledger, eventType string, payload, metadata []byte, actor string) (int64, string, error) {
// Returns (sequence, entryHash, dbCreatedAt, error).
func (a *Appender) AppendTx(ctx context.Context, tx *sql.Tx, ledger, eventType string, payload, metadata []byte, actor string) (int64, string, time.Time, error) {
// 0. Enforce a server-side lock timeout so a stalled holder surfaces as an
// error rather than blocking indefinitely.
if _, err := tx.ExecContext(ctx, `SET LOCAL lock_timeout = '5s'`); err != nil {
return 0, "", fmt.Errorf("chain.AppendTx: set lock_timeout: %w", err)
return 0, "", time.Time{}, fmt.Errorf("chain.AppendTx: set lock_timeout: %w", err)
}

// 1. Lock the ledger row and read the current cursor.
Expand All @@ -61,34 +65,36 @@ func (a *Appender) AppendTx(ctx context.Context, tx *sql.Tx, ledger, eventType s
ledger,
).Scan(&lastSeq, &lastHash)
if err == sql.ErrNoRows {
return 0, "", fmt.Errorf("chain.AppendTx: unknown ledger %q", ledger)
return 0, "", time.Time{}, fmt.Errorf("chain.AppendTx: unknown ledger %q", ledger)
}
if err != nil {
return 0, "", fmt.Errorf("chain.AppendTx: lock ledger: %w", err)
return 0, "", time.Time{}, fmt.Errorf("chain.AppendTx: lock ledger: %w", err)
}

// 2. Compute hashes.
payloadHash, err := PayloadHash(payload)
if err != nil {
return 0, "", fmt.Errorf("chain.AppendTx: %w", err)
return 0, "", time.Time{}, fmt.Errorf("chain.AppendTx: %w", err)
}
seq := lastSeq + 1
// For the genesis entry, prevHash is empty ("").
entryHash := EntryHash(seq, ledger, eventType, payloadHash, lastHash)

// 3. Insert the audit log row.
// created_at uses DB-server NOW() to avoid application clock skew in
// multi-node deployments.
_, err = tx.ExecContext(ctx,
// 3. Insert the audit log row and return the DB-assigned created_at.
// RETURNING created_at ensures callers receive the server-side timestamp
// (set by NOW() inside Postgres) rather than the application clock.
var createdAt time.Time
err = tx.QueryRowContext(ctx,
`INSERT INTO audit_log
(sequence, ledger, event_type, payload, payload_hash,
prev_entry_hash, entry_hash, created_at, appended_by_actor, metadata)
VALUES ($1, $2, $3, $4, $5, $6, $7, NOW(), $8, $9)`,
VALUES ($1, $2, $3, $4, $5, $6, $7, NOW(), $8, $9)
RETURNING created_at`,
seq, ledger, eventType, payload, payloadHash,
lastHash, entryHash, actor, metadata,
)
).Scan(&createdAt)
if err != nil {
return 0, "", fmt.Errorf("chain.AppendTx: insert audit_log: %w", err)
return 0, "", time.Time{}, fmt.Errorf("chain.AppendTx: insert audit_log: %w", err)
}

// 4. Advance the ledger cursor.
Expand All @@ -99,8 +105,8 @@ func (a *Appender) AppendTx(ctx context.Context, tx *sql.Tx, ledger, eventType s
ledger, seq, entryHash,
)
if err != nil {
return 0, "", fmt.Errorf("chain.AppendTx: update audit_ledgers: %w", err)
return 0, "", time.Time{}, fmt.Errorf("chain.AppendTx: update audit_ledgers: %w", err)
}

return seq, entryHash, nil
return seq, entryHash, createdAt, nil
}
19 changes: 11 additions & 8 deletions chain/append_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestAppend_FirstEntry_SetsEmptyPrevHash(t *testing.T) {
createLedger(t, db, "test-ledger")
a := chain.NewAppender(db)

seq, hash, err := a.Append(ctx, "test-ledger", "event.x", []byte(`{"k":1}`), nil, "actor")
seq, hash, createdAt, err := a.Append(ctx, "test-ledger", "event.x", []byte(`{"k":1}`), nil, "actor")
if err != nil {
t.Fatal(err)
}
Expand All @@ -159,6 +159,9 @@ func TestAppend_FirstEntry_SetsEmptyPrevHash(t *testing.T) {
if len(hash) != 64 {
t.Errorf("expected 64-char hash, got %d: %s", len(hash), hash)
}
if createdAt.IsZero() {
t.Error("Append returned zero createdAt; expected DB-assigned timestamp")
}

// First entry must have empty prev_entry_hash.
var prev string
Expand All @@ -179,11 +182,11 @@ func TestAppend_SecondEntry_LinksPrevHash(t *testing.T) {
createLedger(t, db, "test-ledger")
a := chain.NewAppender(db)

_, h1, err := a.Append(ctx, "test-ledger", "event.x", []byte(`{"k":1}`), nil, "")
_, h1, _, err := a.Append(ctx, "test-ledger", "event.x", []byte(`{"k":1}`), nil, "")
if err != nil {
t.Fatal(err)
}
_, _, err = a.Append(ctx, "test-ledger", "event.x", []byte(`{"k":2}`), nil, "")
_, _, _, err = a.Append(ctx, "test-ledger", "event.x", []byte(`{"k":2}`), nil, "")
if err != nil {
t.Fatal(err)
}
Expand All @@ -206,7 +209,7 @@ func TestAppend_EntryHashMatchesChainComputation(t *testing.T) {
a := chain.NewAppender(db)

payload := []byte(`{"amount_cents":2000,"item_id":"abc"}`)
seq, gotHash, err := a.Append(ctx, "test-ledger", "contribution.captured", payload, nil, "stripe")
seq, gotHash, _, err := a.Append(ctx, "test-ledger", "contribution.captured", payload, nil, "stripe")
if err != nil {
t.Fatal(err)
}
Expand All @@ -227,7 +230,7 @@ func TestAppend_UnknownLedger_ReturnsError(t *testing.T) {
db := setupTestDB(t)
a := chain.NewAppender(db)

_, _, err := a.Append(ctx, "no-such-ledger", "event.x", []byte(`{}`), nil, "")
_, _, _, err := a.Append(ctx, "no-such-ledger", "event.x", []byte(`{}`), nil, "")
if err == nil {
t.Error("expected error for unknown ledger")
}
Expand All @@ -246,7 +249,7 @@ func TestAppendTx_ParticipatesInCallerTransaction(t *testing.T) {
if err != nil {
t.Fatal(err)
}
seq, _, err := a.AppendTx(ctx, tx, "test-ledger", "event.x", []byte(`{}`), nil, "actor")
seq, _, _, err := a.AppendTx(ctx, tx, "test-ledger", "event.x", []byte(`{}`), nil, "actor")
if err != nil {
_ = tx.Rollback()
t.Fatal(err)
Expand Down Expand Up @@ -293,7 +296,7 @@ func TestAppendTx_CommitPersistsEntry(t *testing.T) {
if err != nil {
t.Fatal(err)
}
seq, hash, err := a.AppendTx(ctx, tx, "test-ledger", "event.x", []byte(`{"v":1}`), nil, "")
seq, hash, _, err := a.AppendTx(ctx, tx, "test-ledger", "event.x", []byte(`{"v":1}`), nil, "")
if err != nil {
_ = tx.Rollback()
t.Fatal(err)
Expand Down Expand Up @@ -342,7 +345,7 @@ func TestAppend_ConcurrentAppends_MonotonicSequence(t *testing.T) {
go func() {
defer wg.Done()
for i := 0; i < entriesEach; i++ {
seq, _, err := a.Append(ctx, "concurrent-ledger", "stress.event",
seq, _, _, err := a.Append(ctx, "concurrent-ledger", "stress.event",
[]byte(`{"g":1}`), nil, "")
mu.Lock()
if err != nil {
Expand Down
Loading
Loading