Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/test-on-push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ jobs:
run: npm run test:e2e --prefix ./test-chaincode
- name: Stream blocks from the network
run:
timeout 15s npx ts-node src/sample.ts || true
timeout 15s npx ts-node src/sample-blocks.ts || true
- name: Stream transactions from the network
run:
timeout 15s npx ts-node src/sample-transactions.ts || true
- name: Test
run: |
curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.5/install.sh | bash
Expand Down
22 changes: 22 additions & 0 deletions src/ChainCache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) Gala Games Inc. All rights reserved.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Block, ChainInfo } from "./types";

export interface ChainCache {
getChainInfo(channelName: string): Promise<ChainInfo | undefined>;
setChainInfo(channelName: string, chainInfo: ChainInfo): Promise<void>;
getBlock(channelName: string, blockNumber: number): Promise<Block | undefined>;
setBlock(channelName: string, blockNumber: number, block: Block): Promise<void>;
}
64 changes: 51 additions & 13 deletions src/ChainService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { IIdentity } from "./CAService";
import { LoggerInterface } from "./ChainStream";
import { parseBlock } from "./parseBlock";
import { Block, ChainInfo, Transaction, TransactionValidationCode } from "./types";
import { ChainCache } from "./ChainCache";

export interface PeerConfig {
url: string;
Expand All @@ -44,8 +45,9 @@ export class ChainService {
constructor(
private readonly peer: PeerConfig,
public readonly channelName: string,
private readonly logger: LoggerInterface
) {}
private readonly logger: LoggerInterface,
private readonly cache: ChainCache | undefined
) { }

public connect(identity: IIdentity): void {
this.logger.log(`Connecting to channel ${this.channelName}`);
Expand Down Expand Up @@ -147,15 +149,24 @@ export class ChainService {
throw new Error(`Network ${this.channelName} not connected`);
}

const cached = await this.cache?.getChainInfo(this.channelName);
if (cached) {
return cached;
}

const querySystemCC = this.network.getContract("qscc");
const blockBuffer = await querySystemCC.evaluateTransaction("GetChainInfo", this.channelName);

const info = fabricProtos.BlockchainInfo.decode(blockBuffer);

return {
const chainInfo = {
channelName: this.channelName,
height: +info.height.toString()
};

await this.cache?.setChainInfo(this.channelName, chainInfo);

return chainInfo;
}

public async queryBlocks(blockNumbers: number[], transactionFilter: TransactionFilter): Promise<Block[]> {
Expand All @@ -175,6 +186,13 @@ export class ChainService {
network: Network,
transactionFilter: TransactionFilter
): Promise<Block> {
const cached = await this.cache?.getBlock(this.channelName, blockNumber);
if (cached) {
// apply filter
cached.transactions = cached.transactions.filter(transactionFilter);
return cached;
}

const querySystemCC = network.getContract("qscc");
const blockBuffer = await querySystemCC.evaluateTransaction(
"GetBlockByNumber",
Expand All @@ -185,18 +203,38 @@ export class ChainService {
const convertedBlockBuffer = blockBuffer instanceof Uint8Array ? Buffer.from(blockBuffer) : blockBuffer;
const block = parseBlock(BlockDecoder.decode(convertedBlockBuffer));

// apply filter
block.transactions = block.transactions.filter(transactionFilter);
// If we don't cache, then we can query only the transactions that match the filter
if (!this.cache) {
// apply filter
block.transactions = block.transactions.filter(transactionFilter);

// query transaction codes
const codes = await this.queryTransactionCodes(
network,
block.transactions.map((tx) => tx.id)
);
// query transaction codes
const codes = await this.queryTransactionCodes(
network,
block.transactions.map((tx) => tx.id)
);

block.transactions.forEach((t) => {
t.validationCode = codes[t.id] ?? TransactionValidationCode.UNKNOWN;
});
block.transactions.forEach((t) => {
t.validationCode = codes[t.id] ?? TransactionValidationCode.UNKNOWN;
});
}

// If we cache, then we need to query all transactions codes and filter them later
else {
// query transaction codes
const codes = await this.queryTransactionCodes(
network,
block.transactions.map((tx) => tx.id)
);

block.transactions.forEach((t) => {
t.validationCode = codes[t.id] ?? TransactionValidationCode.UNKNOWN;
});

await this.cache?.setBlock(this.channelName, blockNumber, block);

block.transactions = block.transactions.filter(transactionFilter);
}

return block;
}
Expand Down
6 changes: 5 additions & 1 deletion src/StreamBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { CAConfig, CAService, UserConfig } from "./CAService";
import { ChainService, PeerConfig } from "./ChainService";
import { ChainStream, LoggerInterface } from "./ChainStream";
import { ConnectedStream, StreamConfig } from "./ConnectedStream";
import { ChainCache } from "./ChainCache";

const defaultCaConfig = {
orgMsp: "CuratorOrg",
Expand Down Expand Up @@ -61,6 +62,7 @@ export interface ConnectionParams {
peer?: Partial<PeerConfig>;
stream?: Partial<StreamConfig>;
logger?: LoggerInterface;
cache?: ChainCache;
}

export class StreamBuilder {
Expand All @@ -69,6 +71,7 @@ export class StreamBuilder {
private readonly peer: PeerConfig;
private readonly streamConfig: StreamConfig;
private readonly logger: LoggerInterface;
private readonly cache: ChainCache | undefined;

constructor(params: ConnectionParams) {
this.ca = {
Expand All @@ -94,11 +97,12 @@ export class StreamBuilder {
maxRetryCount: params.stream?.maxRetryCount ?? defaultStreamConfig.maxRetryCount
};
this.logger = params.logger ?? defaultLogger;
this.cache = params.cache ?? undefined;
}

public build(channelName: string): ConnectedStream {
const caService = new CAService(this.ca, this.user);
const chainService = new ChainService(this.peer, channelName, this.logger);
const chainService = new ChainService(this.peer, channelName, this.logger, this.cache);
const chainStream = new ChainStream(caService, chainService, this.logger);
return new ConnectedStream(chainStream, this.streamConfig);
}
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ export * from "./ConnectedStream";
export * from "./StreamBuilder";
export * from "./types";
export * from "./ConnectedTransactionStream";
export * from "./ChainCache";
35 changes: 33 additions & 2 deletions src/sample-transactions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,36 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import stream from "./index";
import stream, { Block, ChainCache, ChainInfo } from "./index";


// Sample implementation of ChainCache that stores blocks in memory
class InMemoryChainCache implements ChainCache {
private blocks: Record<string, Block> = {};

async getChainInfo(): Promise<ChainInfo | undefined> {
// no caching
return undefined;
}

async setChainInfo(): Promise<void> {
// no caching
}

async getBlock(channelName: string, blockNumber: number): Promise<Block | undefined> {
const block = this.blocks[`${channelName}-${blockNumber}`];
if (block) {
console.log("Block found in cache:", block.blockNumber);
return block;
}
return undefined;
}

async setBlock(channelName: string, blockNumber: number, block: Block): Promise<void> {
console.log("Saving block in cache:", block.blockNumber);
this.blocks[`${channelName}-${blockNumber}`] = block;
}
}

const config = {
ca: {
Expand All @@ -35,9 +64,11 @@ const config = {
batchSize: 10,
retryOnErrorDelayMs: 5000,
maxRetryCount: 5
}
},
cache: new InMemoryChainCache()
};


stream
.connect(config)
.channel("product-channel")
Expand Down
Loading