diff --git a/src/integration/blockchain/icp/dto/icp.dto.ts b/src/integration/blockchain/icp/dto/icp.dto.ts index ea4870a1bc..4eeed3cead 100644 --- a/src/integration/blockchain/icp/dto/icp.dto.ts +++ b/src/integration/blockchain/icp/dto/icp.dto.ts @@ -67,12 +67,29 @@ export interface CandidIcrcGetTransactionsResponse { // --- Rosetta API response types --- -export interface RosettaTransaction { +export interface RosettaTransactionsResponse { + transactions: RosettaTransactionEntry[]; + total_count: number; + next_offset?: number; +} + +export interface RosettaTransactionEntry { + block_identifier: { index: number; hash: string }; transaction: { - operations: { status: string }[]; + transaction_identifier: { hash: string }; + operations: RosettaOperation[]; + metadata: { block_height: number; memo: number; timestamp: number }; }; } +export interface RosettaOperation { + operation_identifier: { index: number }; + type: string; + status: string; + account: { address: string }; + amount?: { value: string; currency: { symbol: string; decimals: number } }; +} + // --- Typed raw ledger interfaces (for Actor.createActor results) --- export interface IcpNativeRawLedger { diff --git a/src/integration/blockchain/icp/icp-client.ts b/src/integration/blockchain/icp/icp-client.ts index 8afae40f51..d9f29bdb6f 100644 --- a/src/integration/blockchain/icp/icp-client.ts +++ b/src/integration/blockchain/icp/icp-client.ts @@ -18,12 +18,14 @@ import { IcpTransfer, IcpTransferQueryResult, IcrcRawLedger, - RosettaTransaction, + RosettaTransactionsResponse, } from './dto/icp.dto'; import { InternetComputerWallet } from './icp-wallet'; import { icpNativeLedgerIdlFactory, icrcLedgerIdlFactory } from './icp.idl'; import { InternetComputerUtil } from './icp.util'; +const ROSETTA_NETWORK_ID = { blockchain: 'Internet Computer', network: '00000000000000020101' }; + export class InternetComputerClient extends BlockchainClient { private readonly logger = new DfxLogger(InternetComputerClient); @@ -206,6 +208,44 @@ export class InternetComputerClient extends BlockchainClient { }; } + // --- Per-address transfers via Rosetta /search/transactions --- + + async getNativeTransfersForAddress(accountIdentifier: string, maxBlock?: number, limit = 50): Promise { + const url = `${this.rosettaApiUrl}/search/transactions`; + + const body: Record = { + network_identifier: ROSETTA_NETWORK_ID, + account_identifier: { address: accountIdentifier }, + limit, + }; + if (maxBlock !== undefined) body.max_block = maxBlock; + + const response = await this.http.post(url, body); + + const transfers: IcpTransfer[] = []; + + for (const entry of response.transactions) { + const ops = entry.transaction.operations.filter((op) => op.type === 'TRANSACTION' && op.status === 'COMPLETED'); + + const creditOp = ops.find((op) => op.amount && BigInt(op.amount.value) > 0n); + const debitOp = ops.find((op) => op.amount && BigInt(op.amount.value) < 0n); + + if (!creditOp?.amount || !debitOp?.amount) continue; + + transfers.push({ + blockIndex: entry.block_identifier.index, + from: debitOp.account.address, + to: creditOp.account.address, + amount: InternetComputerUtil.fromSmallestUnit(BigInt(creditOp.amount.value)), + fee: 0, + memo: BigInt(entry.transaction.metadata.memo), + timestamp: Math.floor(entry.transaction.metadata.timestamp / 1_000_000_000), + }); + } + + return transfers; + } + // --- Block height & transfers (ICRC-3, for ck-tokens) --- async getIcrcBlockHeight(canisterId: string): Promise { @@ -285,8 +325,8 @@ export class InternetComputerClient extends BlockchainClient { private async isTxHashComplete(txHash: string): Promise { const url = `${this.rosettaApiUrl}/search/transactions`; - const response = await this.http.post<{ transactions: RosettaTransaction[] }>(url, { - network_identifier: { blockchain: 'Internet Computer', network: '00000000000000020101' }, + const response = await this.http.post(url, { + network_identifier: ROSETTA_NETWORK_ID, transaction_identifier: { hash: txHash }, }); diff --git a/src/integration/blockchain/icp/services/icp.service.ts b/src/integration/blockchain/icp/services/icp.service.ts index 480e015270..733efe6275 100644 --- a/src/integration/blockchain/icp/services/icp.service.ts +++ b/src/integration/blockchain/icp/services/icp.service.ts @@ -9,7 +9,7 @@ import nacl from 'tweetnacl'; import { WalletAccount } from '../../shared/evm/domain/wallet-account'; import { SignatureException } from '../../shared/exceptions/signature.exception'; import { BlockchainService } from '../../shared/util/blockchain.service'; -import { IcpTransferQueryResult } from '../dto/icp.dto'; +import { IcpTransfer, IcpTransferQueryResult } from '../dto/icp.dto'; import { InternetComputerClient } from '../icp-client'; @Injectable() @@ -96,6 +96,14 @@ export class InternetComputerService extends BlockchainService { return this.client.getTransfers(start, count); } + async getNativeTransfersForAddress( + accountIdentifier: string, + maxBlock?: number, + limit?: number, + ): Promise { + return this.client.getNativeTransfersForAddress(accountIdentifier, maxBlock, limit); + } + async getIcrcBlockHeight(canisterId: string): Promise { return this.client.getIcrcBlockHeight(canisterId); } diff --git a/src/subdomains/supporting/payin/services/payin-icp.service.ts b/src/subdomains/supporting/payin/services/payin-icp.service.ts index 90cd5ad9c0..f67b0e952f 100644 --- a/src/subdomains/supporting/payin/services/payin-icp.service.ts +++ b/src/subdomains/supporting/payin/services/payin-icp.service.ts @@ -1,5 +1,5 @@ import { Injectable } from '@nestjs/common'; -import { IcpTransferQueryResult } from 'src/integration/blockchain/icp/dto/icp.dto'; +import { IcpTransfer, IcpTransferQueryResult } from 'src/integration/blockchain/icp/dto/icp.dto'; import { InternetComputerService } from 'src/integration/blockchain/icp/services/icp.service'; import { Asset } from 'src/shared/models/asset/asset.entity'; @@ -19,6 +19,14 @@ export class PayInInternetComputerService { return this.internetComputerService.getTransfers(start, count); } + async getNativeTransfersForAddress( + accountIdentifier: string, + maxBlock?: number, + limit?: number, + ): Promise { + return this.internetComputerService.getNativeTransfersForAddress(accountIdentifier, maxBlock, limit); + } + async getIcrcBlockHeight(canisterId: string): Promise { return this.internetComputerService.getIcrcBlockHeight(canisterId); } diff --git a/src/subdomains/supporting/payin/strategies/register/__tests__/register.registry.spec.ts b/src/subdomains/supporting/payin/strategies/register/__tests__/register.registry.spec.ts index 35ac86d54d..914b3c34c8 100644 --- a/src/subdomains/supporting/payin/strategies/register/__tests__/register.registry.spec.ts +++ b/src/subdomains/supporting/payin/strategies/register/__tests__/register.registry.spec.ts @@ -7,6 +7,7 @@ import { TronService } from 'src/integration/blockchain/tron/services/tron.servi import { TatumWebhookService } from 'src/integration/tatum/services/tatum-webhook.service'; import { createCustomAsset } from 'src/shared/models/asset/__mocks__/asset.entity.mock'; import { RepositoryFactory } from 'src/shared/repositories/repository.factory'; +import { TransactionRequestService } from 'src/subdomains/supporting/payment/services/transaction-request.service'; import { PayInBitcoinService } from '../../../services/payin-bitcoin.service'; import { PayInInternetComputerService } from '../../../services/payin-icp.service'; import { PayInMoneroService } from '../../../services/payin-monero.service'; @@ -81,7 +82,7 @@ describe('RegisterStrategyRegistry', () => { (ConfigModule as Record).Config = { payment: { internetComputerSeed: 'test' } }; jest.spyOn(InternetComputerUtil, 'createWallet').mockReturnValue({ address: 'test-principal' } as never); jest.spyOn(InternetComputerUtil, 'accountIdentifier').mockReturnValue('test-account-id'); - icpStrategy = new IcpStrategy(mock()); + icpStrategy = new IcpStrategy(mock(), mock()); registry = new RegisterStrategyRegistryWrapper( bitcoinStrategy, diff --git a/src/subdomains/supporting/payin/strategies/register/impl/icp.strategy.ts b/src/subdomains/supporting/payin/strategies/register/impl/icp.strategy.ts index 45e2314d38..31f1a70e3c 100644 --- a/src/subdomains/supporting/payin/strategies/register/impl/icp.strategy.ts +++ b/src/subdomains/supporting/payin/strategies/register/impl/icp.strategy.ts @@ -1,41 +1,34 @@ -import { Inject, Injectable } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { CronExpression } from '@nestjs/schedule'; import { Config } from 'src/config/config'; -import { IcpTransfer } from 'src/integration/blockchain/icp/dto/icp.dto'; import { InternetComputerUtil } from 'src/integration/blockchain/icp/icp.util'; import { Blockchain } from 'src/integration/blockchain/shared/enums/blockchain.enum'; -import { Asset } from 'src/shared/models/asset/asset.entity'; import { BlockchainAddress } from 'src/shared/models/blockchain-address'; import { DfxLogger } from 'src/shared/services/dfx-logger'; import { Process } from 'src/shared/services/process.service'; import { DfxCron } from 'src/shared/utils/cron'; -import { DepositService } from 'src/subdomains/supporting/address-pool/deposit/deposit.service'; -import { Like } from 'typeorm'; +import { Util } from 'src/shared/utils/util'; +import { TransactionRequestService } from 'src/subdomains/supporting/payment/services/transaction-request.service'; +import { Not, Like } from 'typeorm'; import { PayInType } from '../../../entities/crypto-input.entity'; import { PayInEntry } from '../../../interfaces'; import { PayInInternetComputerService } from '../../../services/payin-icp.service'; -import { PollingStrategy } from './base/polling.strategy'; - -const BATCH_SIZE = 1000; +import { RegisterStrategy } from './base/register.strategy'; @Injectable() -export class InternetComputerStrategy extends PollingStrategy { +export class InternetComputerStrategy extends RegisterStrategy { protected readonly logger = new DfxLogger(InternetComputerStrategy); - @Inject() private readonly depositService: DepositService; - - private lastProcessedBlock: number | null = null; - private readonly lastProcessedTokenBlocks: Map = new Map(); - private readonly paymentAddress: string; - private readonly paymentAccountIdentifier: string | undefined; - constructor(private readonly payInInternetComputerService: PayInInternetComputerService) { + constructor( + private readonly payInInternetComputerService: PayInInternetComputerService, + private readonly transactionRequestService: TransactionRequestService, + ) { super(); const wallet = InternetComputerUtil.createWallet({ seed: Config.payment.internetComputerSeed, index: 0 }); this.paymentAddress = wallet.address; - this.paymentAccountIdentifier = InternetComputerUtil.accountIdentifier(wallet.address); } get blockchain(): Blockchain { @@ -43,192 +36,93 @@ export class InternetComputerStrategy extends PollingStrategy { } //*** JOBS ***// - @DfxCron(CronExpression.EVERY_SECOND, { process: Process.PAY_IN, timeout: 7200 }) + @DfxCron(CronExpression.EVERY_MINUTE, { process: Process.PAY_IN, timeout: 7200 }) async checkPayInEntries(): Promise { - await super.checkPayInEntries(); - await this.processTokenPayInEntries(); - } - - //*** HELPER METHODS ***// - protected async getBlockHeight(): Promise { - return this.payInInternetComputerService.getBlockHeight(); - } - - protected async processNewPayInEntries(): Promise { - const log = this.createNewLogObject(); - - const lastProcessed = await this.getLastProcessedBlock(); - const start = lastProcessed + 1; - - const result = await this.payInInternetComputerService.getTransfers(start, BATCH_SIZE); - - if (result.lastBlockIndex >= start) { - this.lastProcessedBlock = result.lastBlockIndex; - } + const activeDepositAddresses = await this.transactionRequestService.getActiveDepositAddresses( + Util.hoursBefore(1), + this.blockchain, + ); - if (result.transfers.length > 0) { - // query_blocks returns AccountIdentifier hex — match via computed AccountIdentifiers - const accountIdToDeposit = await this.getDepositAccountIdentifierMap(); + if (this.paymentAddress) activeDepositAddresses.push(this.paymentAddress); - // Add payment address to the map (if configured) - if (this.paymentAddress && this.paymentAccountIdentifier) { - accountIdToDeposit.set(this.paymentAccountIdentifier, this.paymentAddress); - } - - const ownAccountId = this.getOwnWalletAccountIdentifier(); - const relevantTransfers = result.transfers.filter((t) => accountIdToDeposit.has(t.to) && t.from !== ownAccountId); + await this.processNewPayInEntries(activeDepositAddresses.map((a) => BlockchainAddress.create(a, this.blockchain))); + } - if (relevantTransfers.length > 0) { - const entries = await this.mapToPayInEntries(relevantTransfers, accountIdToDeposit); - await this.createPayInsAndSave(entries, log); - } - } + async pollAddress(depositAddress: BlockchainAddress, fromBlock?: number, toBlock?: number): Promise { + if (depositAddress.blockchain !== this.blockchain) + throw new Error(`Invalid blockchain: ${depositAddress.blockchain}`); - this.printInputLog(log, 'omitted', this.blockchain); + return this.processNewPayInEntries([depositAddress], fromBlock, toBlock); } - private async processTokenPayInEntries(): Promise { + //*** HELPER METHODS ***// + private async processNewPayInEntries( + depositAddresses: BlockchainAddress[], + fromBlock?: number, + toBlock?: number, + ): Promise { const log = this.createNewLogObject(); - const tokenAssets = await this.assetService.getTokens(this.blockchain); - const depositPrincipals = await this.getDepositPrincipalSet(); - - // Add payment address to the set (if configured) - if (this.paymentAddress) depositPrincipals.add(this.paymentAddress); - - const ownWalletPrincipal = this.payInInternetComputerService.getWalletAddress(); - - for (const tokenAsset of tokenAssets) { - if (!tokenAsset.chainId) continue; - - try { - const currentHeight = await this.payInInternetComputerService.getIcrcBlockHeight(tokenAsset.chainId); - const lastIndex = await this.getLastProcessedTokenBlock(tokenAsset.chainId); - if (lastIndex >= currentHeight) continue; - - const result = await this.payInInternetComputerService.getIcrcTransfers( - tokenAsset.chainId, - tokenAsset.decimals, - lastIndex + 1, - BATCH_SIZE, - ); - - if (result.lastBlockIndex >= lastIndex + 1) { - this.lastProcessedTokenBlocks.set(tokenAsset.chainId, result.lastBlockIndex); - } - if (result.transfers.length > 0) { - const relevant = result.transfers.filter((t) => depositPrincipals.has(t.to) && t.from !== ownWalletPrincipal); + const newEntries = await this.getNativeEntries(depositAddresses, fromBlock, toBlock); - if (relevant.length > 0) { - const entries = this.mapTokenTransfers(relevant, tokenAsset); - await this.createPayInsAndSave(entries, log); - } - } - } catch (e) { - this.logger.error(`Failed to process token ${tokenAsset.uniqueName}:`, e); - } + if (newEntries.length) { + await this.createPayInsAndSave(newEntries, log); } this.printInputLog(log, 'omitted', this.blockchain); } - private async getLastProcessedBlock(): Promise { - if (this.lastProcessedBlock !== null) return this.lastProcessedBlock; - - const lastPayIn = await this.payInRepository.findOne({ - select: { id: true, blockHeight: true }, - where: { address: { blockchain: this.blockchain } }, - order: { blockHeight: 'DESC' }, - loadEagerRelations: false, - }); - - if (lastPayIn?.blockHeight) { - this.lastProcessedBlock = lastPayIn.blockHeight; - return this.lastProcessedBlock; - } - - this.lastProcessedBlock = await this.payInInternetComputerService.getBlockHeight(); - return this.lastProcessedBlock; - } - - private async getLastProcessedTokenBlock(canisterId: string): Promise { - const cached = this.lastProcessedTokenBlocks.get(canisterId); - if (cached !== undefined) return cached; - - // Check DB for last processed token block (token txIds have format "canisterId:blockIndex") - const lastPayIn = await this.payInRepository.findOne({ - select: { id: true, blockHeight: true }, - where: { address: { blockchain: this.blockchain }, inTxId: Like(`${canisterId}:%`) }, - order: { blockHeight: 'DESC' }, - loadEagerRelations: false, - }); - - if (lastPayIn?.blockHeight) { - this.lastProcessedTokenBlocks.set(canisterId, lastPayIn.blockHeight); - return lastPayIn.blockHeight; - } - - const blockHeight = await this.payInInternetComputerService.getIcrcBlockHeight(canisterId); - this.lastProcessedTokenBlocks.set(canisterId, blockHeight); - return blockHeight; - } - - private getOwnWalletAccountIdentifier(): string { - const walletPrincipal = this.payInInternetComputerService.getWalletAddress(); - return InternetComputerUtil.accountIdentifier(walletPrincipal); - } + // --- Native ICP (Rosetta per-address history) --- // + private async getNativeEntries( + depositAddresses: BlockchainAddress[], + fromBlock?: number, + toBlock?: number, + ): Promise { + const ownAccountId = InternetComputerUtil.accountIdentifier(this.payInInternetComputerService.getWalletAddress()); + const asset = await this.assetService.getNativeAsset(this.blockchain); - private async getDepositAccountIdentifierMap(): Promise> { - const deposits = await this.depositService.getUsedDepositsByBlockchain(this.blockchain); - const map = new Map(); + const entries: PayInEntry[] = []; - for (const deposit of deposits) { + for (const da of depositAddresses) { try { - const accountId = InternetComputerUtil.accountIdentifier(deposit.address); - map.set(accountId, deposit.address); + const accountId = InternetComputerUtil.accountIdentifier(da.address); + const lastBlock = fromBlock ?? (await this.getLastCheckedNativeBlockHeight(da)) + 1; + + const transfers = await this.payInInternetComputerService.getNativeTransfersForAddress(accountId); + + for (const transfer of transfers) { + if (transfer.blockIndex < lastBlock) continue; + if (toBlock !== undefined && transfer.blockIndex > toBlock) continue; + if (transfer.from === ownAccountId) continue; + + entries.push({ + senderAddresses: transfer.from, + receiverAddress: BlockchainAddress.create(da.address, this.blockchain), + txId: transfer.blockIndex.toString(), + txType: this.getTxType(da.address), + blockHeight: transfer.blockIndex, + amount: transfer.amount, + asset, + }); + } } catch (e) { - this.logger.error(`Invalid Principal in deposit ${deposit.id}: ${deposit.address}`, e); + this.logger.error(`Failed to fetch native transfers for ${da.address}:`, e); } } - return map; - } - - private async getDepositPrincipalSet(): Promise> { - const deposits = await this.depositService.getUsedDepositsByBlockchain(this.blockchain); - return new Set(deposits.map((d) => d.address)); - } - - private async mapToPayInEntries( - transfers: IcpTransfer[], - accountIdToDeposit: Map, - ): Promise { - const asset = await this.assetService.getNativeAsset(this.blockchain); - - return transfers.map((t) => { - const resolvedAddress = accountIdToDeposit.get(t.to) ?? t.to; - return { - senderAddresses: t.from, - receiverAddress: BlockchainAddress.create(resolvedAddress, this.blockchain), - txId: t.blockIndex.toString(), - txType: this.getTxType(resolvedAddress), - blockHeight: t.blockIndex, - amount: t.amount, - asset, - }; - }); + return entries; } - private mapTokenTransfers(transfers: IcpTransfer[], asset: Asset): PayInEntry[] { - return transfers.map((t) => ({ - senderAddresses: t.from, - receiverAddress: BlockchainAddress.create(t.to, this.blockchain), - txId: `${asset.chainId}:${t.blockIndex}`, - txType: this.getTxType(t.to), - blockHeight: t.blockIndex, - amount: t.amount, - asset, - })); + // --- DB-based block height lookups --- // + private async getLastCheckedNativeBlockHeight(depositAddress: BlockchainAddress): Promise { + return this.payInRepository + .findOne({ + select: { id: true, blockHeight: true }, + where: { address: depositAddress, inTxId: Not(Like('%:%')) }, + order: { blockHeight: 'DESC' }, + loadEagerRelations: false, + }) + .then((input) => input?.blockHeight ?? 0); } private getTxType(resolvedAddress: string): PayInType {