Quick reference for using the AFI caching layer.
import { QueryCache } from "./server/query-cache";
import { PerformanceBenchmark } from "./server/performance-benchmark";const cache = new QueryCache(config?: QueryCacheConfig);Config options:
interface QueryCacheConfig {
agentMetricsTTL?: number; // Default: 300 (5 minutes)
counterpartyMetricsTTL?: number; // Default: 300 (5 minutes)
flowAggregateTTL?: number; // Default: 180 (3 minutes)
interactionListTTL?: number; // Default: 60 (1 minute)
enablePerformanceMonitoring?: boolean; // Default: false
}getAgentMetrics(store: Store, wallet: string): AgentMetricsGet cached agent metrics for a wallet.
Example:
const metrics = cache.getAgentMetrics(store, "0x123");
console.log(metrics.throughput.totalInteractions);getCounterpartyMetrics(store: Store, counterparty: string): CounterpartyMetricsGet cached counterparty metrics.
Example:
const metrics = cache.getCounterpartyMetrics(store, "service1");
console.log(metrics.volume.totalInteractions);getFlowAggregates(store: Store, filters?: {
wallet?: string;
counterparty?: string;
protocol?: string;
startDate?: string; // ISO format
endDate?: string; // ISO format
}): FlowAggregateResultGet cached flow aggregates with optional filters.
Examples:
// All flows
const all = cache.getFlowAggregates(store);
// Wallet-specific
const walletFlows = cache.getFlowAggregates(store, { wallet: "0x123" });
// Protocol-specific
const locusFlows = cache.getFlowAggregates(store, { protocol: "locus" });
// Date range
const recent = cache.getFlowAggregates(store, {
startDate: "2024-01-01",
endDate: "2024-12-31"
});
// Combined filters
const filtered = cache.getFlowAggregates(store, {
wallet: "0x123",
protocol: "locus",
startDate: "2024-03-01"
});Returns:
interface FlowAggregateResult {
totalInteractions: number;
dateRange: { start: string | null; end: string | null };
dailySeries: Array<{
date: string;
count: number;
uniqueProtocols: number;
uniqueCounterparties: number;
}>;
protocolSeries: Array<{ protocol: string; count: number }>;
counterpartySeries: Array<{ counterparty: string; count: number }>;
uniqueProtocols: number;
uniqueCounterparties: number;
}getInteractionsList(store: Store, filters?: {
wallet?: string;
counterparty?: string;
}): InteractionRecord[]Get cached list of interactions with optional filters.
Examples:
// All interactions
const all = cache.getInteractionsList(store);
// By wallet
const walletInteractions = cache.getInteractionsList(store, {
wallet: "0x123"
});
// By counterparty
const serviceInteractions = cache.getInteractionsList(store, {
counterparty: "service1"
});invalidateOnIngestion(
affectedWallets: string[],
affectedCounterparties: string[]
): voidInvalidate cache for specific entities after data ingestion.
Example:
// After ingesting a transaction
store.upsertInteraction(interaction);
cache.invalidateOnIngestion(
[interaction.wallet_address],
[interaction.counterparty]
);invalidateAll(): voidClear entire cache.
Example:
cache.invalidateAll();getStats(): CacheStatsGet cache statistics.
Returns:
interface CacheStats {
hits: number;
misses: number;
size: number;
hitRate: number; // 0-1
}Example:
const stats = cache.getStats();
console.log(`Hit rate: ${(stats.hitRate * 100).toFixed(1)}%`);
console.log(`Cache size: ${stats.size} entries`);getPerformanceMetrics(): QueryPerformanceMetrics[]Get performance metrics (only when enablePerformanceMonitoring: true).
Returns:
interface QueryPerformanceMetrics {
queryType: string;
cacheHit: boolean;
executionTimeMs: number;
timestamp: string;
}Example:
const cache = new QueryCache({ enablePerformanceMonitoring: true });
// ... run some queries ...
const metrics = cache.getPerformanceMetrics();
metrics.forEach(m => {
console.log(`${m.queryType}: ${m.executionTimeMs.toFixed(2)}ms ${m.cacheHit ? '(HIT)' : '(MISS)'}`);
});clearPerformanceMetrics(): voidClear performance metrics log.
cleanup(): voidRemove expired cache entries. Call periodically (e.g., every 5 minutes).
Example:
setInterval(() => cache.cleanup(), 5 * 60 * 1000);resetStats(): voidReset hit/miss counters.
const benchmark = new PerformanceBenchmark(store: Store);benchmarkAgentMetrics(
wallet: string,
iterations?: number
): Promise<BenchmarkSuite>Benchmark agent metrics queries.
Example:
const result = await benchmark.benchmarkAgentMetrics("0x123", 100);
console.log(`Speedup: ${result.summary.speedupCachedWarm.toFixed(1)}x`);benchmarkCounterpartyMetrics(
counterparty: string,
iterations?: number
): Promise<BenchmarkSuite>Benchmark counterparty metrics queries.
benchmarkFlowAggregates(
filters?: { wallet?: string; counterparty?: string },
iterations?: number
): Promise<BenchmarkSuite>Benchmark flow aggregate queries.
runFullBenchmark(options?: {
wallet?: string;
counterparty?: string;
iterations?: number;
}): Promise<{
agentMetrics?: BenchmarkSuite;
counterpartyMetrics?: BenchmarkSuite;
flowAggregates: BenchmarkSuite;
overallSummary: {
avgSpeedup: number;
cacheEffectiveness: "excellent" | "good" | "moderate" | "poor";
};
}>Run comprehensive benchmark suite.
Example:
const results = await benchmark.runFullBenchmark({
wallet: "0x123",
counterparty: "service1",
iterations: 100
});
console.log(`Average Speedup: ${results.overallSummary.avgSpeedup.toFixed(1)}x`);
console.log(`Effectiveness: ${results.overallSummary.cacheEffectiveness}`);generateReport(suites: BenchmarkSuite[]): stringGenerate human-readable benchmark report.
Example:
const agentBench = await benchmark.benchmarkAgentMetrics("0x123", 100);
const counterpartyBench = await benchmark.benchmarkCounterpartyMetrics("service1", 100);
const report = benchmark.generateReport([agentBench, counterpartyBench]);
console.log(report);Get cache statistics.
Response:
{
"hits": 1234,
"misses": 456,
"size": 42,
"hitRate": 0.73
}Manually invalidate entire cache.
Response:
{
"ok": true,
"message": "Cache invalidated"
}Get flow aggregates with caching.
Query Parameters:
wallet(optional): Filter by wallet addresscounterparty(optional): Filter by counterpartyprotocol(optional): Filter by protocolstartDate(optional): Filter start date (ISO format)endDate(optional): Filter end date (ISO format)
Examples:
# All flows
curl http://localhost:8787/api/metrics/flow-aggregates
# Wallet-specific
curl "http://localhost:8787/api/metrics/flow-aggregates?wallet=0x123"
# Protocol-specific with date range
curl "http://localhost:8787/api/metrics/flow-aggregates?protocol=locus&startDate=2024-01-01"
# Multiple filters
curl "http://localhost:8787/api/metrics/flow-aggregates?wallet=0x123&protocol=locus&startDate=2024-03-01"Response:
{
"totalInteractions": 150,
"dateRange": {
"start": "2024-01-01T00:00:00Z",
"end": "2024-03-18T00:00:00Z"
},
"dailySeries": [
{
"date": "2024-01-01",
"count": 5,
"uniqueProtocols": 2,
"uniqueCounterparties": 3
}
],
"protocolSeries": [
{ "protocol": "locus", "count": 80 },
{ "protocol": "x402", "count": 70 }
],
"counterpartySeries": [
{ "counterparty": "service1", "count": 60 },
{ "counterparty": "service2", "count": 40 }
],
"uniqueProtocols": 3,
"uniqueCounterparties": 5
}import { QueryCache } from "./server/query-cache";
import { Store } from "./server/store";
export function createApp() {
const store = new Store(config);
const queryCache = new QueryCache({
agentMetricsTTL: 300,
counterpartyMetricsTTL: 300,
flowAggregateTTL: 180,
interactionListTTL: 60,
});
// Pass to API
const api = createApi({ store, queryCache });
// Setup periodic cleanup
const cleanupInterval = setInterval(() => {
queryCache.cleanup();
}, 5 * 60 * 1000);
// Cleanup on shutdown
process.on("SIGTERM", () => {
clearInterval(cleanupInterval);
});
return app;
}// After ingesting data
async function ingestX402(data: X402Data) {
const bundle = normalizeInteraction(data);
store.upsertInteraction(bundle.interaction);
store.upsertSettlement(bundle.settlement);
store.upsertEvidence(bundle.evidence);
// Invalidate affected caches
const wallets = bundle.interaction.wallet_address
? [bundle.interaction.wallet_address]
: [];
const counterparties = bundle.interaction.counterparty
? [bundle.interaction.counterparty]
: [];
queryCache.invalidateOnIngestion(wallets, counterparties);
return bundle.interaction.id;
}// After batch ingestion
async function ingestBatch(transactions: Transaction[]) {
const wallets = new Set<string>();
const counterparties = new Set<string>();
for (const tx of transactions) {
store.upsertInteraction(tx);
if (tx.wallet_address) wallets.add(tx.wallet_address);
if (tx.counterparty) counterparties.add(tx.counterparty);
}
// Batch invalidate
queryCache.invalidateOnIngestion(
Array.from(wallets),
Array.from(counterparties)
);
}const cache = new QueryCache({ enablePerformanceMonitoring: true });
// Run queries
cache.getAgentMetrics(store, "0x123");
cache.getCounterpartyMetrics(store, "service1");
// Analyze performance
const metrics = cache.getPerformanceMetrics();
const avgTime = metrics.reduce((sum, m) => sum + m.executionTimeMs, 0) / metrics.length;
console.log(`Average query time: ${avgTime.toFixed(2)}ms`);
const hitRate = cache.getStats().hitRate;
console.log(`Cache hit rate: ${(hitRate * 100).toFixed(1)}%`);async function analyzeCachePerformance() {
const benchmark = new PerformanceBenchmark(store);
const results = await benchmark.runFullBenchmark({
wallet: "0x123",
counterparty: "service1",
iterations: 100
});
console.log(`\nPerformance Analysis:`);
console.log(`- Average Speedup: ${results.overallSummary.avgSpeedup.toFixed(1)}x`);
console.log(`- Cache Effectiveness: ${results.overallSummary.cacheEffectiveness}`);
if (results.agentMetrics) {
console.log(`\nAgent Metrics:`);
console.log(`- Direct: ${results.agentMetrics.summary.directAvgMs.toFixed(2)}ms`);
console.log(`- Cached (warm): ${results.agentMetrics.summary.cachedWarmAvgMs.toFixed(2)}ms`);
}
// Generate full report
const report = benchmark.generateReport([
results.agentMetrics!,
results.counterpartyMetrics!,
results.flowAggregates
]);
console.log(report);
}- Always invalidate on ingestion: Ensures data consistency
- Set appropriate TTLs: Balance freshness vs performance
- Monitor hit rates: Aim for > 50% in production
- Use cleanup interval: Prevent memory leaks
- Enable monitoring in dev: Helps identify bottlenecks
- Benchmark regularly: Verify cache effectiveness
- Handle edge cases: Empty results, null values, etc.
Low hit rate (<30%):
- Increase TTLs
- Check query parameter normalization
- Review invalidation frequency
Stale data:
- Decrease TTLs
- Verify invalidation on all ingestion paths
- Add manual invalidation endpoint
High memory usage:
- Decrease TTLs
- Verify cleanup is running
- Consider cache size limits
Performance not improving:
- Enable performance monitoring
- Run benchmarks
- Check if queries are actually using cache