Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/indexer-agent/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,8 @@ export class Agent {
await operator.dipsManager.matchAgreementAllocations(
activeAllocations,
)

await operator.dipsManager.collectAgreementPayments()
}
},
)
Expand Down
10 changes: 10 additions & 0 deletions packages/indexer-agent/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,15 @@ export const start = {
required: false,
group: 'Indexing Fees ("DIPs")',
})
.option('dips-collection-target', {
description:
'Target collection point within the agreement window as a percentage (1-90). ' +
'Lower values collect sooner (safer), higher values collect later (fewer txs).',
type: 'number',
default: 50,
required: false,
group: 'Indexing Fees ("DIPs")',
})
.check(argv => {
if (
!argv['network-subgraph-endpoint'] &&
Expand Down Expand Up @@ -471,6 +480,7 @@ export async function createNetworkSpecification(
ravCollectionInterval: argv.ravCollectionInterval,
ravCheckInterval: argv.ravCheckInterval,
dipsEpochsMargin: argv.dipsEpochsMargin,
dipsCollectionTarget: argv.dipsCollectionTarget,
}

const transactionMonitoring = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ export class AllocationManager {
this.logger,
this.models,
this.network,
this.graphNode,
this,
this.pendingRcaModel,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ function createDipsManager(
models: IndexerManagementModels,
consumer: PendingRcaConsumer,
): DipsManager {
const dm = new DipsManager(logger, models, network, null)
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const dm = new DipsManager(logger, models, network, {} as any, null)
// eslint-disable-next-line @typescript-eslint/no-explicit-any
;(dm as any).pendingRcaConsumer = consumer
return dm
Expand Down Expand Up @@ -246,7 +247,8 @@ describe('DipsManager.acceptPendingProposals', () => {
test('returns early when pendingRcaConsumer is null', async () => {
const models = createMockModels()
const network = createMockNetwork()
const dm = new DipsManager(logger, models, network, null)
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const dm = new DipsManager(logger, models, network, {} as any, null)

// Should not throw
await dm.acceptPendingProposals([])
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/* eslint-disable @typescript-eslint/no-explicit-any,@typescript-eslint/no-unused-vars */
import {
fetchCollectableAgreements,
SubgraphIndexingAgreement,
} from '../agreement-monitor'

const mockQuery = jest.fn()
const mockNetworkSubgraph = { query: mockQuery } as any

const INDEXER_ADDRESS = '0x1234567890abcdef1234567890abcdef12345678'

describe('fetchCollectableAgreements', () => {
beforeEach(() => {
jest.clearAllMocks()
})

test('returns agreements in Accepted and CanceledByPayer states', async () => {
mockQuery.mockResolvedValueOnce({
data: {
indexingAgreements: [
{
id: '0x00000000000000000000000000000001',
allocationId: '0xaaaa',
subgraphDeploymentId: '0xbbbb',
state: 1,
lastCollectionAt: '1000',
endsAt: '9999999999',
maxInitialTokens: '1000000',
maxOngoingTokensPerSecond: '100',
tokensPerSecond: '50',
tokensPerEntityPerSecond: '10',
minSecondsPerCollection: 3600,
maxSecondsPerCollection: 86400,
canceledAt: '0',
},
],
},
})

const result = await fetchCollectableAgreements(mockNetworkSubgraph, INDEXER_ADDRESS)

expect(result).toHaveLength(1)
expect(result[0].id).toBe('0x00000000000000000000000000000001')
expect(result[0].state).toBe(1)
expect(mockQuery).toHaveBeenCalledTimes(1)
})

test('returns empty array when no agreements exist', async () => {
mockQuery.mockResolvedValueOnce({
data: { indexingAgreements: [] },
})

const result = await fetchCollectableAgreements(mockNetworkSubgraph, INDEXER_ADDRESS)

expect(result).toHaveLength(0)
})

test('paginates through large result sets', async () => {
// First page: 1000 results
const page1 = Array.from({ length: 1000 }, (_, i) => ({
id: `0x${i.toString(16).padStart(32, '0')}`,
allocationId: '0xaaaa',
subgraphDeploymentId: '0xbbbb',
state: 1,
lastCollectionAt: '1000',
endsAt: '9999999999',
maxInitialTokens: '1000000',
maxOngoingTokensPerSecond: '100',
tokensPerSecond: '50',
tokensPerEntityPerSecond: '10',
minSecondsPerCollection: 3600,
maxSecondsPerCollection: 86400,
canceledAt: '0',
}))
// Second page: 1 result
const page2 = [
{
id: '0x' + 'f'.repeat(32),
allocationId: '0xaaaa',
subgraphDeploymentId: '0xbbbb',
state: 1,
lastCollectionAt: '1000',
endsAt: '9999999999',
maxInitialTokens: '1000000',
maxOngoingTokensPerSecond: '100',
tokensPerSecond: '50',
tokensPerEntityPerSecond: '10',
minSecondsPerCollection: 3600,
maxSecondsPerCollection: 86400,
canceledAt: '0',
},
]

mockQuery
.mockResolvedValueOnce({ data: { indexingAgreements: page1 } })
.mockResolvedValueOnce({ data: { indexingAgreements: page2 } })

const result = await fetchCollectableAgreements(mockNetworkSubgraph, INDEXER_ADDRESS)

expect(result).toHaveLength(1001)
expect(mockQuery).toHaveBeenCalledTimes(2)
})
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { Logger } from '@graphprotocol/common-ts'
import { DipsManager } from '../dips'

const logger = {
child: jest.fn().mockReturnThis(),
info: jest.fn(),
debug: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
trace: jest.fn(),
} as unknown as Logger

const mockQuery = jest.fn()
const mockNetworkSubgraph = { query: mockQuery } as any

const mockCollectEstimateGas = jest.fn()
const mockCollect = jest.fn()

const mockContracts = {
SubgraphService: {
collect: Object.assign(mockCollect, {
estimateGas: mockCollectEstimateGas,
}),
},
} as any

const mockExecuteTransaction = jest.fn()
const mockTransactionManager = {
executeTransaction: mockExecuteTransaction,
} as any

const mockGraphNode = {
entityCount: jest.fn(),
proofOfIndexing: jest.fn(),
blockHashFromNumber: jest.fn(),
subgraphFeatures: jest.fn().mockResolvedValue({ network: 'mainnet' }),
} as any

const mockNetwork = {
contracts: mockContracts,
networkSubgraph: mockNetworkSubgraph,
transactionManager: mockTransactionManager,
specification: {
indexerOptions: {
address: '0x1234567890abcdef1234567890abcdef12345678',
dipperEndpoint: undefined,
dipsCollectionTarget: 50,
},
networkIdentifier: 'eip155:421614',
},
networkProvider: {
getBlockNumber: jest.fn().mockResolvedValue(1000),
getBlock: jest.fn().mockResolvedValue({ timestamp: Math.floor(Date.now() / 1000) }),
},
} as any

const mockModels = {} as any

function createDipsManager(): DipsManager {
return new DipsManager(logger, mockModels, mockNetwork, mockGraphNode, null)
}

// Helper: agreement that was last collected long ago (ready to collect)
function makeReadyAgreement(id = '0x00000000000000000000000000000001') {
return {
id,
allocationId: '0xaaaa',
subgraphDeploymentId:
'0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb',
state: 1,
lastCollectionAt: '0', // never collected → always ready
endsAt: '9999999999',
maxInitialTokens: '1000000',
maxOngoingTokensPerSecond: '100',
tokensPerSecond: '50',
tokensPerEntityPerSecond: '10',
minSecondsPerCollection: 3600,
maxSecondsPerCollection: 86400,
canceledAt: '0',
}
}

describe('DipsManager.collectAgreementPayments', () => {
beforeEach(() => {
jest.clearAllMocks()
})

test('skips when no collectable agreements found', async () => {
mockQuery.mockResolvedValueOnce({
data: { indexingAgreements: [] },
})

const dm = createDipsManager()
await dm.collectAgreementPayments()

expect(mockExecuteTransaction).not.toHaveBeenCalled()
})

test('skips agreement when tracker says not ready yet', async () => {
const recentlyCollected = makeReadyAgreement()
// Collected very recently — min is 3600, target at 50% is ~45000s
recentlyCollected.lastCollectionAt = String(Math.floor(Date.now() / 1000) - 100)

mockQuery.mockResolvedValueOnce({
data: { indexingAgreements: [recentlyCollected] },
})

const dm = createDipsManager()
await dm.collectAgreementPayments()

expect(mockExecuteTransaction).not.toHaveBeenCalled()
})

test('collects payment when agreement is ready', async () => {
mockQuery.mockResolvedValueOnce({
data: { indexingAgreements: [makeReadyAgreement()] },
})

mockGraphNode.entityCount.mockResolvedValueOnce([500])
mockGraphNode.blockHashFromNumber.mockResolvedValueOnce('0x' + 'ab'.repeat(32))
mockGraphNode.proofOfIndexing.mockResolvedValueOnce('0x' + 'cd'.repeat(32))
mockExecuteTransaction.mockResolvedValueOnce({ hash: '0xtxhash', status: 1 })

const dm = createDipsManager()
await dm.collectAgreementPayments()

expect(mockExecuteTransaction).toHaveBeenCalledTimes(1)
})

test('updates tracker after successful collection', async () => {
mockQuery
.mockResolvedValueOnce({ data: { indexingAgreements: [makeReadyAgreement()] } })
.mockResolvedValueOnce({ data: { indexingAgreements: [makeReadyAgreement()] } })

mockGraphNode.entityCount.mockResolvedValue([500])
mockGraphNode.blockHashFromNumber.mockResolvedValue('0x' + 'ab'.repeat(32))
mockGraphNode.proofOfIndexing.mockResolvedValue('0x' + 'cd'.repeat(32))
mockExecuteTransaction.mockResolvedValue({ hash: '0xtxhash', status: 1 })

const dm = createDipsManager()

// First call: collects
await dm.collectAgreementPayments()
expect(mockExecuteTransaction).toHaveBeenCalledTimes(1)

// Second call: tracker should skip (just collected)
await dm.collectAgreementPayments()
// Still only 1 call — second was skipped by tracker
expect(mockExecuteTransaction).toHaveBeenCalledTimes(1)
})

test('still attempts collection when POI is unavailable (best effort)', async () => {
mockQuery.mockResolvedValueOnce({
data: { indexingAgreements: [makeReadyAgreement()] },
})

mockGraphNode.entityCount.mockResolvedValueOnce([500])
mockGraphNode.blockHashFromNumber.mockResolvedValueOnce('0x' + 'ab'.repeat(32))
mockGraphNode.proofOfIndexing.mockResolvedValueOnce(null) // POI unavailable
mockExecuteTransaction.mockResolvedValueOnce({ hash: '0xtxhash', status: 1 })

const dm = createDipsManager()
await dm.collectAgreementPayments()

// Should log warning but still attempt collection with zero POI
expect(logger.warn).toHaveBeenCalled()
expect(mockExecuteTransaction).toHaveBeenCalledTimes(1)
})

test('handles deterministic error gracefully', async () => {
mockQuery.mockResolvedValueOnce({
data: { indexingAgreements: [makeReadyAgreement()] },
})

mockGraphNode.entityCount.mockResolvedValueOnce([500])
mockGraphNode.blockHashFromNumber.mockResolvedValueOnce('0x' + 'ab'.repeat(32))
mockGraphNode.proofOfIndexing.mockResolvedValueOnce('0x' + 'cd'.repeat(32))
mockExecuteTransaction.mockRejectedValueOnce(
Object.assign(new Error('revert'), { code: 'CALL_EXCEPTION' }),
)

const dm = createDipsManager()
await dm.collectAgreementPayments()

expect(logger.warn).toHaveBeenCalled()
})
})
Loading
Loading