Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
7786302
docker
rafa-stacks Apr 23, 2026
0237d40
v3 transactions draft
rafa-stacks Apr 27, 2026
51fbe5f
txs cursor
rafa-stacks Apr 27, 2026
9f9f802
tx detail
rafa-stacks Apr 27, 2026
bc06c89
tx events
rafa-stacks Apr 27, 2026
4a9eb2d
cursor names
rafa-stacks Apr 27, 2026
1feb1a0
mempool schemas
rafa-stacks Apr 28, 2026
ee7acb4
mempool list cursor
rafa-stacks Apr 28, 2026
1e94fcc
fix fastify plugin
rafa-stacks Apr 28, 2026
80b0bf4
Merge branch 'next' into fix/balance
rafa-stacks Apr 29, 2026
f0d9c3e
first tests
rafa-stacks Apr 29, 2026
d5cbb00
block schemas
rafa-stacks Apr 30, 2026
583e315
new schemas
rafa-stacks May 7, 2026
d9f6372
principal txs draft
rafa-stacks May 8, 2026
42da4aa
Merge branch 'next' into feat/v3/transactions
rafa-stacks May 8, 2026
c3baa7d
remove mempool
rafa-stacks May 8, 2026
b57c211
Merge branch 'next' into feat/v3/transactions
rafa-stacks May 8, 2026
86c9b9b
tests
rafa-stacks May 8, 2026
2819883
extra fields
rafa-stacks May 9, 2026
42e138d
involvement
rafa-stacks May 10, 2026
6289d37
dont show canonical status
rafa-stacks May 11, 2026
877411b
balance change table
rafa-stacks May 12, 2026
11dd9bd
start wiring balances query
rafa-stacks May 12, 2026
d9911ee
cursor pagination
rafa-stacks May 12, 2026
9f6fac2
more tests
rafa-stacks May 13, 2026
0ebca3b
chore: merge next
rafa-stacks May 18, 2026
061bbba
cache controller
rafa-stacks May 19, 2026
64a71b6
add tests
rafa-stacks May 20, 2026
d71c23a
fix: import ts types in migration
rafa-stacks May 22, 2026
6947143
split migration
rafa-stacks May 24, 2026
afab992
merge next
rafa-stacks May 24, 2026
aa6ae57
fix: attempt to calculate counts gradually
rafa-stacks May 25, 2026
6a2b79d
split count even more
rafa-stacks May 26, 2026
6c593a7
fix: use BigNumber for balance changes
rafa-stacks May 27, 2026
34d2661
chore: merge
rafa-stacks May 27, 2026
f986c6d
fix: serializer
rafa-stacks May 27, 2026
34002b7
fix: import
rafa-stacks May 27, 2026
03bea5e
merge next
rafa-stacks Jun 15, 2026
925cea4
move migration
rafa-stacks Jun 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
322 changes: 322 additions & 0 deletions migrations/1778599015338_principal-tx-balance-changes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,322 @@
import type { ColumnDefinitions, MigrationBuilder } from 'node-pg-migrate';

export const shorthands: ColumnDefinitions | undefined = undefined;

export function up(pgm: MigrationBuilder) {
pgm.createTable('principal_tx_balance_changes', {
principal: {
type: 'text',
notNull: true,
},
tx_id: {
type: 'bytea',
notNull: true,
},
block_height: {
type: 'integer',
notNull: true,
},
index_block_hash: {
type: 'bytea',
notNull: true,
},
microblock_hash: {
type: 'bytea',
notNull: true,
},
microblock_sequence: {
type: 'integer',
notNull: true,
},
tx_index: {
type: 'smallint',
notNull: true,
},
canonical: {
type: 'boolean',
notNull: true,
},
microblock_canonical: {
type: 'boolean',
notNull: true,
},
asset_type: {
type: 'smallint', // 1: STX, 2: FT, 3: NFT
notNull: true,
},
asset_identifier: {
type: 'text',
notNull: true,
},
sent: {
type: 'numeric',
notNull: true,
},
received: {
type: 'numeric',
notNull: true,
},
});

pgm.addColumn('principal_txs', {
balance_change_count: {
type: 'integer',
notNull: true,
default: 0,
},
});

// Unique constraint created before the backfill so each per-source INSERT below can use
// ON CONFLICT to merge with rows already produced by earlier sources (e.g. the fee row
// for a principal that also appears as an STX event participant).
pgm.addConstraint(
'principal_tx_balance_changes',
'unique_principal_tx_balance_changes',
'UNIQUE(principal, tx_id, index_block_hash, microblock_hash, asset_type, asset_identifier)'
);

// Staging table for balance_change_count deltas. Each per-source INSERT captures the
// rows it actually created (via the xmax = 0 idiom on its RETURNING set — true for fresh
// inserts, false when ON CONFLICT triggered a merge) and writes one partial-count row per
// (principal, tx, index_block_hash, microblock_hash) here. A final UPDATE rolls these into
// principal_txs.balance_change_count.
//
// Why a staging table instead of either:
// (a) One COUNT(*) over the finished principal_tx_balance_changes (the previous design):
// that aggregate spans billions of rows, its hash exceeds work_mem and spills to
// disk, and the job never finishes.
// (b) Inline UPDATE-per-source against principal_txs: each principal_txs row could be
// touched by up to 7 sources, meaning up to 7 heap rewrites + index updates per row.
// Staging lets the end-of-migration UPDATE touch each row exactly once.
//
// TEMP + ON COMMIT DROP: no WAL for the staging rows, table is gone when the migration's
// transaction commits.
pgm.sql(`
CREATE TEMP TABLE balance_count_deltas (
principal text NOT NULL,
tx_id bytea NOT NULL,
index_block_hash bytea NOT NULL,
microblock_hash bytea NOT NULL,
delta integer NOT NULL
) ON COMMIT DROP
`);

// ===== Per-source backfill =====
//
// Mirrors PgWriteStore.updatePrincipalTxs:
// - Tx fee always contributes an STX `sent` row from the fee payer (sponsor || sender).
// - STX/FT events: sender contributes `sent`, recipient contributes `received`.
// - NFT events count 1 token per event.
// Event-table CHECK constraints guarantee sender IS NULL on mints and recipient IS NULL on
// burns, so the IS NOT NULL filters are sufficient.
//
// Each source is its own INSERT so per-statement memory stays bounded by one source table.
// The wrapping CTE feeds RETURNING into the deltas staging table — only `is_new` rows
// (newly inserted rather than merged via ON CONFLICT) count as +1 toward
// balance_change_count.
const writeDeltas = (sourceInsert: string) => `
WITH ins AS (
${sourceInsert}
ON CONFLICT ON CONSTRAINT unique_principal_tx_balance_changes DO UPDATE SET
sent = principal_tx_balance_changes.sent + EXCLUDED.sent,
received = principal_tx_balance_changes.received + EXCLUDED.received
RETURNING principal, tx_id, index_block_hash, microblock_hash, (xmax = 0) AS is_new
)
INSERT INTO balance_count_deltas (principal, tx_id, index_block_hash, microblock_hash, delta)
SELECT principal, tx_id, index_block_hash, microblock_hash, COUNT(*)::int
FROM ins
WHERE is_new
GROUP BY principal, tx_id, index_block_hash, microblock_hash
`;

// Tx fees: one row per tx, no source-side GROUP BY needed.
pgm.sql(
writeDeltas(`
INSERT INTO principal_tx_balance_changes (
principal, tx_id, block_height, index_block_hash, microblock_hash,
microblock_sequence, tx_index, canonical, microblock_canonical,
asset_type, asset_identifier, sent, received
)
SELECT
COALESCE(sponsor_address, sender_address),
tx_id, block_height, index_block_hash, microblock_hash,
microblock_sequence, tx_index, canonical, microblock_canonical,
1::smallint, 'stx'::text,
fee_rate::numeric, 0::numeric
FROM txs
`)
);

// STX sender side (transfer + burn).
pgm.sql(
writeDeltas(`
INSERT INTO principal_tx_balance_changes (
principal, tx_id, block_height, index_block_hash, microblock_hash,
microblock_sequence, tx_index, canonical, microblock_canonical,
asset_type, asset_identifier, sent, received
)
SELECT
sender,
tx_id, block_height, index_block_hash, microblock_hash,
microblock_sequence, tx_index, canonical, microblock_canonical,
1::smallint, 'stx'::text,
SUM(amount)::numeric, 0::numeric
FROM stx_events
WHERE sender IS NOT NULL
GROUP BY sender, tx_id, block_height, index_block_hash, microblock_hash,
microblock_sequence, tx_index, canonical, microblock_canonical
`)
);

// STX recipient side (transfer + mint).
pgm.sql(
writeDeltas(`
INSERT INTO principal_tx_balance_changes (
principal, tx_id, block_height, index_block_hash, microblock_hash,
microblock_sequence, tx_index, canonical, microblock_canonical,
asset_type, asset_identifier, sent, received
)
SELECT
recipient,
tx_id, block_height, index_block_hash, microblock_hash,
microblock_sequence, tx_index, canonical, microblock_canonical,
1::smallint, 'stx'::text,
0::numeric, SUM(amount)::numeric
FROM stx_events
WHERE recipient IS NOT NULL
GROUP BY recipient, tx_id, block_height, index_block_hash, microblock_hash,
microblock_sequence, tx_index, canonical, microblock_canonical
`)
);

// FT sender side.
pgm.sql(
writeDeltas(`
INSERT INTO principal_tx_balance_changes (
principal, tx_id, block_height, index_block_hash, microblock_hash,
microblock_sequence, tx_index, canonical, microblock_canonical,
asset_type, asset_identifier, sent, received
)
SELECT
sender,
tx_id, block_height, index_block_hash, microblock_hash,
microblock_sequence, tx_index, canonical, microblock_canonical,
2::smallint, asset_identifier,
SUM(amount)::numeric, 0::numeric
FROM ft_events
WHERE sender IS NOT NULL
GROUP BY sender, asset_identifier, tx_id, block_height, index_block_hash,
microblock_hash, microblock_sequence, tx_index, canonical, microblock_canonical
`)
);

// FT recipient side.
pgm.sql(
writeDeltas(`
INSERT INTO principal_tx_balance_changes (
principal, tx_id, block_height, index_block_hash, microblock_hash,
microblock_sequence, tx_index, canonical, microblock_canonical,
asset_type, asset_identifier, sent, received
)
SELECT
recipient,
tx_id, block_height, index_block_hash, microblock_hash,
microblock_sequence, tx_index, canonical, microblock_canonical,
2::smallint, asset_identifier,
0::numeric, SUM(amount)::numeric
FROM ft_events
WHERE recipient IS NOT NULL
GROUP BY recipient, asset_identifier, tx_id, block_height, index_block_hash,
microblock_hash, microblock_sequence, tx_index, canonical, microblock_canonical
`)
);

// NFT sender side, counted as 1 token per event.
pgm.sql(
writeDeltas(`
INSERT INTO principal_tx_balance_changes (
principal, tx_id, block_height, index_block_hash, microblock_hash,
microblock_sequence, tx_index, canonical, microblock_canonical,
asset_type, asset_identifier, sent, received
)
SELECT
sender,
tx_id, block_height, index_block_hash, microblock_hash,
microblock_sequence, tx_index, canonical, microblock_canonical,
3::smallint, asset_identifier,
COUNT(*)::numeric, 0::numeric
FROM nft_events
WHERE sender IS NOT NULL
GROUP BY sender, asset_identifier, tx_id, block_height, index_block_hash,
microblock_hash, microblock_sequence, tx_index, canonical, microblock_canonical
`)
);

// NFT recipient side, counted as 1 token per event.
pgm.sql(
writeDeltas(`
INSERT INTO principal_tx_balance_changes (
principal, tx_id, block_height, index_block_hash, microblock_hash,
microblock_sequence, tx_index, canonical, microblock_canonical,
asset_type, asset_identifier, sent, received
)
SELECT
recipient,
tx_id, block_height, index_block_hash, microblock_hash,
microblock_sequence, tx_index, canonical, microblock_canonical,
3::smallint, asset_identifier,
0::numeric, COUNT(*)::numeric
FROM nft_events
WHERE recipient IS NOT NULL
GROUP BY recipient, asset_identifier, tx_id, block_height, index_block_hash,
microblock_hash, microblock_sequence, tx_index, canonical, microblock_canonical
`)
);

// Materialize the aggregated counts into a separate, indexed temp table BEFORE running
// the UPDATE. This splits "aggregate" from "join + update" so each step gets a clean plan:
// 1. SUM / GROUP BY runs once as a standalone scan of balance_count_deltas into
// balance_count_final. The result is one row per UPDATE target — orders of magnitude
// smaller than balance_count_deltas itself.
// 2. The wrapping UPDATE then has an indexed driver on its left side, so the planner
// can pick a merge join or index nested loop along `principal_txs_unique` (which
// covers exactly this key) instead of hash-joining principal_txs — a multi-billion-
// row table — against an unindexed CTE. That converts random heap probes across all
// of principal_txs into ordered ones, which is the fix for the DataFileRead-bound
// UPDATE that took >24h on the prior attempt.
pgm.sql(`
CREATE TEMP TABLE balance_count_final ON COMMIT DROP AS
SELECT principal, tx_id, index_block_hash, microblock_hash,
SUM(delta)::int AS cnt
FROM balance_count_deltas
GROUP BY principal, tx_id, index_block_hash, microblock_hash
`);
// The deltas staging table is no longer needed and is the larger of the two; drop it now
// to free temp space and reduce buffer-cache pressure during the UPDATE.
pgm.sql(`DROP TABLE balance_count_deltas`);

pgm.sql(`
CREATE INDEX ON balance_count_final
(principal, tx_id, index_block_hash, microblock_hash)
`);
pgm.sql(`ANALYZE balance_count_final`);

pgm.sql(`
UPDATE principal_txs AS pt
SET balance_change_count = c.cnt
FROM balance_count_final AS c
WHERE pt.principal = c.principal
AND pt.tx_id = c.tx_id
AND pt.index_block_hash = c.index_block_hash
AND pt.microblock_hash = c.microblock_hash
`);

pgm.createIndex('principal_tx_balance_changes', 'tx_id');
pgm.createIndex('principal_tx_balance_changes', ['index_block_hash', 'canonical']);
pgm.createIndex('principal_tx_balance_changes', 'microblock_hash');
}

export function down(pgm: MigrationBuilder) {
pgm.dropTable('principal_tx_balance_changes');
pgm.dropColumn('principal_txs', 'balance_change_count');
}
2 changes: 1 addition & 1 deletion src/api/controllers/db-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ function getTxAnchorModeString(anchorMode: number): TransactionAnchorModeType {
}
}

function getTxTenureChangeCauseString(cause: number) {
export function getTxTenureChangeCauseString(cause: number) {
switch (cause) {
case 0:
return 'block_found';
Expand Down
Loading
Loading