diff --git a/migrations/1778599015338_principal-tx-balance-changes.ts b/migrations/1778599015338_principal-tx-balance-changes.ts new file mode 100644 index 000000000..b380e27df --- /dev/null +++ b/migrations/1778599015338_principal-tx-balance-changes.ts @@ -0,0 +1,322 @@ +import type { ColumnDefinitions, MigrationBuilder } from 'node-pg-migrate'; + +export const shorthands: ColumnDefinitions | undefined = undefined; + +export function up(pgm: MigrationBuilder) { + pgm.createTable('principal_tx_balance_changes', { + principal: { + type: 'text', + notNull: true, + }, + tx_id: { + type: 'bytea', + notNull: true, + }, + block_height: { + type: 'integer', + notNull: true, + }, + index_block_hash: { + type: 'bytea', + notNull: true, + }, + microblock_hash: { + type: 'bytea', + notNull: true, + }, + microblock_sequence: { + type: 'integer', + notNull: true, + }, + tx_index: { + type: 'smallint', + notNull: true, + }, + canonical: { + type: 'boolean', + notNull: true, + }, + microblock_canonical: { + type: 'boolean', + notNull: true, + }, + asset_type: { + type: 'smallint', // 1: STX, 2: FT, 3: NFT + notNull: true, + }, + asset_identifier: { + type: 'text', + notNull: true, + }, + sent: { + type: 'numeric', + notNull: true, + }, + received: { + type: 'numeric', + notNull: true, + }, + }); + + pgm.addColumn('principal_txs', { + balance_change_count: { + type: 'integer', + notNull: true, + default: 0, + }, + }); + + // Unique constraint created before the backfill so each per-source INSERT below can use + // ON CONFLICT to merge with rows already produced by earlier sources (e.g. the fee row + // for a principal that also appears as an STX event participant). + pgm.addConstraint( + 'principal_tx_balance_changes', + 'unique_principal_tx_balance_changes', + 'UNIQUE(principal, tx_id, index_block_hash, microblock_hash, asset_type, asset_identifier)' + ); + + // Staging table for balance_change_count deltas. Each per-source INSERT captures the + // rows it actually created (via the xmax = 0 idiom on its RETURNING set — true for fresh + // inserts, false when ON CONFLICT triggered a merge) and writes one partial-count row per + // (principal, tx, index_block_hash, microblock_hash) here. A final UPDATE rolls these into + // principal_txs.balance_change_count. + // + // Why a staging table instead of either: + // (a) One COUNT(*) over the finished principal_tx_balance_changes (the previous design): + // that aggregate spans billions of rows, its hash exceeds work_mem and spills to + // disk, and the job never finishes. + // (b) Inline UPDATE-per-source against principal_txs: each principal_txs row could be + // touched by up to 7 sources, meaning up to 7 heap rewrites + index updates per row. + // Staging lets the end-of-migration UPDATE touch each row exactly once. + // + // TEMP + ON COMMIT DROP: no WAL for the staging rows, table is gone when the migration's + // transaction commits. + pgm.sql(` + CREATE TEMP TABLE balance_count_deltas ( + principal text NOT NULL, + tx_id bytea NOT NULL, + index_block_hash bytea NOT NULL, + microblock_hash bytea NOT NULL, + delta integer NOT NULL + ) ON COMMIT DROP + `); + + // ===== Per-source backfill ===== + // + // Mirrors PgWriteStore.updatePrincipalTxs: + // - Tx fee always contributes an STX `sent` row from the fee payer (sponsor || sender). + // - STX/FT events: sender contributes `sent`, recipient contributes `received`. + // - NFT events count 1 token per event. + // Event-table CHECK constraints guarantee sender IS NULL on mints and recipient IS NULL on + // burns, so the IS NOT NULL filters are sufficient. + // + // Each source is its own INSERT so per-statement memory stays bounded by one source table. + // The wrapping CTE feeds RETURNING into the deltas staging table — only `is_new` rows + // (newly inserted rather than merged via ON CONFLICT) count as +1 toward + // balance_change_count. + const writeDeltas = (sourceInsert: string) => ` + WITH ins AS ( + ${sourceInsert} + ON CONFLICT ON CONSTRAINT unique_principal_tx_balance_changes DO UPDATE SET + sent = principal_tx_balance_changes.sent + EXCLUDED.sent, + received = principal_tx_balance_changes.received + EXCLUDED.received + RETURNING principal, tx_id, index_block_hash, microblock_hash, (xmax = 0) AS is_new + ) + INSERT INTO balance_count_deltas (principal, tx_id, index_block_hash, microblock_hash, delta) + SELECT principal, tx_id, index_block_hash, microblock_hash, COUNT(*)::int + FROM ins + WHERE is_new + GROUP BY principal, tx_id, index_block_hash, microblock_hash + `; + + // Tx fees: one row per tx, no source-side GROUP BY needed. + pgm.sql( + writeDeltas(` + INSERT INTO principal_tx_balance_changes ( + principal, tx_id, block_height, index_block_hash, microblock_hash, + microblock_sequence, tx_index, canonical, microblock_canonical, + asset_type, asset_identifier, sent, received + ) + SELECT + COALESCE(sponsor_address, sender_address), + tx_id, block_height, index_block_hash, microblock_hash, + microblock_sequence, tx_index, canonical, microblock_canonical, + 1::smallint, 'stx'::text, + fee_rate::numeric, 0::numeric + FROM txs + `) + ); + + // STX sender side (transfer + burn). + pgm.sql( + writeDeltas(` + INSERT INTO principal_tx_balance_changes ( + principal, tx_id, block_height, index_block_hash, microblock_hash, + microblock_sequence, tx_index, canonical, microblock_canonical, + asset_type, asset_identifier, sent, received + ) + SELECT + sender, + tx_id, block_height, index_block_hash, microblock_hash, + microblock_sequence, tx_index, canonical, microblock_canonical, + 1::smallint, 'stx'::text, + SUM(amount)::numeric, 0::numeric + FROM stx_events + WHERE sender IS NOT NULL + GROUP BY sender, tx_id, block_height, index_block_hash, microblock_hash, + microblock_sequence, tx_index, canonical, microblock_canonical + `) + ); + + // STX recipient side (transfer + mint). + pgm.sql( + writeDeltas(` + INSERT INTO principal_tx_balance_changes ( + principal, tx_id, block_height, index_block_hash, microblock_hash, + microblock_sequence, tx_index, canonical, microblock_canonical, + asset_type, asset_identifier, sent, received + ) + SELECT + recipient, + tx_id, block_height, index_block_hash, microblock_hash, + microblock_sequence, tx_index, canonical, microblock_canonical, + 1::smallint, 'stx'::text, + 0::numeric, SUM(amount)::numeric + FROM stx_events + WHERE recipient IS NOT NULL + GROUP BY recipient, tx_id, block_height, index_block_hash, microblock_hash, + microblock_sequence, tx_index, canonical, microblock_canonical + `) + ); + + // FT sender side. + pgm.sql( + writeDeltas(` + INSERT INTO principal_tx_balance_changes ( + principal, tx_id, block_height, index_block_hash, microblock_hash, + microblock_sequence, tx_index, canonical, microblock_canonical, + asset_type, asset_identifier, sent, received + ) + SELECT + sender, + tx_id, block_height, index_block_hash, microblock_hash, + microblock_sequence, tx_index, canonical, microblock_canonical, + 2::smallint, asset_identifier, + SUM(amount)::numeric, 0::numeric + FROM ft_events + WHERE sender IS NOT NULL + GROUP BY sender, asset_identifier, tx_id, block_height, index_block_hash, + microblock_hash, microblock_sequence, tx_index, canonical, microblock_canonical + `) + ); + + // FT recipient side. + pgm.sql( + writeDeltas(` + INSERT INTO principal_tx_balance_changes ( + principal, tx_id, block_height, index_block_hash, microblock_hash, + microblock_sequence, tx_index, canonical, microblock_canonical, + asset_type, asset_identifier, sent, received + ) + SELECT + recipient, + tx_id, block_height, index_block_hash, microblock_hash, + microblock_sequence, tx_index, canonical, microblock_canonical, + 2::smallint, asset_identifier, + 0::numeric, SUM(amount)::numeric + FROM ft_events + WHERE recipient IS NOT NULL + GROUP BY recipient, asset_identifier, tx_id, block_height, index_block_hash, + microblock_hash, microblock_sequence, tx_index, canonical, microblock_canonical + `) + ); + + // NFT sender side, counted as 1 token per event. + pgm.sql( + writeDeltas(` + INSERT INTO principal_tx_balance_changes ( + principal, tx_id, block_height, index_block_hash, microblock_hash, + microblock_sequence, tx_index, canonical, microblock_canonical, + asset_type, asset_identifier, sent, received + ) + SELECT + sender, + tx_id, block_height, index_block_hash, microblock_hash, + microblock_sequence, tx_index, canonical, microblock_canonical, + 3::smallint, asset_identifier, + COUNT(*)::numeric, 0::numeric + FROM nft_events + WHERE sender IS NOT NULL + GROUP BY sender, asset_identifier, tx_id, block_height, index_block_hash, + microblock_hash, microblock_sequence, tx_index, canonical, microblock_canonical + `) + ); + + // NFT recipient side, counted as 1 token per event. + pgm.sql( + writeDeltas(` + INSERT INTO principal_tx_balance_changes ( + principal, tx_id, block_height, index_block_hash, microblock_hash, + microblock_sequence, tx_index, canonical, microblock_canonical, + asset_type, asset_identifier, sent, received + ) + SELECT + recipient, + tx_id, block_height, index_block_hash, microblock_hash, + microblock_sequence, tx_index, canonical, microblock_canonical, + 3::smallint, asset_identifier, + 0::numeric, COUNT(*)::numeric + FROM nft_events + WHERE recipient IS NOT NULL + GROUP BY recipient, asset_identifier, tx_id, block_height, index_block_hash, + microblock_hash, microblock_sequence, tx_index, canonical, microblock_canonical + `) + ); + + // Materialize the aggregated counts into a separate, indexed temp table BEFORE running + // the UPDATE. This splits "aggregate" from "join + update" so each step gets a clean plan: + // 1. SUM / GROUP BY runs once as a standalone scan of balance_count_deltas into + // balance_count_final. The result is one row per UPDATE target — orders of magnitude + // smaller than balance_count_deltas itself. + // 2. The wrapping UPDATE then has an indexed driver on its left side, so the planner + // can pick a merge join or index nested loop along `principal_txs_unique` (which + // covers exactly this key) instead of hash-joining principal_txs — a multi-billion- + // row table — against an unindexed CTE. That converts random heap probes across all + // of principal_txs into ordered ones, which is the fix for the DataFileRead-bound + // UPDATE that took >24h on the prior attempt. + pgm.sql(` + CREATE TEMP TABLE balance_count_final ON COMMIT DROP AS + SELECT principal, tx_id, index_block_hash, microblock_hash, + SUM(delta)::int AS cnt + FROM balance_count_deltas + GROUP BY principal, tx_id, index_block_hash, microblock_hash + `); + // The deltas staging table is no longer needed and is the larger of the two; drop it now + // to free temp space and reduce buffer-cache pressure during the UPDATE. + pgm.sql(`DROP TABLE balance_count_deltas`); + + pgm.sql(` + CREATE INDEX ON balance_count_final + (principal, tx_id, index_block_hash, microblock_hash) + `); + pgm.sql(`ANALYZE balance_count_final`); + + pgm.sql(` + UPDATE principal_txs AS pt + SET balance_change_count = c.cnt + FROM balance_count_final AS c + WHERE pt.principal = c.principal + AND pt.tx_id = c.tx_id + AND pt.index_block_hash = c.index_block_hash + AND pt.microblock_hash = c.microblock_hash + `); + + pgm.createIndex('principal_tx_balance_changes', 'tx_id'); + pgm.createIndex('principal_tx_balance_changes', ['index_block_hash', 'canonical']); + pgm.createIndex('principal_tx_balance_changes', 'microblock_hash'); +} + +export function down(pgm: MigrationBuilder) { + pgm.dropTable('principal_tx_balance_changes'); + pgm.dropColumn('principal_txs', 'balance_change_count'); +} diff --git a/migrations/1775100000000_tx-event-pagination-indexes.ts b/migrations/1779552862561_tx-event-pagination-indexes.ts similarity index 100% rename from migrations/1775100000000_tx-event-pagination-indexes.ts rename to migrations/1779552862561_tx-event-pagination-indexes.ts diff --git a/src/api/controllers/db-controller.ts b/src/api/controllers/db-controller.ts index 5c8e57d90..16cbd530e 100644 --- a/src/api/controllers/db-controller.ts +++ b/src/api/controllers/db-controller.ts @@ -142,7 +142,7 @@ function getTxAnchorModeString(anchorMode: number): TransactionAnchorModeType { } } -function getTxTenureChangeCauseString(cause: number) { +export function getTxTenureChangeCauseString(cause: number) { switch (cause) { case 0: return 'block_found'; diff --git a/src/api/routes/v3/principals.ts b/src/api/routes/v3/principals.ts index 5e725b029..4407707ad 100644 --- a/src/api/routes/v3/principals.ts +++ b/src/api/routes/v3/principals.ts @@ -1,9 +1,12 @@ -import { handlePrincipalCache } from '../../controllers/cache-controller.js'; +import { + handlePrincipalCache, + handleTransactionCache, +} from '../../controllers/cache-controller.js'; import { FastifyPluginAsync } from 'fastify'; import { Type, TypeBoxTypeProvider } from '@fastify/type-provider-typebox'; import { Server } from 'node:http'; import { getPagingQueryLimit, ResourceType } from '../../pagination.js'; -import { PrincipalSchema } from '../../schemas/v3/entities/common.js'; +import { PrincipalSchema, TransactionIdSchema } from '../../schemas/v3/entities/common.js'; import { CursorPaginationQuerystring, CursorPaginatedResponse, @@ -11,6 +14,18 @@ import { } from '../../schemas/v3/cursors.js'; import { PrincipalTransactionSummarySchema } from '../../schemas/v3/entities/principal-transactions.js'; import { serializePrincipalTransactionSummary } from '../../serializers/v3/transactions.js'; +import { + serializePrincipalBalanceChange, + serializePrincipalTransactionBalanceChange, +} from '../../serializers/v3/balance-changes.js'; +import { + PrincipalBalanceChangeCursorSchema, + PrincipalTransactionBalanceChangeCursorSchema, +} from '../../schemas/v3/params.js'; +import { + PrincipalBalanceChangeSchema, + PrincipalTransactionBalanceChangeSchema, +} from '../../schemas/v3/entities/principal-balance-changes.js'; export const PrincipalsRoutes: FastifyPluginAsync< Record, @@ -56,5 +71,109 @@ export const PrincipalsRoutes: FastifyPluginAsync< } ); + fastify.get( + '/principals/:principal/transactions/:tx_id/balance-changes', + { + preHandler: handleTransactionCache, + schema: { + operationId: 'get_principal_transaction_balance_changes', + summary: 'Get principal transaction balance changes', + description: `Returns the balance changes for a principal's transaction`, + tags: ['Transactions'], + params: Type.Object({ principal: PrincipalSchema, tx_id: TransactionIdSchema }), + querystring: CursorPaginationQuerystring( + PrincipalTransactionBalanceChangeCursorSchema, + ResourceType.Tx + ), + response: { + 200: CursorPaginatedResponse( + PrincipalTransactionBalanceChangeSchema, + PrincipalTransactionBalanceChangeCursorSchema, + ResourceType.Tx + ), + }, + }, + }, + async (req, reply) => { + const results = await fastify.db.v3.getPrincipalTransactionBalanceChanges({ + principal: req.params.principal, + tx_id: req.params.tx_id, + limit: req.query.limit ?? getPagingQueryLimit(ResourceType.Tx), + cursor: req.query.cursor, + }); + await reply.send({ + limit: results.limit, + total: results.total, + cursor: { + next: results.next_cursor, + previous: results.prev_cursor, + current: results.current_cursor, + }, + results: results.results.map(r => serializePrincipalTransactionBalanceChange(r)), + }); + } + ); + + fastify.get( + '/principals/:principal/balance-changes', + { + preHandler: handlePrincipalCache, + // Accept both repeated (`?tx_id=A&tx_id=B`) and comma-separated (`?tx_id=A,B`) forms. + // The repeated form is already an array via Fastify's qs parser; this hook normalizes + // the comma-separated form. Mirrors the convention used by `/extended/v1/tx/multiple`. + preValidation: (req, _reply, done) => { + if (typeof req.query.tx_id === 'string') { + req.query.tx_id = (req.query.tx_id as string).split(',') as typeof req.query.tx_id; + } + done(); + }, + schema: { + operationId: 'get_principal_balance_changes', + summary: 'Get principal balance changes', + description: `Returns the balance changes for a principal across one or more transactions, as a single paginated flat array ordered by chain position descending then by asset.`, + tags: ['Transactions'], + params: Type.Object({ principal: PrincipalSchema }), + querystring: Type.Composite([ + CursorPaginationQuerystring(PrincipalBalanceChangeCursorSchema, ResourceType.Tx), + Type.Object({ + tx_id: Type.Array(TransactionIdSchema, { + minItems: 1, + maxItems: getPagingQueryLimit(ResourceType.Tx), + description: + 'Transaction IDs to query balance changes for. Provide as repeated ' + + 'querystring values (`?tx_id=A&tx_id=B`) or as a single comma-separated ' + + 'value (`?tx_id=A,B`).', + }), + }), + ]), + response: { + 200: CursorPaginatedResponse( + PrincipalBalanceChangeSchema, + PrincipalBalanceChangeCursorSchema, + ResourceType.Tx + ), + }, + }, + }, + async (req, reply) => { + const results = await fastify.db.v3.getPrincipalBalanceChanges({ + principal: req.params.principal, + tx_ids: req.query.tx_id, + limit: req.query.limit ?? getPagingQueryLimit(ResourceType.Tx), + cursor: req.query.cursor, + }); + await reply.send({ + limit: results.limit, + total: results.total, + cursor: { + next: results.next_cursor, + previous: results.prev_cursor, + current: results.current_cursor, + }, + results: results.results.map(r => serializePrincipalBalanceChange(r)), + }); + } + ); + await Promise.resolve(); }; diff --git a/src/api/schemas/v3/entities/principal-balance-changes.ts b/src/api/schemas/v3/entities/principal-balance-changes.ts new file mode 100644 index 000000000..5798cc7e0 --- /dev/null +++ b/src/api/schemas/v3/entities/principal-balance-changes.ts @@ -0,0 +1,43 @@ +import { Static, Type } from '@sinclair/typebox'; +import { TransactionIdSchema } from './common.js'; + +export const BalanceChangeSchema = Type.Object({ + sent: Type.String({ + description: 'Amount sent by the principal', + }), + received: Type.String({ + description: 'Amount received by the principal', + }), + net: Type.String({ + description: 'Net balance change for the principal', + }), +}); +export type BalanceChange = Static; + +export const PrincipalTransactionBalanceChangeSchema = Type.Object({ + asset: Type.Union([ + Type.Object({ + type: Type.Literal('stx'), + }), + Type.Object({ + type: Type.Union([Type.Literal('ft'), Type.Literal('nft')], { + description: 'The asset type that was affected by the balance change.', + }), + identifier: Type.String({ + description: 'The identifier of the asset that was affected by the balance change.', + }), + }), + ]), + balance_change: BalanceChangeSchema, +}); +export type PrincipalTransactionBalanceChange = Static< + typeof PrincipalTransactionBalanceChangeSchema +>; + +export const PrincipalBalanceChangeSchema = Type.Composite([ + Type.Object({ + tx_id: TransactionIdSchema, + }), + PrincipalTransactionBalanceChangeSchema, +]); +export type PrincipalBalanceChange = Static; diff --git a/src/api/schemas/v3/params.ts b/src/api/schemas/v3/params.ts new file mode 100644 index 000000000..3f5f4201f --- /dev/null +++ b/src/api/schemas/v3/params.ts @@ -0,0 +1,73 @@ +import { ObjectOptions, TSchema, Type } from '@sinclair/typebox'; +import { pagingQueryLimits, ResourceType } from '../../pagination.js'; +import { Nullable } from '../v1/util.js'; + +/** + * Cursor pagination querystring + * @param resource - Resource type to determine the default limit and max limit + * @param type - Type of the cursor to paginate by + * @returns Cursor pagination querystring + */ +export const CursorPaginationQuerystring = ( + resource: ResourceType, + type: T, + title?: string, + description?: string, + limitOverride?: number +) => + Type.Object({ + limit: Type.Optional( + Type.Integer({ + minimum: 0, + default: pagingQueryLimits[resource].defaultLimit, + maximum: limitOverride ?? pagingQueryLimits[resource].maxLimit, + title: title ?? 'Limit', + description: description ?? 'Results per page', + }) + ), + cursor: Type.Optional(type), + }); + +/** + * Cursor pagination response + * @param type - Type of the response object + * @param options - Options for the response + * @returns Cursor pagination response schema + */ +export const CursorPaginatedResponse = (type: T, options?: ObjectOptions) => + Type.Object( + { + total: Type.Integer({ examples: [1] }), + limit: Type.Integer({ examples: [20] }), + cursor: Type.Object({ + next: Nullable(Type.String({ description: 'Next page cursor' })), + previous: Nullable(Type.String({ description: 'Previous page cursor' })), + current: Nullable(Type.String({ description: 'Current page cursor' })), + }), + results: Type.Array(type), + }, + options + ); + +export const TransactionCursorSchema = Type.String({ + description: + 'Cursor for paginating transactions. Format: block_height:microblock_sequence:tx_index', + pattern: '^[0-9]+:[0-9]+:[0-9]+$', +}); + +export const PrincipalTransactionBalanceChangeCursorSchema = Type.String({ + description: + 'Cursor for paginating principal transaction balance changes. Format: ' + + '`:` where `asset_type` is a numeric tag ' + + '(1=STX, 2=FT, 3=NFT) and `asset_identifier` is `` for STX or a ' + + 'fully-qualified Clarity asset id such as `SP000…contract-name::asset-name` ' + + 'for FT/NFT.', + pattern: '^[0-9]+:\\S+$', +}); + +export const PrincipalBalanceChangeCursorSchema = Type.String({ + description: + 'Cursor for paginating principal balance changes across multiple transactions. ' + + 'Format: `::::`.', + pattern: '^[0-9]+:[0-9]+:[0-9]+:[0-9]+:\\S+$', +}); diff --git a/src/api/serializers/v3/balance-changes.ts b/src/api/serializers/v3/balance-changes.ts new file mode 100644 index 000000000..9b317cbe5 --- /dev/null +++ b/src/api/serializers/v3/balance-changes.ts @@ -0,0 +1,62 @@ +import { + PrincipalBalanceChange, + PrincipalTransactionBalanceChange, +} from '../../schemas/v3/entities/principal-balance-changes.js'; +import { DbPrincipalTransactionBalanceChange } from '../../../datastore/v3/types.js'; +import { DbAssetType } from '../../../datastore/common.js'; + +function serializeAssetType(type: DbAssetType): 'stx' | 'ft' | 'nft' { + switch (type) { + case DbAssetType.Stx: + return 'stx'; + case DbAssetType.Ft: + return 'ft'; + case DbAssetType.Nft: + return 'nft'; + default: + throw new Error(`Unexpected DbAssetType: ${type}`); + } +} + +/** + * Parses a database principal transaction balance change into a principal transaction balance + * change. + * @param change - The database principal transaction balance change to parse. + * @returns The parsed principal transaction balance change. + */ +export function serializePrincipalTransactionBalanceChange( + change: DbPrincipalTransactionBalanceChange +): PrincipalTransactionBalanceChange { + const assetType = serializeAssetType(change.asset_type); + return { + asset: + assetType === 'stx' + ? { + type: 'stx', + } + : { + type: assetType, + identifier: change.asset_identifier, + }, + balance_change: { + sent: change.sent, + received: change.received, + net: change.net, + }, + }; +} + +/** + * Parses a database principal transaction balance change into a principal balance change + * (the flattened batch shape that carries `tx_id` alongside the asset and balance fields). + * @param change - The database principal transaction balance change to parse. + * @returns The parsed principal balance change. + */ +export function serializePrincipalBalanceChange( + change: DbPrincipalTransactionBalanceChange +): PrincipalBalanceChange { + return { + tx_id: change.tx_id, + ...serializePrincipalTransactionBalanceChange(change), + }; +} diff --git a/src/api/serializers/v3/transactions.ts b/src/api/serializers/v3/transactions.ts index dcee0f396..a88f51692 100644 --- a/src/api/serializers/v3/transactions.ts +++ b/src/api/serializers/v3/transactions.ts @@ -12,11 +12,12 @@ import { } from '../../schemas/v3/entities/transaction-summaries.js'; import { DbMempoolTransaction, + DbPrincipalTransactionBalanceChange, DbPrincipalTransactionSummary, DbTransaction, DbTransactionSummary, } from '../../../datastore/v3/types.js'; -import { DbTxStatus, DbTxTypeId } from '../../../datastore/common.js'; +import { DbAssetType, DbTxStatus, DbTxTypeId } from '../../../datastore/common.js'; import { PrincipalTransactionSummary } from '../../schemas/v3/entities/principal-transactions.js'; import { BaseTransaction, @@ -38,6 +39,10 @@ import { } from '@stacks/codec'; import { serializePostCondition } from './post-conditions.js'; import { serializeDbMempoolTransaction } from './mempool-transactions.js'; +import { + PrincipalBalanceChange, + PrincipalTransactionBalanceChange, +} from '../../schemas/v3/entities/principal-balance-changes.js'; /** * Serializes a database transaction summary status into a transaction summary status. @@ -368,3 +373,59 @@ export function serializeDbTransactionOrMempoolTransaction( } return serializeDbMempoolTransaction(transaction, include); } + +function serializeAssetType(type: DbAssetType): 'stx' | 'ft' | 'nft' { + switch (type) { + case DbAssetType.Stx: + return 'stx'; + case DbAssetType.Ft: + return 'ft'; + case DbAssetType.Nft: + return 'nft'; + default: + throw new Error(`Unexpected DbAssetType: ${type}`); + } +} + +/** + * Parses a database principal transaction balance change into a principal transaction balance + * change. + * @param change - The database principal transaction balance change to parse. + * @returns The parsed principal transaction balance change. + */ +export function serializePrincipalTransactionBalanceChange( + change: DbPrincipalTransactionBalanceChange +): PrincipalTransactionBalanceChange { + const assetType = serializeAssetType(change.asset_type); + return { + asset: + assetType === 'stx' + ? { + type: 'stx', + } + : { + type: assetType, + identifier: change.asset_identifier, + }, + balance_change: { + sent: change.sent, + received: change.received, + net: change.net, + }, + }; +} + +/** + * Parses a database principal transaction balance change into a principal balance change + * (the flattened batch shape that carries `tx_id` alongside the asset and balance fields). + * @param change - The database principal transaction balance change to parse. + * @returns The parsed principal balance change. + */ +export function serializePrincipalBalanceChange( + change: DbPrincipalTransactionBalanceChange +): PrincipalBalanceChange { + return { + tx_id: change.tx_id, + ...serializePrincipalTransactionBalanceChange(change), + }; +} diff --git a/src/datastore/common.ts b/src/datastore/common.ts index 18aa7d369..2305b0887 100644 --- a/src/datastore/common.ts +++ b/src/datastore/common.ts @@ -576,6 +576,12 @@ export enum DbAssetEventTypeId { Burn = 3, } +export enum DbAssetType { + Stx = 1, + Ft = 2, + Nft = 3, +} + interface DbAssetEvent extends DbEventBase { asset_event_type_id: DbAssetEventTypeId; sender?: string; @@ -1683,8 +1689,8 @@ export interface PrincipalTxsInsertValues { microblock_sequence: number; tx_index: number; canonical: boolean; - stx_sent: bigint; - stx_received: bigint; + stx_sent: PgNumeric; + stx_received: PgNumeric; microblock_canonical: boolean; stx_balance_affected: boolean; ft_balance_affected: boolean; @@ -1698,6 +1704,23 @@ export interface PrincipalTxsInsertValues { nft_mint_event_count: number; nft_burn_event_count: number; nft_transfer_event_count: number; + balance_change_count: number; +} + +export interface PrincipalTxBalanceChangeInsertValues { + principal: string; + tx_id: PgBytea; + block_height: number; + index_block_hash: PgBytea; + microblock_hash: PgBytea; + microblock_sequence: number; + tx_index: number; + canonical: boolean; + microblock_canonical: boolean; + asset_type: DbAssetType; + asset_identifier: string; + sent: PgNumeric; + received: PgNumeric; } export interface RewardSlotHolderInsertValues { diff --git a/src/datastore/pg-write-store.ts b/src/datastore/pg-write-store.ts index 4ed3046fa..c9b0bea1b 100644 --- a/src/datastore/pg-write-store.ts +++ b/src/datastore/pg-write-store.ts @@ -63,7 +63,9 @@ import { PoxSetSignerValues, PoxCycleInsertValues, DbAssetEventTypeId, + DbAssetType, DbBurnBlockPoxTx, + PrincipalTxBalanceChangeInsertValues, } from './common.js'; import { BLOCK_COLUMNS, @@ -1394,6 +1396,8 @@ export class PgWriteStore extends PgStore { /** * Update the `principal_txs` table with the latest `tx_id`s that resulted in activity for a * principal (contract or address), and mark the type of token balance that was affected. + * Also populates the `principal_tx_balance_changes` table with one row per + * (principal, asset_type, asset_identifier) touched by each tx. * @param sql - DB client * @param txs - list of transactions */ @@ -1402,8 +1406,8 @@ export class PgWriteStore extends PgStore { stx: boolean; ft: boolean; nft: boolean; - stx_sent: bigint; - stx_received: bigint; + stx_sent: BigNumber; + stx_received: BigNumber; stx_mints: number; stx_burns: number; stx_transfers: number; @@ -1414,7 +1418,16 @@ export class PgWriteStore extends PgStore { nft_burns: number; nft_transfers: number; }; + type BalanceChangeRow = { + principal: string; + asset_type: DbAssetType; + asset_identifier: string; + sent: BigNumber; + received: BigNumber; + }; + const STX_ASSET_IDENTIFIER = 'stx'; const values: PrincipalTxsInsertValues[] = []; + const balanceChangeValues: PrincipalTxBalanceChangeInsertValues[] = []; for (const { tx, stxEvents, ftEvents, nftEvents } of txs) { // Mark principals who participated in this transaction, along with the type of token balance // they affected. @@ -1425,8 +1438,8 @@ export class PgWriteStore extends PgStore { stx: entry?.stx || data?.stx || false, ft: entry?.ft || data?.ft || false, nft: entry?.nft || data?.nft || false, - stx_sent: (entry?.stx_sent ?? 0n) + (data?.stx_sent ?? 0n), - stx_received: (entry?.stx_received ?? 0n) + (data?.stx_received ?? 0n), + stx_sent: (entry?.stx_sent ?? new BigNumber(0)).plus(data?.stx_sent ?? 0n), + stx_received: (entry?.stx_received ?? new BigNumber(0)).plus(data?.stx_received ?? 0n), stx_mints: (entry?.stx_mints ?? 0) + (data?.stx_mints ?? 0), stx_burns: (entry?.stx_burns ?? 0) + (data?.stx_burns ?? 0), stx_transfers: (entry?.stx_transfers ?? 0) + (data?.stx_transfers ?? 0), @@ -1439,6 +1452,28 @@ export class PgWriteStore extends PgStore { }); }; + // Per-asset balance changes for this tx, keyed by `${principal}|${asset_type}|${asset_id}`. + // Note: for NFTs we count tokens moved (each event contributes 1 to sent/received) since + // the schema stores numeric counts rather than the underlying token values. + const balanceChanges = new Map(); + const addBalanceChange = ( + principal: string, + asset_type: DbAssetType, + asset_identifier: string, + sent: BigNumber, + received: BigNumber + ) => { + const key = `${principal}|${asset_type}|${asset_identifier}`; + const entry = balanceChanges.get(key); + balanceChanges.set(key, { + principal, + asset_type, + asset_identifier, + sent: (entry?.sent ?? new BigNumber(0)).plus(sent), + received: (entry?.received ?? new BigNumber(0)).plus(received), + }); + }; + // Record participating principals. No amounts yet, that will be included in stx_events below. addPrincipal(tx.sender_address); if (tx.token_transfer_recipient_address) @@ -1447,72 +1482,196 @@ export class PgWriteStore extends PgStore { if (tx.smart_contract_contract_id) addPrincipal(tx.smart_contract_contract_id); // Record fee paid. - if (tx.sponsor_address) { - addPrincipal(tx.sponsor_address, { stx: true, stx_sent: BigInt(tx.fee_rate) }); - } else { - addPrincipal(tx.sender_address, { stx: true, stx_sent: BigInt(tx.fee_rate) }); - } + const feePayer = tx.sponsor_address ?? tx.sender_address; + const feeAmount = new BigNumber(tx.fee_rate); + addPrincipal(feePayer, { stx: true, stx_sent: feeAmount }); + addBalanceChange( + feePayer, + DbAssetType.Stx, + STX_ASSET_IDENTIFIER, + feeAmount, + new BigNumber(0) + ); // Record token amounts and event counts. for (const event of stxEvents) { switch (event.asset_event_type_id) { case DbAssetEventTypeId.Mint: - if (event.recipient) + if (event.recipient) { addPrincipal(event.recipient, { stx: true, - stx_received: event.amount, + stx_received: new BigNumber(event.amount), stx_mints: 1, }); + addBalanceChange( + event.recipient, + DbAssetType.Stx, + STX_ASSET_IDENTIFIER, + new BigNumber(0), + new BigNumber(event.amount) + ); + } break; case DbAssetEventTypeId.Burn: - if (event.sender) - addPrincipal(event.sender, { stx: true, stx_sent: event.amount, stx_burns: 1 }); + if (event.sender) { + addPrincipal(event.sender, { + stx: true, + stx_sent: new BigNumber(event.amount), + stx_burns: 1, + }); + addBalanceChange( + event.sender, + DbAssetType.Stx, + STX_ASSET_IDENTIFIER, + new BigNumber(event.amount), + new BigNumber(0) + ); + } break; case DbAssetEventTypeId.Transfer: - if (event.sender) - addPrincipal(event.sender, { stx: true, stx_sent: event.amount, stx_transfers: 1 }); - if (event.recipient) + if (event.sender) { + addPrincipal(event.sender, { + stx: true, + stx_sent: new BigNumber(event.amount), + stx_transfers: 1, + }); + addBalanceChange( + event.sender, + DbAssetType.Stx, + STX_ASSET_IDENTIFIER, + new BigNumber(event.amount), + new BigNumber(0) + ); + } + if (event.recipient) { addPrincipal(event.recipient, { stx: true, - stx_received: event.amount, + stx_received: new BigNumber(event.amount), stx_transfers: 1, }); + addBalanceChange( + event.recipient, + DbAssetType.Stx, + STX_ASSET_IDENTIFIER, + new BigNumber(0), + new BigNumber(event.amount) + ); + } break; } } for (const event of ftEvents) { switch (event.asset_event_type_id) { case DbAssetEventTypeId.Mint: - if (event.recipient) addPrincipal(event.recipient, { ft: true, ft_mints: 1 }); + if (event.recipient) { + addPrincipal(event.recipient, { ft: true, ft_mints: 1 }); + addBalanceChange( + event.recipient, + DbAssetType.Ft, + event.asset_identifier, + new BigNumber(0), + new BigNumber(event.amount) + ); + } break; case DbAssetEventTypeId.Burn: - if (event.sender) addPrincipal(event.sender, { ft: true, ft_burns: 1 }); + if (event.sender) { + addPrincipal(event.sender, { ft: true, ft_burns: 1 }); + addBalanceChange( + event.sender, + DbAssetType.Ft, + event.asset_identifier, + new BigNumber(event.amount), + new BigNumber(0) + ); + } break; case DbAssetEventTypeId.Transfer: - if (event.sender) addPrincipal(event.sender, { ft: true, ft_transfers: 1 }); - if (event.recipient) + if (event.sender) { + addPrincipal(event.sender, { ft: true, ft_transfers: 1 }); + addBalanceChange( + event.sender, + DbAssetType.Ft, + event.asset_identifier, + new BigNumber(event.amount), + new BigNumber(0) + ); + } + if (event.recipient) { addPrincipal(event.recipient, { ft: true, ft_transfers: 1, }); + addBalanceChange( + event.recipient, + DbAssetType.Ft, + event.asset_identifier, + new BigNumber(0), + new BigNumber(event.amount) + ); + } break; } } for (const event of nftEvents) { switch (event.asset_event_type_id) { case DbAssetEventTypeId.Mint: - if (event.recipient) addPrincipal(event.recipient, { nft: true, nft_mints: 1 }); + if (event.recipient) { + addPrincipal(event.recipient, { nft: true, nft_mints: 1 }); + addBalanceChange( + event.recipient, + DbAssetType.Nft, + event.asset_identifier, + new BigNumber(0), + new BigNumber(1) + ); + } break; case DbAssetEventTypeId.Burn: - if (event.sender) addPrincipal(event.sender, { nft: true, nft_burns: 1 }); + if (event.sender) { + addPrincipal(event.sender, { nft: true, nft_burns: 1 }); + addBalanceChange( + event.sender, + DbAssetType.Nft, + event.asset_identifier, + new BigNumber(1), + new BigNumber(0) + ); + } break; case DbAssetEventTypeId.Transfer: - if (event.sender) addPrincipal(event.sender, { nft: true, nft_transfers: 1 }); - if (event.recipient) addPrincipal(event.recipient, { nft: true, nft_transfers: 1 }); + if (event.sender) { + addPrincipal(event.sender, { nft: true, nft_transfers: 1 }); + addBalanceChange( + event.sender, + DbAssetType.Nft, + event.asset_identifier, + new BigNumber(1), + new BigNumber(0) + ); + } + if (event.recipient) { + addPrincipal(event.recipient, { nft: true, nft_transfers: 1 }); + addBalanceChange( + event.recipient, + DbAssetType.Nft, + event.asset_identifier, + new BigNumber(0), + new BigNumber(1) + ); + } break; } } + // Count balance change rows per principal so the principal_txs row carries + // `balance_change_count` — used by the API to know how many rows to expect from + // the drill-in endpoint without an extra COUNT(*) query. + const balanceChangeCounts = new Map(); + for (const row of balanceChanges.values()) { + balanceChangeCounts.set(row.principal, (balanceChangeCounts.get(row.principal) ?? 0) + 1); + } + for (const [principal, data] of principals.entries()) { values.push({ principal, @@ -1527,8 +1686,8 @@ export class PgWriteStore extends PgStore { stx_balance_affected: data.stx, ft_balance_affected: data.ft, nft_balance_affected: data.nft, - stx_sent: data.stx_sent, - stx_received: data.stx_received, + stx_sent: data.stx_sent.toFixed(), + stx_received: data.stx_received.toFixed(), stx_mint_event_count: data.stx_mints, stx_burn_event_count: data.stx_burns, stx_transfer_event_count: data.stx_transfers, @@ -1538,6 +1697,25 @@ export class PgWriteStore extends PgStore { nft_mint_event_count: data.nft_mints, nft_burn_event_count: data.nft_burns, nft_transfer_event_count: data.nft_transfers, + balance_change_count: balanceChangeCounts.get(principal) ?? 0, + }); + } + + for (const change of balanceChanges.values()) { + balanceChangeValues.push({ + principal: change.principal, + tx_id: tx.tx_id, + block_height: tx.block_height, + index_block_hash: tx.index_block_hash, + microblock_hash: tx.microblock_hash, + microblock_sequence: tx.microblock_sequence, + tx_index: tx.tx_index, + canonical: tx.canonical, + microblock_canonical: tx.microblock_canonical, + asset_type: change.asset_type, + asset_identifier: change.asset_identifier, + sent: change.sent.toFixed(), + received: change.received.toFixed(), }); } } @@ -1559,6 +1737,12 @@ export class PgWriteStore extends PgStore { ON CONFLICT (principal) DO UPDATE SET count = principal_tx_counts.count + EXCLUDED.count `; } + for (const batch of batchIterate(balanceChangeValues, INSERT_BATCH_SIZE)) { + await sql` + INSERT INTO principal_tx_balance_changes ${sql(batch)} + ON CONFLICT ON CONSTRAINT unique_principal_tx_balance_changes DO NOTHING + `; + } } async updateBatchZonefiles( @@ -3014,6 +3198,14 @@ export class PgWriteStore extends PgStore { AND (index_block_hash = ${args.indexBlockHash} OR index_block_hash = '\\x'::bytea) AND tx_id IN ${sql(txIds)} `; + await sql` + UPDATE principal_tx_balance_changes + SET microblock_canonical = ${args.isMicroCanonical}, + canonical = ${args.isCanonical}, index_block_hash = ${args.indexBlockHash} + WHERE microblock_hash IN ${sql(args.microblocks)} + AND (index_block_hash = ${args.indexBlockHash} OR index_block_hash = '\\x'::bytea) + AND tx_id IN ${sql(txIds)} + `; } // Update unanchored tx count in `chain_tip` table @@ -3467,6 +3659,13 @@ export class PgWriteStore extends PgStore { FROM count_deltas AS cd WHERE pc.principal = cd.principal `; + await sql` + UPDATE principal_tx_balance_changes + SET canonical = ${canonical} + WHERE tx_id IN ${sql(txs.map(t => t.txId))} + AND index_block_hash = ${indexBlockHash} + AND canonical != ${canonical} + `; } }); q.enqueue(async () => { diff --git a/src/datastore/v3/constants.ts b/src/datastore/v3/constants.ts index 3dc7d489f..c2cea663e 100644 --- a/src/datastore/v3/constants.ts +++ b/src/datastore/v3/constants.ts @@ -50,6 +50,22 @@ export const TX_COLUMNS = [ 'tenure_change_pubkey_hash', ]; +export const PRINCIPAL_TRANSACTION_BALANCE_CHANGE_COLUMNS = [ + 'principal', + 'tx_id', + 'block_height', + 'index_block_hash', + 'microblock_hash', + 'microblock_sequence', + 'tx_index', + 'canonical', + 'microblock_canonical', + 'asset_type', + 'asset_identifier', + 'sent', + 'received', +]; + export const MEMPOOL_TX_SUMMARY_COLUMNS = [ 'tx_id', 'type_id', diff --git a/src/datastore/v3/pg-store-v3.ts b/src/datastore/v3/pg-store-v3.ts index 3fb4a9503..ebbdf9e30 100644 --- a/src/datastore/v3/pg-store-v3.ts +++ b/src/datastore/v3/pg-store-v3.ts @@ -3,12 +3,14 @@ import { DbCursorPaginatedResult, DbMempoolTransaction, DbMempoolTransactionSummary, + DbPrincipalTransactionBalanceChange, DbPrincipalTransactionSummary, DbTransaction, DbTransactionEvent, DbTransactionSummary, } from './types.js'; import { + PRINCIPAL_TRANSACTION_BALANCE_CHANGE_COLUMNS, MEMPOOL_TX_COLUMNS, MEMPOOL_TX_SUMMARY_COLUMNS, TX_COLUMNS, @@ -419,6 +421,254 @@ export class PgStoreV3 extends BasePgStoreModule { }); } + /** + * Gets the balance changes for a principal's transaction. + * @param args - The arguments for the query. + * @returns The balance changes for the principal's transaction. + */ + async getPrincipalTransactionBalanceChanges(args: { + principal: string; + tx_id: string; + limit: number; + cursor?: string; + }): Promise> { + return await this.sqlTransaction(async sql => { + // Cursor format: `${asset_type}:${asset_identifier}`. We split on the *first* colon + // only because FT/NFT asset identifiers contain `::` internally (e.g. + // `SP000…contract-name::asset-name`); a naive split would over-split. The cursor is + // inclusive and points at the first row of the current page, matching the convention + // used by `getPrincipalTransactionSummaryList`. + let cursorFilter = sql``; + if (args.cursor) { + const colonIdx = args.cursor.indexOf(':'); + if (colonIdx > 0) { + const cursorAssetType = parseInt(args.cursor.substring(0, colonIdx), 10); + const cursorAssetIdentifier = args.cursor.substring(colonIdx + 1); + cursorFilter = sql` + AND (asset_type, asset_identifier) + >= (${cursorAssetType}, ${cursorAssetIdentifier}) + `; + } + } + + const resultQuery = await sql<(DbPrincipalTransactionBalanceChange & { total: number })[]>` + WITH total AS ( + SELECT balance_change_count + FROM principal_txs + WHERE principal = ${args.principal} + AND tx_id = ${args.tx_id} + AND canonical = true + AND microblock_canonical = true + ) + SELECT ${sql(PRINCIPAL_TRANSACTION_BALANCE_CHANGE_COLUMNS)}, + (received - sent) AS net, + (SELECT balance_change_count FROM total) AS total + FROM principal_tx_balance_changes + WHERE principal = ${args.principal} + AND tx_id = ${args.tx_id} + AND canonical = true + AND microblock_canonical = true + ${cursorFilter} + ORDER BY asset_type ASC, asset_identifier ASC + LIMIT ${args.limit + 1} + `; + + const hasNextPage = resultQuery.count > args.limit; + const results = hasNextPage ? resultQuery.slice(0, args.limit) : resultQuery; + const total = resultQuery.count > 0 ? resultQuery[0].total : 0; + + const peekResult = resultQuery[resultQuery.length - 1]; + const nextCursor = + hasNextPage && peekResult + ? `${peekResult.asset_type}:${peekResult.asset_identifier}` + : null; + + const firstResult = results[0]; + const currentCursor = firstResult + ? `${firstResult.asset_type}:${firstResult.asset_identifier}` + : null; + + let prevCursor: string | null = null; + if (firstResult) { + const prevPageQuery = await sql<{ asset_type: number; asset_identifier: string }[]>` + SELECT asset_type, asset_identifier + FROM principal_tx_balance_changes + WHERE principal = ${args.principal} + AND tx_id = ${args.tx_id} + AND canonical = true + AND microblock_canonical = true + AND (asset_type, asset_identifier) + < (${firstResult.asset_type}, ${firstResult.asset_identifier}) + ORDER BY asset_type DESC, asset_identifier DESC + OFFSET ${args.limit - 1} + LIMIT 1 + `; + if (prevPageQuery.length > 0) { + const prevPage = prevPageQuery[0]; + prevCursor = `${prevPage.asset_type}:${prevPage.asset_identifier}`; + } + } + + return { + limit: args.limit, + next_cursor: nextCursor, + prev_cursor: prevCursor, + current_cursor: currentCursor, + total, + results, + }; + }); + } + + /** + * Gets the balance changes for a principal across a batch of transactions, paginated as a + * single flat array ordered by chain position DESC (newest tx first) then by asset + * (STX, FT, NFT) ASC within each tx. + * @param args - The arguments for the query. + * @returns The paginated balance changes for the principal across the given tx ids. + */ + async getPrincipalBalanceChanges(args: { + principal: string; + tx_ids: string[]; + limit: number; + cursor?: string; + }): Promise> { + return await this.sqlTransaction(async sql => { + // Cursor format: `${block_height}:${microblock_sequence}:${tx_index}:${asset_type}:${asset_identifier}`. + // We walk the first 4 colons manually and treat everything after as the asset_identifier, + // because FT/NFT asset_identifier values contain `::` internally — a naive `split(':')` + // would over-split. The cursor is inclusive and points at the first row of the current + // page. + // + // The page direction is mixed: DESC by chain position, ASC by asset within a tx. SQL row + // comparison can only express one direction at a time, so the "row >= cursor in page + // order" predicate is expressed as a two-branch OR. + let cursorFilter = sql``; + if (args.cursor) { + const parts: string[] = []; + let idx = 0; + let valid = true; + for (let i = 0; i < 4; i++) { + const next = args.cursor.indexOf(':', idx); + if (next === -1) { + valid = false; + break; + } + parts.push(args.cursor.substring(idx, next)); + idx = next + 1; + } + if (valid) { + parts.push(args.cursor.substring(idx)); + const blockHeight = parseInt(parts[0], 10); + const microblockSequence = parseInt(parts[1], 10); + const txIndex = parseInt(parts[2], 10); + const cursorAssetType = parseInt(parts[3], 10); + const cursorAssetIdentifier = parts[4]; + cursorFilter = sql` + AND ( + (block_height, microblock_sequence, tx_index) + < (${blockHeight}, ${microblockSequence}, ${txIndex}) + OR ( + (block_height, microblock_sequence, tx_index) + = (${blockHeight}, ${microblockSequence}, ${txIndex}) + AND (asset_type, asset_identifier) + >= (${cursorAssetType}, ${cursorAssetIdentifier}) + ) + ) + `; + } + } + + const resultQuery = await sql<(DbPrincipalTransactionBalanceChange & { total: number })[]>` + WITH total AS ( + SELECT COALESCE(SUM(balance_change_count)::int, 0) AS count + FROM principal_txs + WHERE principal = ${args.principal} + AND tx_id IN ${sql(args.tx_ids)} + AND canonical = true + AND microblock_canonical = true + ) + SELECT ${sql(PRINCIPAL_TRANSACTION_BALANCE_CHANGE_COLUMNS)}, + (received - sent) AS net, + (SELECT count FROM total) AS total + FROM principal_tx_balance_changes + WHERE principal = ${args.principal} + AND tx_id IN ${sql(args.tx_ids)} + AND canonical = true + AND microblock_canonical = true + ${cursorFilter} + ORDER BY block_height DESC, microblock_sequence DESC, tx_index DESC, + asset_type ASC, asset_identifier ASC + LIMIT ${args.limit + 1} + `; + + const hasNextPage = resultQuery.count > args.limit; + const results = hasNextPage ? resultQuery.slice(0, args.limit) : resultQuery; + const total = resultQuery.count > 0 ? resultQuery[0].total : 0; + + const buildCursor = (row: DbPrincipalTransactionBalanceChange) => + `${row.block_height}:${row.microblock_sequence}:${row.tx_index}:${row.asset_type}:${row.asset_identifier}`; + + const peekResult = resultQuery[resultQuery.length - 1]; + const nextCursor = hasNextPage && peekResult ? buildCursor(peekResult) : null; + + const firstResult = results[0]; + const currentCursor = firstResult ? buildCursor(firstResult) : null; + + // Previous page: rows that come BEFORE firstResult in the forward direction. In our + // mixed DESC/ASC order that means a chain position later than firstResult, or the + // same tx with an earlier asset. Ordered in reverse direction (ASC chain + DESC + // asset) and offset by `limit - 1` so the returned row is the first row of the + // previous page. + let prevCursor: string | null = null; + if (firstResult) { + const prevPageQuery = await sql< + { + block_height: number; + microblock_sequence: number; + tx_index: number; + asset_type: number; + asset_identifier: string; + }[] + >` + SELECT block_height, microblock_sequence, tx_index, asset_type, asset_identifier + FROM principal_tx_balance_changes + WHERE principal = ${args.principal} + AND tx_id IN ${sql(args.tx_ids)} + AND canonical = true + AND microblock_canonical = true + AND ( + (block_height, microblock_sequence, tx_index) + > (${firstResult.block_height}, ${firstResult.microblock_sequence}, ${firstResult.tx_index}) + OR ( + (block_height, microblock_sequence, tx_index) + = (${firstResult.block_height}, ${firstResult.microblock_sequence}, ${firstResult.tx_index}) + AND (asset_type, asset_identifier) + < (${firstResult.asset_type}, ${firstResult.asset_identifier}) + ) + ) + ORDER BY block_height ASC, microblock_sequence ASC, tx_index ASC, + asset_type DESC, asset_identifier DESC + OFFSET ${args.limit - 1} + LIMIT 1 + `; + if (prevPageQuery.length > 0) { + const prevPage = prevPageQuery[0]; + prevCursor = `${prevPage.block_height}:${prevPage.microblock_sequence}:${prevPage.tx_index}:${prevPage.asset_type}:${prevPage.asset_identifier}`; + } + } + + return { + limit: args.limit, + next_cursor: nextCursor, + prev_cursor: prevCursor, + current_cursor: currentCursor, + total, + results, + }; + }); + } + /** * Gets the transaction by ID. Looks up in the canonical chain first, then the mempool. * Heavy columns (post conditions, contract source, decoded clarity inputs, raw result) diff --git a/src/datastore/v3/types.ts b/src/datastore/v3/types.ts index 8b00c7d47..9d83a64e4 100644 --- a/src/datastore/v3/types.ts +++ b/src/datastore/v3/types.ts @@ -1,4 +1,10 @@ -import { DbAssetEventTypeId, DbEventTypeId, DbTxStatus, DbTxTypeId } from '../common.js'; +import { + DbAssetEventTypeId, + DbAssetType, + DbEventTypeId, + DbTxStatus, + DbTxTypeId, +} from '../common.js'; export type DbCursorPaginatedResult = { limit: number; @@ -72,6 +78,23 @@ export interface DbPrincipalTransactionSummary extends DbTransactionSummary { involvement: DbPrincipalTransactionInvolvement; } +export interface DbPrincipalTransactionBalanceChange { + principal: string; + tx_id: string; + block_height: number; + index_block_hash: string; + microblock_hash: string; + microblock_sequence: number; + tx_index: number; + canonical: boolean; + microblock_canonical: boolean; + asset_type: DbAssetType; + asset_identifier: string; + sent: string; + received: string; + net: string; +} + export interface DbMempoolTransactionSummary { tx_id: string; type_id: DbTxTypeId; diff --git a/tests/api/v3/principals.test.ts b/tests/api/v3/principals.test.ts index 93d3ebcda..2e9bfdac2 100644 --- a/tests/api/v3/principals.test.ts +++ b/tests/api/v3/principals.test.ts @@ -518,4 +518,393 @@ describe('principals', () => { assert.equal(refreshed.statusCode, 304); }); }); + + describe('/v3/principals/:principal/transactions/:tx_id/balance-changes', () => { + test('should return an empty list', async () => { + const response = await api.fastifyApp.inject({ + method: 'GET', + url: `/extended/v3/principals/${emptyPrincipal}/transactions/${hex(0xdeadbeef)}/balance-changes`, + }); + assert.equal(response.statusCode, 200); + const body = JSON.parse(response.body); + assert.deepEqual(body, { + limit: 20, + total: 0, + cursor: { + next: null, + previous: null, + current: null, + }, + results: [], + }); + }); + + test('should return a list of balance changes with cursor pagination', async () => { + const response1 = await api.fastifyApp.inject({ + method: 'GET', + url: `/extended/v3/principals/${testAddr1}/transactions/${hex(3)}/balance-changes`, + }); + assert.equal(response1.statusCode, 200); + const body1 = JSON.parse(response1.body); + assert.deepEqual(body1, { + limit: 20, + total: 3, + cursor: { + next: null, + previous: null, + current: '1:stx', + }, + results: [ + { + asset: { + type: 'stx', + }, + balance_change: { + sent: '100050', + received: '0', + net: '-100050', + }, + }, + { + asset: { + type: 'ft', + identifier: + 'SP2H8PY27SEZ03MWRKS5XABZYQN17ETGQS3527SA5.newyorkcitycoin-token::newyorkcitycoin', + }, + balance_change: { + sent: '100000', + received: '0', + net: '-100000', + }, + }, + { + asset: { + type: 'nft', + identifier: 'SP3D6PV2ACBPEKYJTCMH7HEN02KP87QSP8KTEH335.Candies::candy', + }, + balance_change: { + sent: '1', + received: '0', + net: '-1', + }, + }, + ], + }); + + const response2 = await api.fastifyApp.inject({ + method: 'GET', + url: `/extended/v3/principals/${testAddr1}/transactions/${hex(3)}/balance-changes`, + query: { + limit: '1', + cursor: '1:stx', + }, + }); + assert.equal(response2.statusCode, 200); + const body2 = JSON.parse(response2.body); + assert.deepEqual(body2, { + limit: 1, + total: 3, + cursor: { + next: '2:SP2H8PY27SEZ03MWRKS5XABZYQN17ETGQS3527SA5.newyorkcitycoin-token::newyorkcitycoin', + previous: null, + current: '1:stx', + }, + results: [ + { + asset: { + type: 'stx', + }, + balance_change: { + sent: '100050', + received: '0', + net: '-100050', + }, + }, + ], + }); + + const response3 = await api.fastifyApp.inject({ + method: 'GET', + url: `/extended/v3/principals/${testAddr1}/transactions/${hex(3)}/balance-changes`, + query: { + limit: '1', + cursor: + '2:SP2H8PY27SEZ03MWRKS5XABZYQN17ETGQS3527SA5.newyorkcitycoin-token::newyorkcitycoin', + }, + }); + assert.equal(response3.statusCode, 200); + const body3 = JSON.parse(response3.body); + assert.deepEqual(body3, { + limit: 1, + total: 3, + cursor: { + next: '3:SP3D6PV2ACBPEKYJTCMH7HEN02KP87QSP8KTEH335.Candies::candy', + previous: '1:stx', + current: + '2:SP2H8PY27SEZ03MWRKS5XABZYQN17ETGQS3527SA5.newyorkcitycoin-token::newyorkcitycoin', + }, + results: [ + { + asset: { + type: 'ft', + identifier: + 'SP2H8PY27SEZ03MWRKS5XABZYQN17ETGQS3527SA5.newyorkcitycoin-token::newyorkcitycoin', + }, + balance_change: { + sent: '100000', + received: '0', + net: '-100000', + }, + }, + ], + }); + }); + + test('should return 304 when ETag matches and refresh ETag per transaction', async () => { + // The balance-changes-by-tx endpoint uses the per-transaction ETag, so the cache key + // is scoped to (principal, tx_id). + const first = await api.fastifyApp.inject({ + method: 'GET', + url: `/extended/v3/principals/${testAddr1}/transactions/${hex(3)}/balance-changes`, + }); + assert.equal(first.statusCode, 200); + const etag = first.headers['etag']; + assert.ok(etag, 'expected ETag header to be set'); + + // Same ETag returns 304 with an empty body. + const cached = await api.fastifyApp.inject({ + method: 'GET', + url: `/extended/v3/principals/${testAddr1}/transactions/${hex(3)}/balance-changes`, + headers: { 'if-none-match': etag as string }, + }); + assert.equal(cached.statusCode, 304); + assert.equal(cached.body, ''); + + // A stale ETag returns 200 with the current data and ETag. + const stale = await api.fastifyApp.inject({ + method: 'GET', + url: `/extended/v3/principals/${testAddr1}/transactions/${hex(3)}/balance-changes`, + headers: { 'if-none-match': '"0xdeadbeef"' }, + }); + assert.equal(stale.statusCode, 200); + assert.equal(stale.headers['etag'], etag); + + // A different tx_id returns a distinct ETag and does not 304 against tx hex(3)'s ETag. + const otherTx = await api.fastifyApp.inject({ + method: 'GET', + url: `/extended/v3/principals/${testAddr1}/transactions/${hex(1)}/balance-changes`, + headers: { 'if-none-match': etag as string }, + }); + assert.equal(otherTx.statusCode, 200); + assert.ok(otherTx.headers['etag']); + assert.notEqual(otherTx.headers['etag'], etag); + }); + }); + + describe('/v3/principals/:principal/balance-changes', () => { + test('should require at least one tx_id', async () => { + const response = await api.fastifyApp.inject({ + method: 'GET', + url: `/extended/v3/principals/${testAddr1}/balance-changes`, + }); + assert.equal(response.statusCode, 400); + }); + + test('should return an empty list when the principal has no activity on the requested txs', async () => { + const response = await api.fastifyApp.inject({ + method: 'GET', + url: `/extended/v3/principals/${emptyPrincipal}/balance-changes`, + query: { tx_id: hex(3) }, + }); + assert.equal(response.statusCode, 200); + const body = JSON.parse(response.body); + assert.deepEqual(body, { + limit: 20, + total: 0, + cursor: { + next: null, + previous: null, + current: null, + }, + results: [], + }); + }); + + test('should return balance changes across multiple txs ordered by chain position desc then asset asc', async () => { + // testAddr1 has activity on: + // - hex(1): coinbase in block 1 → stx fee only + // - hex(3): token transfer in block 2 → stx + ft + nft + const response = await api.fastifyApp.inject({ + method: 'GET', + url: `/extended/v3/principals/${testAddr1}/balance-changes`, + query: { tx_id: [hex(1), hex(3)] }, + }); + assert.equal(response.statusCode, 200); + const body = JSON.parse(response.body); + assert.equal(body.limit, 20); + assert.equal(body.total, 4); + assert.equal(body.results.length, 4); + assert.deepEqual(body.cursor, { + next: null, + previous: null, + current: '2:0:3:1:stx', + }); + assert.deepEqual(body.results, [ + { + tx_id: hex(3), + asset: { type: 'stx' }, + balance_change: { sent: '100050', received: '0', net: '-100050' }, + }, + { + tx_id: hex(3), + asset: { + type: 'ft', + identifier: + 'SP2H8PY27SEZ03MWRKS5XABZYQN17ETGQS3527SA5.newyorkcitycoin-token::newyorkcitycoin', + }, + balance_change: { sent: '100000', received: '0', net: '-100000' }, + }, + { + tx_id: hex(3), + asset: { + type: 'nft', + identifier: 'SP3D6PV2ACBPEKYJTCMH7HEN02KP87QSP8KTEH335.Candies::candy', + }, + balance_change: { sent: '1', received: '0', net: '-1' }, + }, + { + tx_id: hex(1), + asset: { type: 'stx' }, + balance_change: { sent: '50', received: '0', net: '-50' }, + }, + ]); + }); + + test('should accept comma-separated tx_id values', async () => { + const response = await api.fastifyApp.inject({ + method: 'GET', + url: `/extended/v3/principals/${testAddr1}/balance-changes`, + query: { tx_id: `${hex(1)},${hex(3)}` }, + }); + assert.equal(response.statusCode, 200); + const body = JSON.parse(response.body); + assert.equal(body.total, 4); + assert.equal(body.results.length, 4); + assert.equal(body.results[0].tx_id, hex(3)); + assert.equal(body.results[3].tx_id, hex(1)); + }); + + test('should allow cursor pagination', async () => { + // First page: limit 2 → first two entries of tx hex(3). + const page1 = await api.fastifyApp.inject({ + method: 'GET', + url: `/extended/v3/principals/${testAddr1}/balance-changes`, + query: { tx_id: [hex(1), hex(3)], limit: '2' }, + }); + assert.equal(page1.statusCode, 200); + const body1 = JSON.parse(page1.body); + assert.equal(body1.total, 4); + assert.equal(body1.limit, 2); + assert.equal(body1.results.length, 2); + assert.equal(body1.results[0].tx_id, hex(3)); + assert.equal(body1.results[0].asset.type, 'stx'); + assert.equal(body1.results[1].asset.type, 'ft'); + assert.deepEqual(body1.cursor, { + next: '2:0:3:3:SP3D6PV2ACBPEKYJTCMH7HEN02KP87QSP8KTEH335.Candies::candy', + previous: null, + current: '2:0:3:1:stx', + }); + + // Second page: starts at the nft of hex(3), then crosses over to the stx of hex(1). + const page2 = await api.fastifyApp.inject({ + method: 'GET', + url: `/extended/v3/principals/${testAddr1}/balance-changes`, + query: { + tx_id: [hex(1), hex(3)], + limit: '2', + cursor: '2:0:3:3:SP3D6PV2ACBPEKYJTCMH7HEN02KP87QSP8KTEH335.Candies::candy', + }, + }); + assert.equal(page2.statusCode, 200); + const body2 = JSON.parse(page2.body); + assert.equal(body2.results.length, 2); + assert.equal(body2.results[0].tx_id, hex(3)); + assert.equal(body2.results[0].asset.type, 'nft'); + assert.equal(body2.results[1].tx_id, hex(1)); + assert.equal(body2.results[1].asset.type, 'stx'); + assert.deepEqual(body2.cursor, { + next: null, + previous: '2:0:3:1:stx', + current: '2:0:3:3:SP3D6PV2ACBPEKYJTCMH7HEN02KP87QSP8KTEH335.Candies::candy', + }); + }); + + test('should return 304 when ETag matches and refresh ETag on new principal activity', async () => { + // This endpoint uses the principal cache, so the ETag tracks the principal's last + // confirmed activity — independent of the requested tx_id batch. + const first = await api.fastifyApp.inject({ + method: 'GET', + url: `/extended/v3/principals/${testAddr1}/balance-changes`, + query: { tx_id: hex(3) }, + }); + assert.equal(first.statusCode, 200); + const etag = first.headers['etag']; + assert.ok(etag, 'expected ETag header to be set'); + + // Same ETag returns 304 with an empty body. + const cached = await api.fastifyApp.inject({ + method: 'GET', + url: `/extended/v3/principals/${testAddr1}/balance-changes`, + query: { tx_id: hex(3) }, + headers: { 'if-none-match': etag as string }, + }); + assert.equal(cached.statusCode, 304); + assert.equal(cached.body, ''); + + // A stale ETag returns 200 with the current data and ETag. + const stale = await api.fastifyApp.inject({ + method: 'GET', + url: `/extended/v3/principals/${testAddr1}/balance-changes`, + query: { tx_id: hex(3) }, + headers: { 'if-none-match': '"0xdeadbeef"' }, + }); + assert.equal(stale.statusCode, 200); + assert.equal(stale.headers['etag'], etag); + + // New confirmed activity for testAddr1 invalidates its ETag. + await db.update( + new TestBlockBuilder({ + block_height: 3, + block_hash: hex(3), + index_block_hash: hex(3), + parent_index_block_hash: hex(2), + parent_block_hash: hex(2), + }) + .addTx({ + tx_id: hex(0x1001), + fee_rate: 50n, + block_hash: hex(3), + index_block_hash: hex(3), + block_time: 3000, + burn_block_height: 3, + burn_block_time: 3000, + type_id: DbTxTypeId.TokenTransfer, + status: DbTxStatus.Success, + sender_address: testAddr1, + nonce: 100, + }) + .build() + ); + const afterActivity = await api.fastifyApp.inject({ + method: 'GET', + url: `/extended/v3/principals/${testAddr1}/balance-changes`, + query: { tx_id: hex(3) }, + headers: { 'if-none-match': etag as string }, + }); + assert.equal(afterActivity.statusCode, 200); + const newEtag = afterActivity.headers['etag']; + assert.ok(newEtag); + assert.notEqual(newEtag, etag); + }); + }); });