diff --git a/.github/workflows/test-on-push.yml b/.github/workflows/test-on-push.yml index c8ba345..6b3612f 100644 --- a/.github/workflows/test-on-push.yml +++ b/.github/workflows/test-on-push.yml @@ -57,7 +57,10 @@ jobs: run: npm run test:e2e --prefix ./test-chaincode - name: Stream blocks from the network run: - timeout 15s npx ts-node src/sample.ts || true + timeout 15s npx ts-node src/sample-blocks.ts || true + - name: Stream transactions from the network + run: + timeout 15s npx ts-node src/sample-transactions.ts || true - name: Test run: | curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.5/install.sh | bash diff --git a/src/ChainCache.ts b/src/ChainCache.ts new file mode 100644 index 0000000..a38075e --- /dev/null +++ b/src/ChainCache.ts @@ -0,0 +1,22 @@ +/* + * Copyright (c) Gala Games Inc. All rights reserved. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { Block, ChainInfo } from "./types"; + +export interface ChainCache { + getChainInfo(channelName: string): Promise; + setChainInfo(channelName: string, chainInfo: ChainInfo): Promise; + getBlock(channelName: string, blockNumber: number): Promise; + setBlock(channelName: string, blockNumber: number, block: Block): Promise; +} \ No newline at end of file diff --git a/src/ChainService.ts b/src/ChainService.ts index 6bb5c18..c1d85c2 100644 --- a/src/ChainService.ts +++ b/src/ChainService.ts @@ -25,6 +25,7 @@ import { IIdentity } from "./CAService"; import { LoggerInterface } from "./ChainStream"; import { parseBlock } from "./parseBlock"; import { Block, ChainInfo, Transaction, TransactionValidationCode } from "./types"; +import { ChainCache } from "./ChainCache"; export interface PeerConfig { url: string; @@ -44,8 +45,9 @@ export class ChainService { constructor( private readonly peer: PeerConfig, public readonly channelName: string, - private readonly logger: LoggerInterface - ) {} + private readonly logger: LoggerInterface, + private readonly cache: ChainCache | undefined + ) { } public connect(identity: IIdentity): void { this.logger.log(`Connecting to channel ${this.channelName}`); @@ -147,15 +149,24 @@ export class ChainService { throw new Error(`Network ${this.channelName} not connected`); } + const cached = await this.cache?.getChainInfo(this.channelName); + if (cached) { + return cached; + } + const querySystemCC = this.network.getContract("qscc"); const blockBuffer = await querySystemCC.evaluateTransaction("GetChainInfo", this.channelName); const info = fabricProtos.BlockchainInfo.decode(blockBuffer); - return { + const chainInfo = { channelName: this.channelName, height: +info.height.toString() }; + + await this.cache?.setChainInfo(this.channelName, chainInfo); + + return chainInfo; } public async queryBlocks(blockNumbers: number[], transactionFilter: TransactionFilter): Promise { @@ -175,6 +186,13 @@ export class ChainService { network: Network, transactionFilter: TransactionFilter ): Promise { + const cached = await this.cache?.getBlock(this.channelName, blockNumber); + if (cached) { + // apply filter + cached.transactions = cached.transactions.filter(transactionFilter); + return cached; + } + const querySystemCC = network.getContract("qscc"); const blockBuffer = await querySystemCC.evaluateTransaction( "GetBlockByNumber", @@ -185,18 +203,38 @@ export class ChainService { const convertedBlockBuffer = blockBuffer instanceof Uint8Array ? Buffer.from(blockBuffer) : blockBuffer; const block = parseBlock(BlockDecoder.decode(convertedBlockBuffer)); - // apply filter - block.transactions = block.transactions.filter(transactionFilter); + // If we don't cache, then we can query only the transactions that match the filter + if (!this.cache) { + // apply filter + block.transactions = block.transactions.filter(transactionFilter); - // query transaction codes - const codes = await this.queryTransactionCodes( - network, - block.transactions.map((tx) => tx.id) - ); + // query transaction codes + const codes = await this.queryTransactionCodes( + network, + block.transactions.map((tx) => tx.id) + ); - block.transactions.forEach((t) => { - t.validationCode = codes[t.id] ?? TransactionValidationCode.UNKNOWN; - }); + block.transactions.forEach((t) => { + t.validationCode = codes[t.id] ?? TransactionValidationCode.UNKNOWN; + }); + } + + // If we cache, then we need to query all transactions codes and filter them later + else { + // query transaction codes + const codes = await this.queryTransactionCodes( + network, + block.transactions.map((tx) => tx.id) + ); + + block.transactions.forEach((t) => { + t.validationCode = codes[t.id] ?? TransactionValidationCode.UNKNOWN; + }); + + await this.cache?.setBlock(this.channelName, blockNumber, block); + + block.transactions = block.transactions.filter(transactionFilter); + } return block; } diff --git a/src/StreamBuilder.ts b/src/StreamBuilder.ts index ca2fbad..a715317 100644 --- a/src/StreamBuilder.ts +++ b/src/StreamBuilder.ts @@ -17,6 +17,7 @@ import { CAConfig, CAService, UserConfig } from "./CAService"; import { ChainService, PeerConfig } from "./ChainService"; import { ChainStream, LoggerInterface } from "./ChainStream"; import { ConnectedStream, StreamConfig } from "./ConnectedStream"; +import { ChainCache } from "./ChainCache"; const defaultCaConfig = { orgMsp: "CuratorOrg", @@ -61,6 +62,7 @@ export interface ConnectionParams { peer?: Partial; stream?: Partial; logger?: LoggerInterface; + cache?: ChainCache; } export class StreamBuilder { @@ -69,6 +71,7 @@ export class StreamBuilder { private readonly peer: PeerConfig; private readonly streamConfig: StreamConfig; private readonly logger: LoggerInterface; + private readonly cache: ChainCache | undefined; constructor(params: ConnectionParams) { this.ca = { @@ -94,11 +97,12 @@ export class StreamBuilder { maxRetryCount: params.stream?.maxRetryCount ?? defaultStreamConfig.maxRetryCount }; this.logger = params.logger ?? defaultLogger; + this.cache = params.cache ?? undefined; } public build(channelName: string): ConnectedStream { const caService = new CAService(this.ca, this.user); - const chainService = new ChainService(this.peer, channelName, this.logger); + const chainService = new ChainService(this.peer, channelName, this.logger, this.cache); const chainStream = new ChainStream(caService, chainService, this.logger); return new ConnectedStream(chainStream, this.streamConfig); } diff --git a/src/index.ts b/src/index.ts index c1010d1..2a92d3b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -24,3 +24,4 @@ export * from "./ConnectedStream"; export * from "./StreamBuilder"; export * from "./types"; export * from "./ConnectedTransactionStream"; +export * from "./ChainCache"; \ No newline at end of file diff --git a/src/sample-transactions.ts b/src/sample-transactions.ts index 0fdbb22..de3ee3c 100644 --- a/src/sample-transactions.ts +++ b/src/sample-transactions.ts @@ -12,7 +12,36 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import stream from "./index"; +import stream, { Block, ChainCache, ChainInfo } from "./index"; + + +// Sample implementation of ChainCache that stores blocks in memory +class InMemoryChainCache implements ChainCache { + private blocks: Record = {}; + + async getChainInfo(): Promise { + // no caching + return undefined; + } + + async setChainInfo(): Promise { + // no caching + } + + async getBlock(channelName: string, blockNumber: number): Promise { + const block = this.blocks[`${channelName}-${blockNumber}`]; + if (block) { + console.log("Block found in cache:", block.blockNumber); + return block; + } + return undefined; + } + + async setBlock(channelName: string, blockNumber: number, block: Block): Promise { + console.log("Saving block in cache:", block.blockNumber); + this.blocks[`${channelName}-${blockNumber}`] = block; + } +} const config = { ca: { @@ -35,9 +64,11 @@ const config = { batchSize: 10, retryOnErrorDelayMs: 5000, maxRetryCount: 5 - } + }, + cache: new InMemoryChainCache() }; + stream .connect(config) .channel("product-channel")