From 68b699d7b7d30f50b0aea982914329ea8390d2e9 Mon Sep 17 00:00:00 2001 From: Jakub Dzikowski Date: Fri, 23 Jan 2026 13:20:07 +0100 Subject: [PATCH 1/5] Feat: Support cache Signed-off-by: Jakub Dzikowski --- src/ChainCache.ts | 8 ++++++ src/ChainService.ts | 58 +++++++++++++++++++++++++++++++++++--------- src/StreamBuilder.ts | 6 ++++- 3 files changed, 59 insertions(+), 13 deletions(-) create mode 100644 src/ChainCache.ts diff --git a/src/ChainCache.ts b/src/ChainCache.ts new file mode 100644 index 0000000..ed9a6ea --- /dev/null +++ b/src/ChainCache.ts @@ -0,0 +1,8 @@ +import { Block, ChainInfo } from "./types"; + +export interface ChainCache { + getChainInfo(channelName: string): ChainInfo; + setChainInfo(channelName: string, chainInfo: ChainInfo): void; + getBlock(channelName: string, blockNumber: number): Block; + setBlock(channelName: string, blockNumber: number, block: Block): void; +} \ No newline at end of file diff --git a/src/ChainService.ts b/src/ChainService.ts index 6bb5c18..04bad3f 100644 --- a/src/ChainService.ts +++ b/src/ChainService.ts @@ -25,6 +25,7 @@ import { IIdentity } from "./CAService"; import { LoggerInterface } from "./ChainStream"; import { parseBlock } from "./parseBlock"; import { Block, ChainInfo, Transaction, TransactionValidationCode } from "./types"; +import { ChainCache } from "./ChainCache"; export interface PeerConfig { url: string; @@ -44,8 +45,9 @@ export class ChainService { constructor( private readonly peer: PeerConfig, public readonly channelName: string, - private readonly logger: LoggerInterface - ) {} + private readonly logger: LoggerInterface, + private readonly cache: ChainCache | undefined + ) { } public connect(identity: IIdentity): void { this.logger.log(`Connecting to channel ${this.channelName}`); @@ -147,6 +149,11 @@ export class ChainService { throw new Error(`Network ${this.channelName} not connected`); } + const cached = this.cache?.getChainInfo(this.channelName); + if (cached) { + return cached; + } + const querySystemCC = this.network.getContract("qscc"); const blockBuffer = await querySystemCC.evaluateTransaction("GetChainInfo", this.channelName); @@ -175,6 +182,13 @@ export class ChainService { network: Network, transactionFilter: TransactionFilter ): Promise { + const cached = this.cache?.getBlock(this.channelName, blockNumber); + if (cached) { + // apply filter + cached.transactions = cached.transactions.filter(transactionFilter); + return cached; + } + const querySystemCC = network.getContract("qscc"); const blockBuffer = await querySystemCC.evaluateTransaction( "GetBlockByNumber", @@ -185,18 +199,38 @@ export class ChainService { const convertedBlockBuffer = blockBuffer instanceof Uint8Array ? Buffer.from(blockBuffer) : blockBuffer; const block = parseBlock(BlockDecoder.decode(convertedBlockBuffer)); - // apply filter - block.transactions = block.transactions.filter(transactionFilter); + // If we don't cache, then we can query only the transactions that match the filter + if (!this.cache) { + // apply filter + block.transactions = block.transactions.filter(transactionFilter); - // query transaction codes - const codes = await this.queryTransactionCodes( - network, - block.transactions.map((tx) => tx.id) - ); + // query transaction codes + const codes = await this.queryTransactionCodes( + network, + block.transactions.map((tx) => tx.id) + ); - block.transactions.forEach((t) => { - t.validationCode = codes[t.id] ?? TransactionValidationCode.UNKNOWN; - }); + block.transactions.forEach((t) => { + t.validationCode = codes[t.id] ?? TransactionValidationCode.UNKNOWN; + }); + } + + // If we cache, then we need to query all transactions codes and filter them later + else { + // query transaction codes + const codes = await this.queryTransactionCodes( + network, + block.transactions.map((tx) => tx.id) + ); + + block.transactions.forEach((t) => { + t.validationCode = codes[t.id] ?? TransactionValidationCode.UNKNOWN; + }); + + this.cache?.setBlock(this.channelName, blockNumber, block); + + block.transactions = block.transactions.filter(transactionFilter); + } return block; } diff --git a/src/StreamBuilder.ts b/src/StreamBuilder.ts index ca2fbad..a715317 100644 --- a/src/StreamBuilder.ts +++ b/src/StreamBuilder.ts @@ -17,6 +17,7 @@ import { CAConfig, CAService, UserConfig } from "./CAService"; import { ChainService, PeerConfig } from "./ChainService"; import { ChainStream, LoggerInterface } from "./ChainStream"; import { ConnectedStream, StreamConfig } from "./ConnectedStream"; +import { ChainCache } from "./ChainCache"; const defaultCaConfig = { orgMsp: "CuratorOrg", @@ -61,6 +62,7 @@ export interface ConnectionParams { peer?: Partial; stream?: Partial; logger?: LoggerInterface; + cache?: ChainCache; } export class StreamBuilder { @@ -69,6 +71,7 @@ export class StreamBuilder { private readonly peer: PeerConfig; private readonly streamConfig: StreamConfig; private readonly logger: LoggerInterface; + private readonly cache: ChainCache | undefined; constructor(params: ConnectionParams) { this.ca = { @@ -94,11 +97,12 @@ export class StreamBuilder { maxRetryCount: params.stream?.maxRetryCount ?? defaultStreamConfig.maxRetryCount }; this.logger = params.logger ?? defaultLogger; + this.cache = params.cache ?? undefined; } public build(channelName: string): ConnectedStream { const caService = new CAService(this.ca, this.user); - const chainService = new ChainService(this.peer, channelName, this.logger); + const chainService = new ChainService(this.peer, channelName, this.logger, this.cache); const chainStream = new ChainStream(caService, chainService, this.logger); return new ConnectedStream(chainStream, this.streamConfig); } From 26c5845f42a0cce9503a625f357ece2b66565b7a Mon Sep 17 00:00:00 2001 From: Jakub Dzikowski Date: Fri, 23 Jan 2026 13:22:49 +0100 Subject: [PATCH 2/5] Make chain cache async Signed-off-by: Jakub Dzikowski --- src/ChainCache.ts | 8 ++++---- src/ChainService.ts | 12 ++++++++---- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/ChainCache.ts b/src/ChainCache.ts index ed9a6ea..1380fe6 100644 --- a/src/ChainCache.ts +++ b/src/ChainCache.ts @@ -1,8 +1,8 @@ import { Block, ChainInfo } from "./types"; export interface ChainCache { - getChainInfo(channelName: string): ChainInfo; - setChainInfo(channelName: string, chainInfo: ChainInfo): void; - getBlock(channelName: string, blockNumber: number): Block; - setBlock(channelName: string, blockNumber: number, block: Block): void; + getChainInfo(channelName: string): Promise; + setChainInfo(channelName: string, chainInfo: ChainInfo): Promise; + getBlock(channelName: string, blockNumber: number): Promise; + setBlock(channelName: string, blockNumber: number, block: Block): Promise; } \ No newline at end of file diff --git a/src/ChainService.ts b/src/ChainService.ts index 04bad3f..c1d85c2 100644 --- a/src/ChainService.ts +++ b/src/ChainService.ts @@ -149,7 +149,7 @@ export class ChainService { throw new Error(`Network ${this.channelName} not connected`); } - const cached = this.cache?.getChainInfo(this.channelName); + const cached = await this.cache?.getChainInfo(this.channelName); if (cached) { return cached; } @@ -159,10 +159,14 @@ export class ChainService { const info = fabricProtos.BlockchainInfo.decode(blockBuffer); - return { + const chainInfo = { channelName: this.channelName, height: +info.height.toString() }; + + await this.cache?.setChainInfo(this.channelName, chainInfo); + + return chainInfo; } public async queryBlocks(blockNumbers: number[], transactionFilter: TransactionFilter): Promise { @@ -182,7 +186,7 @@ export class ChainService { network: Network, transactionFilter: TransactionFilter ): Promise { - const cached = this.cache?.getBlock(this.channelName, blockNumber); + const cached = await this.cache?.getBlock(this.channelName, blockNumber); if (cached) { // apply filter cached.transactions = cached.transactions.filter(transactionFilter); @@ -227,7 +231,7 @@ export class ChainService { t.validationCode = codes[t.id] ?? TransactionValidationCode.UNKNOWN; }); - this.cache?.setBlock(this.channelName, blockNumber, block); + await this.cache?.setBlock(this.channelName, blockNumber, block); block.transactions = block.transactions.filter(transactionFilter); } From fb9635ec255475eb6eff120f9f4faa4fdd883d08 Mon Sep 17 00:00:00 2001 From: Jakub Dzikowski Date: Fri, 23 Jan 2026 16:07:13 +0100 Subject: [PATCH 3/5] Add cache to sample script Signed-off-by: Jakub Dzikowski --- .github/workflows/test-on-push.yml | 5 ++++- src/ChainCache.ts | 4 ++-- src/index.ts | 1 + src/sample-transactions.ts | 29 ++++++++++++++++++++++++++++- 4 files changed, 35 insertions(+), 4 deletions(-) diff --git a/.github/workflows/test-on-push.yml b/.github/workflows/test-on-push.yml index c8ba345..3facb77 100644 --- a/.github/workflows/test-on-push.yml +++ b/.github/workflows/test-on-push.yml @@ -57,7 +57,10 @@ jobs: run: npm run test:e2e --prefix ./test-chaincode - name: Stream blocks from the network run: - timeout 15s npx ts-node src/sample.ts || true + timeout 15s npx ts-node src/sample-blocks.ts || true + - name: Stream blocks from the network + run: + timeout 15s npx ts-node src/sample-transactions.ts || true - name: Test run: | curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.5/install.sh | bash diff --git a/src/ChainCache.ts b/src/ChainCache.ts index 1380fe6..231e1d5 100644 --- a/src/ChainCache.ts +++ b/src/ChainCache.ts @@ -1,8 +1,8 @@ import { Block, ChainInfo } from "./types"; export interface ChainCache { - getChainInfo(channelName: string): Promise; + getChainInfo(channelName: string): Promise; setChainInfo(channelName: string, chainInfo: ChainInfo): Promise; - getBlock(channelName: string, blockNumber: number): Promise; + getBlock(channelName: string, blockNumber: number): Promise; setBlock(channelName: string, blockNumber: number, block: Block): Promise; } \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index c1010d1..2a92d3b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -24,3 +24,4 @@ export * from "./ConnectedStream"; export * from "./StreamBuilder"; export * from "./types"; export * from "./ConnectedTransactionStream"; +export * from "./ChainCache"; \ No newline at end of file diff --git a/src/sample-transactions.ts b/src/sample-transactions.ts index 0fdbb22..309302c 100644 --- a/src/sample-transactions.ts +++ b/src/sample-transactions.ts @@ -12,7 +12,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import stream from "./index"; +import stream, { Block, ChainCache, ChainInfo } from "./index"; const config = { ca: { @@ -38,6 +38,33 @@ const config = { } }; +class InMemoryChainCache implements ChainCache { + private blocks: Record = {}; + + async getChainInfo(): Promise { + // no caching + return undefined; + } + + async setChainInfo(): Promise { + // no caching + } + + async getBlock(channelName: string, blockNumber: number): Promise { + const block = this.blocks[`${channelName}-${blockNumber}`]; + if (block) { + console.log("Block found in cache:", block.blockNumber); + return block; + } + return undefined; + } + + async setBlock(channelName: string, blockNumber: number, block: Block): Promise { + console.log("Saving block in cache:", block.blockNumber); + this.blocks[`${channelName}-${blockNumber}`] = block; + } +} + stream .connect(config) .channel("product-channel") From 56f91139b294c3b44e8528ae2f34f2a4a35b667c Mon Sep 17 00:00:00 2001 From: Jakub Dzikowski Date: Fri, 23 Jan 2026 16:20:11 +0100 Subject: [PATCH 4/5] Add missing copyright Signed-off-by: Jakub Dzikowski --- src/ChainCache.ts | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/ChainCache.ts b/src/ChainCache.ts index 231e1d5..a38075e 100644 --- a/src/ChainCache.ts +++ b/src/ChainCache.ts @@ -1,3 +1,17 @@ +/* + * Copyright (c) Gala Games Inc. All rights reserved. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ import { Block, ChainInfo } from "./types"; export interface ChainCache { From 2841a9b5b95b50cc0992c78c622787dba18e1229 Mon Sep 17 00:00:00 2001 From: Jakub Dzikowski Date: Fri, 23 Jan 2026 16:27:53 +0100 Subject: [PATCH 5/5] Update workflow to reflect transaction streaming and enhance sample script with in-memory cache implementation Signed-off-by: Jakub Dzikowski --- .github/workflows/test-on-push.yml | 2 +- src/sample-transactions.ts | 50 ++++++++++++++++-------------- 2 files changed, 28 insertions(+), 24 deletions(-) diff --git a/.github/workflows/test-on-push.yml b/.github/workflows/test-on-push.yml index 3facb77..6b3612f 100644 --- a/.github/workflows/test-on-push.yml +++ b/.github/workflows/test-on-push.yml @@ -58,7 +58,7 @@ jobs: - name: Stream blocks from the network run: timeout 15s npx ts-node src/sample-blocks.ts || true - - name: Stream blocks from the network + - name: Stream transactions from the network run: timeout 15s npx ts-node src/sample-transactions.ts || true - name: Test diff --git a/src/sample-transactions.ts b/src/sample-transactions.ts index 309302c..de3ee3c 100644 --- a/src/sample-transactions.ts +++ b/src/sample-transactions.ts @@ -14,30 +14,8 @@ */ import stream, { Block, ChainCache, ChainInfo } from "./index"; -const config = { - ca: { - url: "https://localhost:7040", - name: "ca.curator.local", - orgMsp: "CuratorOrg" - }, - user: { - userId: "admin", - userSecret: "adminpw" - }, - peer: { - url: "grpcs://localhost:7041", - tlsCACertPath: - "./test-chaincode/test-network/fablo-target/fabric-config/crypto-config/peerOrganizations/curator.local/msp/tlscacerts/tlsca.curator.local-cert.pem", - grpcHostnameOverride: "peer0.curator.local" - }, - stream: { - gracePeriodMs: 1000, - batchSize: 10, - retryOnErrorDelayMs: 5000, - maxRetryCount: 5 - } -}; +// Sample implementation of ChainCache that stores blocks in memory class InMemoryChainCache implements ChainCache { private blocks: Record = {}; @@ -65,6 +43,32 @@ class InMemoryChainCache implements ChainCache { } } +const config = { + ca: { + url: "https://localhost:7040", + name: "ca.curator.local", + orgMsp: "CuratorOrg" + }, + user: { + userId: "admin", + userSecret: "adminpw" + }, + peer: { + url: "grpcs://localhost:7041", + tlsCACertPath: + "./test-chaincode/test-network/fablo-target/fabric-config/crypto-config/peerOrganizations/curator.local/msp/tlscacerts/tlsca.curator.local-cert.pem", + grpcHostnameOverride: "peer0.curator.local" + }, + stream: { + gracePeriodMs: 1000, + batchSize: 10, + retryOnErrorDelayMs: 5000, + maxRetryCount: 5 + }, + cache: new InMemoryChainCache() +}; + + stream .connect(config) .channel("product-channel")