diff --git a/.github/workflows/test-on-push.yml b/.github/workflows/test-on-push.yml index 2893b19..c8ba345 100644 --- a/.github/workflows/test-on-push.yml +++ b/.github/workflows/test-on-push.yml @@ -52,8 +52,7 @@ jobs: [ -s "$NVM_DIR/nvm.sh" ] && \. "$NVM_DIR/nvm.sh" nvm install 18.20.5 npm i - npm i --prefix ./test-chaincode - (cd test-chaincode && npm run network:up && cd ..) + (cd test-chaincode && npm i && npm run build && npm run network:up) - name: Run e2e tests run: npm run test:e2e --prefix ./test-chaincode - name: Stream blocks from the network diff --git a/jest.config.ts b/jest.config.ts index 950311b..5abab25 100644 --- a/jest.config.ts +++ b/jest.config.ts @@ -14,11 +14,11 @@ */ export default { - displayName: "chaincode-template", + displayName: "stream", testEnvironment: "node", transform: { "^.+\\.[tj]s$": ["ts-jest", { tsconfig: "/tsconfig.spec.json" }] }, moduleFileExtensions: ["ts", "js"], - modulePathIgnorePatterns: ["lib", "e2e"] + modulePathIgnorePatterns: ["lib", "e2e", "test-chaincode"] }; diff --git a/package-lock.json b/package-lock.json index 7349530..29ee2ca 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@gala-chain/stream", - "version": "0.0.2", + "version": "0.0.3", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@gala-chain/stream", - "version": "0.0.2", + "version": "0.0.3", "license": "Apache-2.0", "dependencies": { "@hyperledger/fabric-gateway": "^1.6.0", diff --git a/package.json b/package.json index 87dcda1..ecb6bd4 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@gala-chain/stream", - "version": "0.0.2", + "version": "0.0.3", "description": "Streams blocks from GalaChain or Hyperledger Fabric network as RxJS Observables", "author": "", "private": false, diff --git a/src/ChainStream.ts b/src/ChainStream.ts index bf36a5f..218e200 100644 --- a/src/ChainStream.ts +++ b/src/ChainStream.ts @@ -41,7 +41,7 @@ export interface LoggerInterface { export class ChainStream { private readonly chainInfo: BehaviorSubject; private identityPromise: Promise | undefined; - + private isDisconnected = false; constructor( private readonly caService: CAService, private readonly chainService: ChainService, @@ -109,6 +109,10 @@ export class ChainStream { return of([]).pipe( expand(() => { + if (this.isDisconnected) { + return of([]); + } + if (currentBlock >= this.chainInfo.value.height) { return timer(config.intervalMs).pipe( tap(() => this.logger.log(`No new blocks, retrying after ${config.intervalMs} ms...`)), @@ -154,6 +158,8 @@ export class ChainStream { } public disconnect() { + this.logger.log("Disconnected, closing the stream"); + this.isDisconnected = true; this.chainService.disconnect(); } } diff --git a/src/parseBlock.ts b/src/parseBlock.ts index ea7ea0b..9be2f17 100644 --- a/src/parseBlock.ts +++ b/src/parseBlock.ts @@ -72,7 +72,7 @@ export function parseBlock(block: any): Block { const subjectMatch = cert.subject.match(/OU=(\w+).*CN=(\w+)/s); const creatorName = subjectMatch ? (subjectMatch[1] ?? "") + "|" + (subjectMatch[2] ?? "") : "|"; - let chaincodeRWSets: Array; + let rwSets: Array; if (transactionType === "ENDORSER_TRANSACTION") { if (!rawTransaction.payload.data.actions) continue; @@ -90,8 +90,7 @@ export function parseBlock(block: any): Block { version: action.payload.action.proposal_response_payload.extension.chaincode_id.version }; - const allRWSets = action.payload.action.proposal_response_payload.extension.results.ns_rwset; - chaincodeRWSets = allRWSets.filter(({ namespace }) => namespace === chaincode.name) as Array; + rwSets = action.payload.action.proposal_response_payload.extension.results.ns_rwset as Array; if (transactionType === "CONFIG") { txId = sha256(JSON.stringify(rawTransaction)); @@ -103,15 +102,17 @@ export function parseBlock(block: any): Block { rangeReads: [] }; - const sets = chaincodeRWSets.reduce((acc, set) => { + const sets = rwSets.reduce((acc, set) => { const { reads, writes, range_queries_info } = set.rwset; const parsedReads = reads.map((read) => ({ + namespace: set.namespace, key: read.key.replace("\0", "/") })); acc.reads.push(...parsedReads); const rangeReads = range_queries_info.map((range) => ({ + namespace: set.namespace, startKey: range.start_key.replace("\0", "/"), endKey: range.end_key.replace("\0", "/") })); @@ -119,6 +120,7 @@ export function parseBlock(block: any): Block { const parsedWrites = writes.map((write) => { return { + namespace: set.namespace, isDelete: write.is_delete, key: write.key.replace("\0", "/"), value: parseOrString(write.value.toString()) // chain objects diff --git a/src/stream.spec.ts b/src/stream.spec.ts index 0e9fa9a..8f068b2 100644 --- a/src/stream.spec.ts +++ b/src/stream.spec.ts @@ -115,32 +115,6 @@ it("should stream transactions", async () => { expect(methodNames).toEqual([methodWanted]); }); -it("should stream transactions", async () => { - // Given - const fetchedTransactions: StreamedTransaction[] = []; - - const methodWanted = "GalaChainToken:TransferToken"; - - // When - connectedStream - .transactions((t) => t.method === methodWanted) - .fromBlock(0) - .subscribe({ - next: (transaction) => { - console.log("Transaction:", transaction.id); - fetchedTransactions.push(transaction); - }, - error: (err) => console.error("Error:", err), - complete: () => console.log("Stream completed") - }); - - await new Promise((resolve) => setTimeout(resolve, 10000)); - - // Then - const methodNames = Array.from(new Set(fetchedTransactions.map((t) => t.method))); - expect(methodNames).toEqual([methodWanted]); -}); - class ChainServiceWithEntropy { private readonly errorRate = 0.4; private readonly maxDelayMs = 500; diff --git a/src/types.ts b/src/types.ts index 88fc110..2efc598 100644 --- a/src/types.ts +++ b/src/types.ts @@ -87,15 +87,18 @@ export interface Transaction } export interface Read { + namespace: string; key: string; } export interface RangeRead { + namespace: string; startKey: string; endKey: string; } export interface Write { + namespace: string; isDelete: boolean; key: string; value: ChainObject;