diff --git a/e2e/mocks/api-feature.controller.ts b/e2e/mocks/api-feature.controller.ts new file mode 100644 index 0000000..c32b409 --- /dev/null +++ b/e2e/mocks/api-feature.controller.ts @@ -0,0 +1,46 @@ +import { Controller } from '@nestjs/common'; +import { RMQInjectService, RMQService } from '../../lib'; +import { + NotificationContracts, + SumContracts, +} from '../contracts/mock.contracts'; + +@Controller() +export class ApiFeatureController { + constructor( + public readonly rmqImplicitInject: RMQService, + @RMQInjectService("test2") public readonly rmqExplicitInject: RMQService + ) { } + + async sumSuccess(arrayToSum: number[]): Promise { + return this.rmqImplicitInject.send(SumContracts.topic, { arrayToSum }); + } + + async sumSuccess2(arrayToSum: number[]): Promise { + return this.rmqExplicitInject.send(SumContracts.topic, { arrayToSum }); + } + + async sumFailed(arrayToSum: string[]): Promise { + return this.rmqImplicitInject.send(SumContracts.topic, { arrayToSum }); + } + + async sumFailed2(arrayToSum: string[]): Promise { + return this.rmqExplicitInject.send(SumContracts.topic, { arrayToSum }); + } + + async notificationSuccess(message: string): Promise { + return this.rmqImplicitInject.notify(NotificationContracts.topic, { message }); + } + + async notificationSuccess2(message: string): Promise { + return this.rmqExplicitInject.notify(NotificationContracts.topic, { message }); + } + + async notificationFailed(message: number): Promise { + return this.rmqImplicitInject.notify(NotificationContracts.topic, { message }); + } + + async notificationFailed2(message: number): Promise { + return this.rmqExplicitInject.notify(NotificationContracts.topic, { message }); + } +} diff --git a/e2e/mocks/microservice.controller.ts b/e2e/mocks/microservice.controller.ts index c7770ca..cfe2fd6 100644 --- a/e2e/mocks/microservice.controller.ts +++ b/e2e/mocks/microservice.controller.ts @@ -10,7 +10,7 @@ import { ManualAckContracts, DebugContracts, CustomMessageFactoryContracts, PatternStarContracts, PatternHashContracts, } from '../contracts/mock.contracts'; -import { ERROR_TYPE } from '../../lib/constants'; +import { DEFAULT_SERVICE_NAME, ERROR_TYPE } from '../../lib/constants'; import { Message } from 'amqplib'; @Controller() @@ -33,6 +33,24 @@ export class MicroserviceController { return { result: arrayToSum.reduce((prev, cur) => prev + cur) }; } + @RMQRoute(SumContracts.topic, { name: 'test2' }) + @Validate() + sumRpc2({ arrayToSum }: SumContracts.Request): SumContracts.Response { + const result = arrayToSum.reduce((prev, cur) => prev + cur); + return { result }; + } + + @RMQRoute(SumContracts.topic, { name: 'test3' }) + @Validate() + sumRpc3({ arrayToSum }: SumContracts.Request): SumContracts.Response { + const result = arrayToSum.reduce((prev, cur) => prev + cur); + if (result !== 2) { + throw new Error('Do I look like a calculator to you?'); + } + + return { result }; + } + @RMQRoute(NotificationContracts.topic) @Validate() notificationNone({ message }: NotificationContracts.Request): void { @@ -40,6 +58,14 @@ export class MicroserviceController { return; } + @RMQRoute(NotificationContracts.topic, { name: ['test2', 'test3'] }) + @Validate() + notificationNone2({ message }: NotificationContracts.Request, @RMQMessage msg: ExtendedMessage): void { + console.log(message); + console.log(msg.serviceName); + return; + } + @RMQRoute(MultiplyContracts.topic) @Validate() multiplyRpc({ arrayToMultiply }: MultiplyContracts.Request): MultiplyContracts.Response { diff --git a/e2e/tests/rmqAsync.e2e-spec.ts b/e2e/tests/rmq-async.e2e-spec.ts similarity index 100% rename from e2e/tests/rmqAsync.e2e-spec.ts rename to e2e/tests/rmq-async.e2e-spec.ts diff --git a/e2e/tests/rmq-feature.e2e-spec.ts b/e2e/tests/rmq-feature.e2e-spec.ts new file mode 100644 index 0000000..e49cdcb --- /dev/null +++ b/e2e/tests/rmq-feature.e2e-spec.ts @@ -0,0 +1,297 @@ +import { Test } from '@nestjs/testing'; +import { RMQModule, RMQService } from '../../lib'; +import { INestApplication } from '@nestjs/common'; +import { ApiController } from '../mocks/api.controller'; +import { MicroserviceController } from '../mocks/microservice.controller'; +import { DEFAULT_SERVICE_NAME, ERROR_UNDEFINED_FROM_RPC } from '../../lib/constants'; +import { DoublePipe } from '../mocks/double.pipe'; +import { ZeroIntercepter } from '../mocks/zero.intercepter'; +import { ErrorHostHandler } from '../mocks/error-host.handler'; +import { getServiceToken } from '../../lib/utils/get-service-token'; +import { ApiFeatureController } from '../mocks/api-feature.controller'; + +class OverrideController extends ApiFeatureController { } + +describe('RMQe2e forFeature()', () => { + let api: INestApplication; + let apiController: ApiFeatureController; + let overrideController: OverrideController; + let microserviceController: MicroserviceController; + let rmqServiceDefault: RMQService; + let rmqServiceTest2: RMQService; + let rmqServiceTest3: RMQService; + + beforeAll(async () => { + const apiModule = await Test.createTestingModule({ + imports: [ + RMQModule.forRoot({ + exchangeName: 'test1', + connections: [ + { + login: 'guest', + password: 'guest', + host: 'localhost', + }, + ], + queueName: 'test-queue1', + heartbeatIntervalInSeconds: 10, + prefetchCount: 10, + middleware: [DoublePipe], + intercepters: [ZeroIntercepter], + errorHandler: ErrorHostHandler, + serviceName: 'test-service', + messagesTimeout: 2000, + }), + RMQModule.forRoot({ + name: 'test2', + exchangeName: 'test2', + connections: [ + { + login: 'guest', + password: 'guest', + host: 'localhost', + }, + ], + queueName: '', // random exclusive + heartbeatIntervalInSeconds: 10, + prefetchCount: 10, + middleware: [DoublePipe], + intercepters: [ZeroIntercepter], + errorHandler: ErrorHostHandler, + serviceName: 'test-service', + messagesTimeout: 2000, + }), + RMQModule.forRootAsync( + { + imports: [], + inject: [], + name: 'test3', + useFactory: () => ( + { + exchangeName: 'test3', + connections: [ + { + login: 'guest', + password: 'guest', + host: 'localhost', + }, + ], + queueName: 'test-queue3', + heartbeatIntervalInSeconds: 10, + prefetchCount: 10, + middleware: [DoublePipe], + intercepters: [ZeroIntercepter], + errorHandler: ErrorHostHandler, + serviceName: 'test-service', + messagesTimeout: 2000, + } + ) + } + ), + { + // uses test3 implicitly and test2 explicitly + imports: [ + RMQModule.forFeature('test3') + ], + controllers: [OverrideController], + module: class OverrideRMQModule { }, + }, + { + // uses default implicitly and test2 explicitly + imports: [], + controllers: [ApiFeatureController], + module: class SampleRMQModule { }, + } + ], + controllers: [MicroserviceController], + }).compile(); + api = apiModule.createNestApplication(); + await api.init(); + + apiController = apiModule.get(ApiFeatureController); + overrideController = apiModule.get(OverrideController); + microserviceController = apiModule.get(MicroserviceController); + rmqServiceDefault = apiController.rmqImplicitInject; + rmqServiceTest2 = apiController.rmqExplicitInject; + rmqServiceTest3 = overrideController.rmqImplicitInject; + console.warn = jest.fn(); + console.log = jest.fn(); + }); + + describe('rpc', () => { + it('check name', async () => { + expect(apiController.rmqImplicitInject.name).toBe(DEFAULT_SERVICE_NAME); + expect(apiController.rmqExplicitInject.name).toBe('test2'); + + expect(overrideController.rmqImplicitInject.name).toBe('test3'); + expect(overrideController.rmqExplicitInject.name).toBe('test2'); + }); + + it('check connection', async () => { + const isConnected = rmqServiceDefault.healthCheck(); + expect(isConnected).toBe(true); + + const isConnected2 = rmqServiceTest2.healthCheck(); + expect(isConnected2).toBe(true); + + const isConnected3 = rmqServiceTest3.healthCheck(); + expect(isConnected3).toBe(true); + }); + + it('default: successful send()', async () => { + const { result } = await apiController.sumSuccess([1, 2, 3]); + expect(result).toBe(6); + }); + + it('default: request validation failed', async () => { + try { + await apiController.sumFailed(['a', 'b', 'c']); + expect(true).toBe(false); + } catch (error) { + expect(error.message).toBe( + 'each value in arrayToSum must be a number conforming to the specified constraints', + ); + expect(error.type).toBeUndefined(); + expect(error.code).toBeUndefined(); + expect(error.data).toBeUndefined(); + expect(error.service).toBe('test-service'); + expect(error.host).not.toBeNull(); + } + }); + + it('test2: successful send()', async () => { + const { result } = await apiController.sumSuccess2([-10, 8, -9, 5]); + expect(result).toBe(-6); + }); + + it('test2: request validation failed', async () => { + try { + await apiController.sumFailed2(['a', 'b', 'c']); + expect(true).toBe(false); + } catch (error) { + expect(error.message).toBe( + 'each value in arrayToSum must be a number conforming to the specified constraints', + ); + expect(error.type).toBeUndefined(); + expect(error.code).toBeUndefined(); + expect(error.data).toBeUndefined(); + expect(error.service).toBe('test-service'); + expect(error.host).not.toBeNull(); + } + }); + + it('override - test3: successful send()', async () => { + const { result } = await overrideController.sumSuccess([1, 2, -1]); + expect(result).toBe(2); + }); + + it('override - test3: error thrown by microservice', async () => { + try { + await overrideController.sumSuccess([1, 2, 3]); + expect(true).toBe(false); + } catch (error) { + expect(error.message).toBe( + 'Do I look like a calculator to you?', + ); + expect(error.type).toBeUndefined(); + expect(error.code).toBeUndefined(); + expect(error.data).toBeUndefined(); + expect(error.service).toBe('test-service'); + expect(error.host).not.toBeNull(); + } + }); + + it('override - test2: successful send()', async () => { + const { result } = await overrideController.sumSuccess2([-10, 8, -9, 5]); + expect(result).toBe(-6); + }); + + it('override - test2: request validation failed', async () => { + try { + await overrideController.sumFailed2(['a', 'b', 'c']); + expect(true).toBe(false); + } catch (error) { + expect(error.message).toBe( + 'each value in arrayToSum must be a number conforming to the specified constraints', + ); + expect(error.type).toBeUndefined(); + expect(error.code).toBeUndefined(); + expect(error.data).toBeUndefined(); + expect(error.service).toBe('test-service'); + expect(error.host).not.toBeNull(); + } + }); + }); + + describe('none', () => { + it('default: successful notify()', async () => { + const res = await apiController.notificationSuccess('SECRETMESSAGE'); + await delay(1000); + expect(console.log).toBeCalledTimes(1); + expect(console.log).toHaveBeenCalledWith('SECRETMESSAGE'); + expect(res).toBeUndefined(); + jest.clearAllMocks(); + }); + + it('default: notify validation failed', async () => { + const res = await apiController.notificationFailed(0); + expect(console.log).toBeCalledTimes(0); + expect(res).toBeUndefined(); + expect(res).toBeUndefined(); + }); + + it('test2: successful notify()', async () => { + const res = await apiController.notificationSuccess2('SECRETMESSAGE2'); + await delay(1000); + expect(console.log).toBeCalledTimes(2); + expect(console.log).toHaveBeenCalledWith('SECRETMESSAGE2'); + expect(console.log).toHaveBeenCalledWith('test2'); + expect(res).toBeUndefined(); + jest.clearAllMocks(); + }); + + it('test2: notify validation failed', async () => { + const res = await apiController.notificationFailed2(0); + expect(console.log).toBeCalledTimes(0); + expect(res).toBeUndefined(); + expect(res).toBeUndefined(); + }); + + it('override - test3: successful notify()', async () => { + const res = await overrideController.notificationSuccess('SECRETMESSAGE3'); + await delay(1000); + expect(console.log).toBeCalledTimes(2); + expect(console.log).toHaveBeenCalledWith('SECRETMESSAGE3'); + expect(console.log).toHaveBeenCalledWith('test3'); + expect(res).toBeUndefined(); + jest.clearAllMocks(); + }); + + it('override - test2: successful notify()', async () => { + const res = await overrideController.notificationSuccess2('SECRETMESSAGE4'); + await delay(1000); + expect(console.log).toBeCalledTimes(2); + expect(console.log).toHaveBeenCalledWith('SECRETMESSAGE4'); + expect(console.log).toHaveBeenCalledWith('test2'); + expect(res).toBeUndefined(); + jest.clearAllMocks(); + }); + }); + + afterAll(async () => { + await delay(500); + await rmqServiceDefault.disconnect(); + await rmqServiceTest2.disconnect(); + await rmqServiceTest3.disconnect(); + await api.close(); + await delay(500); + }); +}); + +async function delay(time: number): Promise { + return new Promise((resolve) => { + setTimeout(() => { + resolve(); + }, time); + }); +} diff --git a/e2e/tests/rmqTest.e2e-spec.ts b/e2e/tests/rmq-test.e2e-spec.ts similarity index 79% rename from e2e/tests/rmqTest.e2e-spec.ts rename to e2e/tests/rmq-test.e2e-spec.ts index cc5fd3c..a6790e9 100644 --- a/e2e/tests/rmqTest.e2e-spec.ts +++ b/e2e/tests/rmq-test.e2e-spec.ts @@ -2,7 +2,15 @@ import { Test } from '@nestjs/testing'; import { RMQModule, RMQService } from '../../lib'; import { INestApplication } from '@nestjs/common'; import { MicroserviceController } from '../mocks/microservice.controller'; -import { AppIdContracts, CustomMessageFactoryContracts, DivideContracts, MultiplyContracts, PatternHashContracts, PatternStarContracts, SumContracts } from '../contracts/mock.contracts'; +import { + AppIdContracts, + CustomMessageFactoryContracts, + DivideContracts, + MultiplyContracts, + PatternHashContracts, + PatternStarContracts, + SumContracts +} from '../contracts/mock.contracts'; import { ERROR_UNDEFINED_FROM_RPC } from '../../lib/constants'; import { DoublePipe } from '../mocks/double.pipe'; import { ZeroIntercepter } from '../mocks/zero.intercepter'; @@ -113,28 +121,46 @@ describe('RMQe2e forTest()', () => { describe('middleware', () => { it('doublePipe', async () => { - const { result } = await rmqService.triggerRoute(MultiplyContracts.topic, { + const { result } = await rmqService.triggerRoute< + MultiplyContracts.Request, + MultiplyContracts.Response + >( + MultiplyContracts.topic, + { arrayToMultiply: [1, 2] - }); + } + ); expect(result).toBe(8); }); }); describe('interceptor', () => { it('zeroInterceptor', async () => { - const { result } = await rmqService.triggerRoute(DivideContracts.topic, { + const { result } = await rmqService.triggerRoute< + DivideContracts.Request, + DivideContracts.Response + >( + DivideContracts.topic, + { first: 10, second: 5 - }); + } + ); expect(result).toBe(0); }); }); describe('msgFactory', () => { it('customMessageFactory', async () => { - const { num, appId } = await rmqService.triggerRoute(CustomMessageFactoryContracts.topic, { + const { num, appId } = await rmqService.triggerRoute< + CustomMessageFactoryContracts.Request, + CustomMessageFactoryContracts.Response + >( + CustomMessageFactoryContracts.topic, + { num: 1 - }); + } + ); expect(num).toBe(2); expect(appId).toBe('test-service'); }); @@ -142,16 +168,28 @@ describe('RMQe2e forTest()', () => { describe('msgPattent', () => { it('* pattern', async () => { - const { num } = await rmqService.triggerRoute(PatternStarContracts.topic, { + const { num } = await rmqService.triggerRoute< + PatternStarContracts.Request, + PatternStarContracts.Response + >( + PatternStarContracts.topic, + { num: 1 - }); + } + ); expect(num).toBe(1); }); it('# pattern', async () => { - const { num } = await rmqService.triggerRoute(PatternHashContracts.topic, { - num: 1 - }); + const { num } = await rmqService.triggerRoute< + PatternHashContracts.Request, + PatternHashContracts.Response + >( + PatternHashContracts.topic, + { + num: 1 + } + ); expect(num).toBe(1); }); }); diff --git a/lib/classes/rmq-extended-message.class.ts b/lib/classes/rmq-extended-message.class.ts index bc59c58..30ec6ee 100644 --- a/lib/classes/rmq-extended-message.class.ts +++ b/lib/classes/rmq-extended-message.class.ts @@ -1,39 +1,43 @@ -import { MessageFields, MessageProperties, Message } from "amqplib"; +import { MessageFields, MessageProperties, Message } from 'amqplib'; +import { IRMQMessage } from '../interfaces/rmq-message.interface'; -export class ExtendedMessage implements Message { - content: Buffer; - fields: MessageFields; - properties: MessageProperties; +export class ExtendedMessage implements IRMQMessage { + content: Buffer; + fields: MessageFields; + properties: MessageProperties; + serviceName: string; - constructor(msg: Message) { - this.content = msg.content; - this.fields = msg.fields; - this.properties = msg.properties; - } + constructor(msg: IRMQMessage) { + this.content = msg.content; + this.fields = msg.fields; + this.properties = msg.properties; + this.serviceName = msg.serviceName; + } - public getDebugString(): string { - try { - const content = JSON.parse(this.content.toString()) - const debugMsg = { - fields: this.fields, - properties: this.properties, - message: this.maskBuffers(content), - } - return JSON.stringify(debugMsg); - } catch (e) { - return e.message; - } - } + public getDebugString(): string { + try { + const content = JSON.parse(this.content.toString()); + const debugMsg = { + fields: this.fields, + properties: this.properties, + serviceName: this.serviceName, + message: this.maskBuffers(content), + }; + return JSON.stringify(debugMsg); + } catch (e) { + return e.message; + } + } - private maskBuffers(obj: any) { - let result: any = {}; - for (let prop in obj) { - if (obj[prop].type === 'Buffer') { - result[prop] = 'Buffer - length ' + (obj[prop].data as Buffer).length; - } else { - result[prop] = obj[prop]; - } - } - return result; - } + private maskBuffers(obj: any) { + const result: any = {}; + for (const prop in obj) { + if (obj[prop].type === 'Buffer') { + result[prop] = 'Buffer - length ' + (obj[prop].data as Buffer).length; + } else { + result[prop] = obj[prop]; + } + } + return result; + } } \ No newline at end of file diff --git a/lib/classes/rmq-pipe.class.ts b/lib/classes/rmq-pipe.class.ts index a625998..a686e8a 100644 --- a/lib/classes/rmq-pipe.class.ts +++ b/lib/classes/rmq-pipe.class.ts @@ -1,5 +1,6 @@ -import { Message } from 'amqplib'; import { LoggerService } from '@nestjs/common'; +import { Message } from 'amqplib'; +import { IRMQMessage } from '../interfaces/rmq-message.interface'; // tslint:disable-next-line: interface-name export class RMQPipeClass { @@ -9,7 +10,7 @@ export class RMQPipeClass { this.logger = logger; } - async transform(msg: Message): Promise { + async transform(msg: IRMQMessage): Promise { return msg; } } diff --git a/lib/constants.ts b/lib/constants.ts index 3846c6b..2c1bea3 100644 --- a/lib/constants.ts +++ b/lib/constants.ts @@ -20,6 +20,7 @@ export const DEFAULT_RECONNECT_TIME: number = 5; export const DEFAULT_HEARTBEAT_TIME: number = 5; export const DEFAULT_TIMEOUT: number = 30000; export const DEFAULT_PREFETCH_COUNT: number = 0; +export const DEFAULT_SERVICE_NAME: string = 'Default'; export const INITIALIZATION_STEP_DELAY: number = 300; export enum ERROR_TYPE { diff --git a/lib/decorators/rmq-inject-service.decorator.ts b/lib/decorators/rmq-inject-service.decorator.ts new file mode 100644 index 0000000..1d4f25b --- /dev/null +++ b/lib/decorators/rmq-inject-service.decorator.ts @@ -0,0 +1,5 @@ +import { Inject } from '@nestjs/common'; +import { DEFAULT_SERVICE_NAME } from '../constants'; +import { getServiceToken } from '../utils/get-service-token'; + +export const RMQInjectService = (serviceName: string = DEFAULT_SERVICE_NAME) => Inject(getServiceToken(serviceName)); \ No newline at end of file diff --git a/lib/helpers/logger.ts b/lib/helpers/logger.ts index a851f00..f204c95 100644 --- a/lib/helpers/logger.ts +++ b/lib/helpers/logger.ts @@ -1,18 +1,24 @@ import { Logger, LoggerService } from '@nestjs/common'; import { blueBright, white, yellow } from 'chalk'; +import { DEFAULT_SERVICE_NAME } from '../constants'; export class RQMColorLogger implements LoggerService { logMessages: boolean; + name: string; - constructor(logMessages: boolean) { + constructor(logMessages: boolean, name?: string) { this.logMessages = logMessages ?? false; + this.name = name ?? DEFAULT_SERVICE_NAME; } + log(message: any, context?: string): any { Logger.log(message, context); } + error(message: any, trace?: string, context?: string): any { Logger.error(message, trace, context); } + debug(message: any, context?: string): any { if(!this.logMessages) { return; @@ -24,6 +30,7 @@ export class RQMColorLogger implements LoggerService { Logger.log(message, context); } } + warn(message: any, context?: string): any { Logger.warn(message, context); } diff --git a/lib/index.ts b/lib/index.ts index 8f9ff20..c01729e 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -2,6 +2,7 @@ export * from './rmq.module'; export * from './rmq.service'; export * from './rmq-test.service'; export * from './interfaces/rmq-error-headers.interface'; +export * from './interfaces/rmq-message.interface'; export * from './interfaces/rmq-options.interface'; export * from './interfaces/rmq-publish-options.interface'; export * from './interfaces/rmq-service.interface'; @@ -10,6 +11,7 @@ export * from './decorators/rmq-controller.decorator'; export * from './decorators/rmq-pipe.decorator'; export * from './decorators/validate.decorator'; export * from './decorators/rmq-message.decorator'; +export * from './decorators/rmq-inject-service.decorator'; export * from './classes/rmq-pipe.class'; export * from './classes/rmq-intercepter.class'; export * from './classes/rmq-error.class'; diff --git a/lib/interfaces/queue-meta.interface.ts b/lib/interfaces/queue-meta.interface.ts index 223138c..f376c29 100644 --- a/lib/interfaces/queue-meta.interface.ts +++ b/lib/interfaces/queue-meta.interface.ts @@ -1,4 +1,4 @@ -import { Message } from 'amqplib'; +import { IRMQMessage } from './rmq-message.interface'; export interface IRouteMeta { topic: string; @@ -8,6 +8,7 @@ export interface IRouteMeta { } export interface IRouteOptions { + name?: string | string[]; manualAck?: boolean; - msgFactory?: (msg: Message) => any[]; + msgFactory?: (msg: IRMQMessage) => any[]; } diff --git a/lib/interfaces/rmq-message.interface.ts b/lib/interfaces/rmq-message.interface.ts new file mode 100644 index 0000000..267c369 --- /dev/null +++ b/lib/interfaces/rmq-message.interface.ts @@ -0,0 +1,5 @@ +import { Message } from 'amqplib'; + +export interface IRMQMessage extends Message { + serviceName: string; +} \ No newline at end of file diff --git a/lib/interfaces/rmq-options.interface.ts b/lib/interfaces/rmq-options.interface.ts index 2e3864d..ce3dac7 100644 --- a/lib/interfaces/rmq-options.interface.ts +++ b/lib/interfaces/rmq-options.interface.ts @@ -5,7 +5,7 @@ import { LoggerService } from '@nestjs/common'; import { ModuleMetadata } from '@nestjs/common/interfaces'; import { Channel, Options } from 'amqplib'; -export interface IRMQServiceOptions { +export interface IRMQServiceBaseOptions { exchangeName: string; connections: IRMQConnection[]; queueName?: string; @@ -15,6 +15,7 @@ export interface IRMQServiceOptions { prefetchCount?: number; isGlobalPrefetchCount?: boolean; isQueueDurable?: boolean; + isQueueExclusive?: boolean; isExchangeDurable?: boolean; assertExchangeType?: Parameters[1]; exchangeOptions?: Options.AssertExchange; @@ -29,6 +30,10 @@ export interface IRMQServiceOptions { serviceName?: string; } +export interface IRMQServiceOptions extends IRMQServiceBaseOptions { + name?: string; +} + export interface IRMQConnection { login: string; password: string; @@ -38,6 +43,7 @@ export interface IRMQConnection { } export interface IRMQServiceAsyncOptions extends Pick { - useFactory?: (...args: any[]) => Promise | IRMQServiceOptions; + name?: string; + useFactory?: (...args: any[]) => Promise | IRMQServiceBaseOptions; inject?: any[]; } diff --git a/lib/interfaces/rmq-service.interface.ts b/lib/interfaces/rmq-service.interface.ts index f9ce815..807e77f 100644 --- a/lib/interfaces/rmq-service.interface.ts +++ b/lib/interfaces/rmq-service.interface.ts @@ -2,11 +2,12 @@ import { Channel } from 'amqplib'; import { IPublishOptions } from '..'; export interface IRMQService { + readonly name: string; init: () => Promise; ack: (...params: Parameters) => ReturnType; nack: (...params: Parameters) => ReturnType; - send: (topic: string, message: IMessage, options?: IPublishOptions) => Promise - notify: (topic: string, message: IMessage, options?: IPublishOptions) => Promise + send: (topic: string, message: IMessage, options?: IPublishOptions) => Promise; + notify: (topic: string, message: IMessage, options?: IPublishOptions) => Promise; healthCheck: () => boolean; disconnect: () => Promise; mockReply?: (topic: string, reply: T) => void; diff --git a/lib/rmq-error.service.ts b/lib/rmq-error.service.ts index 2c42087..c88cc2a 100644 --- a/lib/rmq-error.service.ts +++ b/lib/rmq-error.service.ts @@ -4,14 +4,16 @@ import { hostname } from 'os'; import { Message } from 'amqplib'; import { RMQErrorHandler } from './classes/rmq-error-handler.class'; import { IRMQServiceOptions } from './interfaces/rmq-options.interface'; -import { RMQ_MODULE_OPTIONS } from './constants'; +import { DEFAULT_SERVICE_NAME, RMQ_MODULE_OPTIONS } from './constants'; @Injectable() export class RmqErrorService { private options: IRMQServiceOptions; + public readonly name: string; constructor(@Inject(RMQ_MODULE_OPTIONS) options: IRMQServiceOptions) { this.options = options; + this.name = this.options.name ?? DEFAULT_SERVICE_NAME; } public buildError(error: Error | RMQError) { diff --git a/lib/rmq-explorer.module.ts b/lib/rmq-explorer.module.ts new file mode 100644 index 0000000..c67dded --- /dev/null +++ b/lib/rmq-explorer.module.ts @@ -0,0 +1,12 @@ +import { Global, Module } from '@nestjs/common'; +import { RMQMetadataAccessor } from './rmq-metadata.accessor'; +import { RMQExplorer } from './rmq.explorer'; +import { DiscoveryModule } from '@nestjs/core'; + +@Global() +@Module({ + imports: [DiscoveryModule], + providers: [RMQMetadataAccessor, RMQExplorer], + exports: [RMQMetadataAccessor, RMQExplorer], +}) +export class RMQExplorerModule {} diff --git a/lib/rmq-global.module.ts b/lib/rmq-global.module.ts new file mode 100644 index 0000000..5474dae --- /dev/null +++ b/lib/rmq-global.module.ts @@ -0,0 +1,119 @@ +import { RMQService } from './rmq.service'; +import { DynamicModule, Global, Inject, Module, Provider } from '@nestjs/common'; +import { IRMQServiceAsyncOptions, IRMQServiceOptions } from './interfaces/rmq-options.interface'; +import { RMQMetadataAccessor } from './rmq-metadata.accessor'; +import { RMQ_MODULE_OPTIONS } from './constants'; +import { RmqErrorService } from './rmq-error.service'; +import { RMQTestService } from './rmq-test.service'; +import { getErrorServiceToken, getServiceToken } from './utils/get-service-token'; +import { RMQExplorerModule } from './rmq-explorer.module'; +import { DiscoveryModule } from '@nestjs/core'; + +@Global() +@Module( + { + imports: [DiscoveryModule] + } +) +export class RMQGlobalModule { + static forRoot(options: IRMQServiceOptions): DynamicModule { + const optionsProvider: Provider = { + provide: RMQ_MODULE_OPTIONS, + useValue: options, + }; + + const errorServiceProvider: Provider = { + provide: getErrorServiceToken(options.name), + useValue: new RmqErrorService(options), + }; + + const serviceProvider: Provider = { + provide: getServiceToken(options.name), + useFactory: async ( + metadataAccessor: RMQMetadataAccessor, + errorService: RmqErrorService + ) => new RMQService(options, metadataAccessor, errorService), + inject: [RMQMetadataAccessor, getErrorServiceToken(options.name)] + }; + + return { + module: RMQGlobalModule, + providers: [ + errorServiceProvider, + serviceProvider, + optionsProvider, + ], + exports: [serviceProvider], + imports: [RMQExplorerModule] + }; + } + + static forRootAsync(asyncOptions: IRMQServiceAsyncOptions): DynamicModule { + const errorServiceProvider = { + provide: getErrorServiceToken(asyncOptions.name), + useFactory: async (options: IRMQServiceOptions,) => new RmqErrorService(options), + inject: [RMQ_MODULE_OPTIONS] + }; + + const serviceProvider = { + provide: getServiceToken(asyncOptions.name), + useFactory: async ( + options: IRMQServiceOptions, + metadataAccessor: RMQMetadataAccessor, + errorService: RmqErrorService + ) => new RMQService(options, metadataAccessor, errorService), + inject: [RMQ_MODULE_OPTIONS, RMQMetadataAccessor, getErrorServiceToken(asyncOptions.name)], + }; + + const asyncProvider = RMQGlobalModule.createAsyncOptionsProvider(asyncOptions); + return { + module: RMQGlobalModule, + providers: [ + asyncProvider, + errorServiceProvider, + serviceProvider + ], + exports: [serviceProvider], + imports: [RMQExplorerModule, ...(asyncOptions.imports ?? [])] + }; + } + + static forTest(options: Partial): DynamicModule { + const optionsProvider = { + provide: RMQ_MODULE_OPTIONS, + useValue: options, + }; + + const serviceProvider = { + provide: getServiceToken(options.name), + useFactory: async ( + metadataAccessor: RMQMetadataAccessor + ) => new RMQTestService(options as IRMQServiceOptions, metadataAccessor), + inject: [RMQMetadataAccessor] + }; + + return { + module: RMQGlobalModule, + providers: [ + serviceProvider, + optionsProvider + ], + exports: [serviceProvider], + imports: [RMQExplorerModule] + }; + } + + private static createAsyncOptionsProvider(options: IRMQServiceAsyncOptions): Provider { + return { + provide: RMQ_MODULE_OPTIONS, + useFactory: async (...args: any[]): Promise => { + const config = await options.useFactory(...args); + return { + ...config, + name: options.name, + }; + }, + inject: options.inject || [], + }; + } +} diff --git a/lib/rmq-metadata.accessor.ts b/lib/rmq-metadata.accessor.ts index 11a3ad5..9962de6 100644 --- a/lib/rmq-metadata.accessor.ts +++ b/lib/rmq-metadata.accessor.ts @@ -8,6 +8,7 @@ import { } from './constants'; import { IRouteOptions } from './interfaces/queue-meta.interface'; import { RMQService } from './rmq.service'; +import { getRouteKey } from './utils/get-route-key'; @Injectable() export class RMQMetadataAccessor { @@ -17,13 +18,17 @@ export class RMQMetadataAccessor { return this.reflector.get(RMQ_ROUTES_PATH, target); } - getAllRMQPaths(): string[] { - return Reflect.getMetadata(RMQ_ROUTES_META, RMQService) ?? []; + getAllRMQRouteKeys(serviceName?: string): string[] { + serviceName = serviceName ? getRouteKey('', serviceName) : undefined; + return (Reflect.getMetadata(RMQ_ROUTES_META, RMQService) ?? []) + .filter( + (m: string) => !serviceName || m.startsWith(serviceName) + ); } - addRMQPath(path: string): void { - const paths: string[] = this.getAllRMQPaths(); - paths.push(path); + addRMQRouteKey(routeKey: string): void { + const paths: string[] = this.getAllRMQRouteKeys(); + paths.push(routeKey); Reflect.defineMetadata(RMQ_ROUTES_META, paths, RMQService); } diff --git a/lib/rmq-test.service.ts b/lib/rmq-test.service.ts index 8b571e8..7a1d800 100644 --- a/lib/rmq-test.service.ts +++ b/lib/rmq-test.service.ts @@ -1,31 +1,42 @@ import { Inject, Injectable, LoggerService, OnModuleInit } from '@nestjs/common'; import { Channel, Message } from 'amqplib'; import { IPublishOptions, IRMQServiceOptions, RMQError } from '.'; -import { CONNECTED_MESSAGE, ERROR_NO_ROUTE, ERROR_TYPE, RMQ_MODULE_OPTIONS } from './constants'; +import { CONNECTED_MESSAGE, DEFAULT_SERVICE_NAME, ERROR_NO_ROUTE, ERROR_TYPE, RMQ_MODULE_OPTIONS } from './constants'; import { RQMColorLogger } from './helpers/logger'; import { IRMQService } from './interfaces/rmq-service.interface'; import { RMQMetadataAccessor } from './rmq-metadata.accessor'; import { requestEmitter, responseEmitter, ResponseEmitterResult } from './emmiters/router.emmiter'; import { validateOptions } from './option.validator'; import { getUniqId } from './utils/get-uniq-id'; +import { IRMQMessage } from './interfaces/rmq-message.interface'; +import { getRouteKey } from './utils/get-route-key'; @Injectable() export class RMQTestService implements OnModuleInit, IRMQService { private options: IRMQServiceOptions; - private routes: string[]; + private routeKeys: string[]; private logger: LoggerService; - private isInitialized: boolean = false; private replyStack = new Map(); private mockStack = new Map(); private mockErrorStack = new Map(); + private isInitialized = false; + public readonly name: string; - constructor(@Inject(RMQ_MODULE_OPTIONS) options: IRMQServiceOptions, private readonly metadataAccessor: RMQMetadataAccessor) { + constructor( + @Inject(RMQ_MODULE_OPTIONS) options: IRMQServiceOptions, + private readonly metadataAccessor: RMQMetadataAccessor + ) { this.options = options; - this.logger = options.logger ? options.logger : new RQMColorLogger(this.options.logMessages); + this.name = options.name ?? DEFAULT_SERVICE_NAME; + this.logger = options.logger ? options.logger : new RQMColorLogger(this.options.logMessages, this.name); validateOptions(this.options, this.logger); } async onModuleInit() { + if (this.isInitialized) { + return; + } + await this.init(); this.isInitialized = true; } @@ -41,7 +52,8 @@ export class RMQTestService implements OnModuleInit, IRMQService { public async triggerRoute(path: string, data: T): Promise { return new Promise(async (resolve, reject) => { const correlationId = getUniqId(); - let msg: Message = { + let msg: IRMQMessage = { + serviceName: this.name, content: Buffer.from(JSON.stringify(data)), fields: { deliveryTag: 1, @@ -65,8 +77,9 @@ export class RMQTestService implements OnModuleInit, IRMQService { expiration: 0, replyTo: 'mock' } - } - const route = this.getRouteByTopic(path); + }; + + const route = this.getRouteKeyByTopic(path); if (route) { msg = await this.useMiddleware(msg); this.replyStack.set(correlationId, { resolve, reject }); @@ -74,7 +87,7 @@ export class RMQTestService implements OnModuleInit, IRMQService { } else { throw new RMQError(ERROR_NO_ROUTE, ERROR_TYPE.TRANSPORT); } - }) + }); } public async init(): Promise { @@ -106,23 +119,36 @@ export class RMQTestService implements OnModuleInit, IRMQService { } public async disconnect() { - responseEmitter.removeAllListeners(); + this.detachEmitters(); + } + + private detachEmitters(): void { + responseEmitter.removeListener(getRouteKey(ResponseEmitterResult.success, this.name), this.onSuccessResponse); + responseEmitter.removeListener(getRouteKey(ResponseEmitterResult.error, this.name), this.onErrorResponse); + responseEmitter.removeListener(getRouteKey(ResponseEmitterResult.ack, this.name), this.onAcknowledgeResponse); } private attachEmitters(): void { - responseEmitter.on(ResponseEmitterResult.success, async (msg: Message, result) => { - const { resolve } = this.replyStack.get(msg.properties.correlationId); - result = await this.intercept(result, msg); - resolve(result); - }); - responseEmitter.on(ResponseEmitterResult.error, async (msg: Message, err) => { - const { reject } = this.replyStack.get(msg.properties.correlationId); - await this.intercept('', msg, err); - reject(err); - }); - responseEmitter.on(ResponseEmitterResult.ack, async (msg: Message) => { - this.ack(msg); - }); + this.detachEmitters(); + responseEmitter.on(getRouteKey(ResponseEmitterResult.success, this.name), this.onSuccessResponse); + responseEmitter.on(getRouteKey(ResponseEmitterResult.error, this.name), this.onErrorResponse); + responseEmitter.on(getRouteKey(ResponseEmitterResult.ack, this.name), this.onAcknowledgeResponse); + } + + private onSuccessResponse = async (msg: Message, result: any): Promise => { + const { resolve } = this.replyStack.get(msg.properties.correlationId); + result = await this.intercept(result, msg); + resolve(result); + } + + private onErrorResponse = async (msg: Message, err: Error | RMQError): Promise => { + const { reject } = this.replyStack.get(msg.properties.correlationId); + await this.intercept('', msg, err); + reject(err); + } + + private onAcknowledgeResponse = (msg: Message): void => { + this.ack(msg); } private async intercept(res: any, msg: Message, error?: Error) { @@ -136,31 +162,35 @@ export class RMQTestService implements OnModuleInit, IRMQService { } private async bindRMQRoutes(): Promise { - this.routes = this.metadataAccessor.getAllRMQPaths(); - if (this.routes.length > 0) { - this.routes.map(async (r) => { + this.routeKeys = this.metadataAccessor.getAllRMQRouteKeys(this.name); + if (this.routeKeys.length > 0) { + this.routeKeys.map(async (r) => { this.logger.log(`Mapped ${r}`, 'RMQRoute'); }); } } - private async useMiddleware(msg: Message) { + private async useMiddleware(msg: IRMQMessage) { if (!this.options.middleware || this.options.middleware.length === 0) { return msg; } for (const middleware of this.options.middleware) { - msg = await new middleware(this.logger).transform(msg); + // to be backward compatible + msg = (await new middleware(this.logger).transform(msg)) as IRMQMessage; + msg.serviceName = this.name; } return msg; } - private getRouteByTopic(topic: string): string { - return this.routes.find((route) => { - if (route === topic) { + private getRouteKeyByTopic(topic: string): string { + const routeKey = getRouteKey(topic, this.name); + + return this.routeKeys.find((route) => { + if (route === routeKey) { return true; } const regexString = '^' + route.replace(/\*/g, '([^.]+)').replace(/#/g, '([^.]+\.?)+') + '$'; - return topic.search(regexString) !== -1; + return routeKey.search(regexString) !== -1; }); } diff --git a/lib/rmq.explorer.ts b/lib/rmq.explorer.ts index e0c4b35..c0acb48 100644 --- a/lib/rmq.explorer.ts +++ b/lib/rmq.explorer.ts @@ -3,17 +3,17 @@ import { DiscoveryService } from '@nestjs/core'; import { InstanceWrapper } from '@nestjs/core/injector/instance-wrapper'; import { MetadataScanner } from '@nestjs/core/metadata-scanner'; import { RMQMetadataAccessor } from './rmq-metadata.accessor'; -import { Message } from 'amqplib'; import { requestEmitter, responseEmitter, ResponseEmitterResult } from './emmiters/router.emmiter'; -import { ERROR_TYPE, ERROR_UNDEFINED_FROM_RPC } from './constants'; +import { DEFAULT_SERVICE_NAME, ERROR_TYPE, ERROR_UNDEFINED_FROM_RPC } from './constants'; import { ExtendedMessage } from './classes/rmq-extended-message.class'; import { RMQError } from './classes/rmq-error.class'; import { IRouteOptions } from './interfaces/queue-meta.interface'; import { validate } from 'class-validator'; +import { IRMQMessage } from './interfaces/rmq-message.interface'; +import { getRouteKey } from './utils/get-route-key'; @Injectable() export class RMQExplorer implements OnModuleInit { - constructor( private readonly discoveryService: DiscoveryService, private readonly metadataAccessor: RMQMetadataAccessor, @@ -50,12 +50,26 @@ export class RMQExplorer implements OnModuleInit { if (!path || !options) { return; } - this.metadataAccessor.addRMQPath(path); - this.attachEmitter(path, options, instance, methodRef); + + const services = typeof options.name === 'string' ? + [options.name] : + (options.name ?? [DEFAULT_SERVICE_NAME]); + + for (const service of services) { + const routeKey = getRouteKey(path, service); + this.metadataAccessor.addRMQRouteKey(routeKey); + this.attachEmitter(routeKey, service, options, instance, methodRef); + } } - private attachEmitter(path: string, options: IRouteOptions, instance: Record, methodRef: Function) { - requestEmitter.on(path, async (msg: Message) => { + private attachEmitter( + routeKey: string, + serviceName: string, + options: IRouteOptions, + instance: Record, + methodRef: Function + ) { + requestEmitter.on(routeKey, async (msg: IRMQMessage) => { const messageParams: number[] = this.metadataAccessor.getRMQMessageIndexes(Object.getPrototypeOf(instance), methodRef.name); try { @@ -70,30 +84,44 @@ export class RMQExplorer implements OnModuleInit { const error = await this.validateRequest(instance, methodRef, funcArgs); if (error) { responseEmitter.emit( - ResponseEmitterResult.error, + getRouteKey(ResponseEmitterResult.error, serviceName), msg, new RMQError(error, ERROR_TYPE.RMQ), ); - responseEmitter.emit(ResponseEmitterResult.ack, msg); + responseEmitter.emit( + getRouteKey(ResponseEmitterResult.ack, serviceName), + msg + ); return; } const result = await methodRef.apply(instance, funcArgs); if (msg.properties.replyTo && result) { - responseEmitter.emit(ResponseEmitterResult.success, msg, result); + responseEmitter.emit( + getRouteKey(ResponseEmitterResult.success, serviceName), + msg, + result + ); } else if (msg.properties.replyTo && result === undefined) { responseEmitter.emit( - ResponseEmitterResult.error, + getRouteKey(ResponseEmitterResult.error, serviceName), msg, new RMQError(ERROR_UNDEFINED_FROM_RPC, ERROR_TYPE.RMQ), ); } } catch (err) { if (msg.properties.replyTo) { - responseEmitter.emit(ResponseEmitterResult.error, msg, err); + responseEmitter.emit( + getRouteKey(ResponseEmitterResult.error, serviceName), + msg, + err + ); } } if (!options?.manualAck) { - responseEmitter.emit(ResponseEmitterResult.ack, msg); + responseEmitter.emit( + getRouteKey(ResponseEmitterResult.ack, serviceName), + msg + ); } }); } @@ -120,6 +148,6 @@ export class RMQExplorer implements OnModuleInit { } } -export const RMQMessageFactory = (msg: Message) => { +export const RMQMessageFactory = (msg: IRMQMessage) => { return [JSON.parse(msg.content.toString())]; }; \ No newline at end of file diff --git a/lib/rmq.module.ts b/lib/rmq.module.ts index ad22233..a2229cc 100644 --- a/lib/rmq.module.ts +++ b/lib/rmq.module.ts @@ -1,56 +1,50 @@ import { RMQService } from './rmq.service'; -import { DynamicModule, Global, Module, Provider } from '@nestjs/common'; +import { DynamicModule, Module, Provider } from '@nestjs/common'; import { IRMQServiceAsyncOptions, IRMQServiceOptions } from './interfaces/rmq-options.interface'; -import { RMQMetadataAccessor } from './rmq-metadata.accessor'; -import { RMQExplorer } from './rmq.explorer'; -import { DiscoveryModule } from '@nestjs/core'; -import { RMQ_MODULE_OPTIONS } from './constants'; -import { RmqErrorService } from './rmq-error.service'; -import { RMQTestService } from './rmq-test.service'; +import { DEFAULT_SERVICE_NAME } from './constants'; +import { RMQGlobalModule } from './rmq-global.module'; +import { getServiceToken } from './utils/get-service-token'; -@Global() -@Module({ - imports: [DiscoveryModule], - providers: [RMQMetadataAccessor, RMQExplorer, RmqErrorService] -}) +@Module({}) export class RMQModule { static forRoot(options: IRMQServiceOptions): DynamicModule { return { module: RMQModule, - providers: [RMQService, { provide: RMQ_MODULE_OPTIONS, useValue: options }], - exports: [RMQService], + imports: [RMQGlobalModule.forRoot(options)], }; } static forRootAsync(options: IRMQServiceAsyncOptions): DynamicModule { - const asyncOptions = this.createAsyncOptionsProvider(options); return { module: RMQModule, - imports: options.imports, - providers: [RMQService, RMQMetadataAccessor, RMQExplorer, asyncOptions], - exports: [RMQService], + imports: [RMQGlobalModule.forRootAsync(options)], }; } - static forTest(options: Partial) { + static forTest(options: Partial): DynamicModule { return { module: RMQModule, - providers: [{ - provide: RMQService, - useClass: RMQTestService - }, { provide: RMQ_MODULE_OPTIONS, useValue: options }], - exports: [RMQService], + imports: [RMQGlobalModule.forTest(options)], }; } - private static createAsyncOptionsProvider(options: IRMQServiceAsyncOptions): Provider { + static forFeature( + serviceName: string = DEFAULT_SERVICE_NAME + ): DynamicModule { + // redirect service by overriding the name, + // might result in competing instances, + // but is easy for converting codes already in the wild + // not great, not terrible + const provider: Provider = { + provide: RMQService, + useFactory: (service: RMQService) => service, + inject: [getServiceToken(serviceName)], + }; + return { - provide: RMQ_MODULE_OPTIONS, - useFactory: async (...args: any[]) => { - const config = await options.useFactory(...args); - return config; - }, - inject: options.inject || [], + module: RMQModule, + providers: [provider], + exports: [provider], }; } } diff --git a/lib/rmq.service.spec.ts b/lib/rmq.service.spec.ts index 324d431..e9b4895 100644 --- a/lib/rmq.service.spec.ts +++ b/lib/rmq.service.spec.ts @@ -7,8 +7,8 @@ describe('RMQService', () => { let rmqService: RMQService; beforeEach(async () => { - let accessor = new RMQMetadataAccessor(new Reflector()); - let errorService = new RmqErrorService({ + const accessor = new RMQMetadataAccessor(new Reflector()); + const errorService = new RmqErrorService({ exchangeName: 'test', connections: [] }); @@ -17,43 +17,43 @@ describe('RMQService', () => { serviceName: '', connections: [] }, accessor, errorService); - rmqService['routes'] = [ - 'exect.match.rpc', - '*.*.star', - '#.hash', - 'pattent.#', - ] + rmqService['routeKeys'] = [ + 'Default:exect.match.rpc', + 'Default:*.*.star', + 'Default:#.hash', + 'Default:pattent.#', + ]; }); describe('Test regex', () => { it('Matching', async () => { - const res = rmqService['getRouteByTopic']('exect.match.rpc') - expect(res).toBe(rmqService['routes'][0]); + const res = rmqService['getRouteKeyByTopic']('exect.match.rpc'); + expect(res).toBe(rmqService['routeKeys'][0]); }); it('Pattern * - success', async () => { - const res = rmqService['getRouteByTopic']('oh.thisis.star') - expect(res).toBe(rmqService['routes'][1]); + const res = rmqService['getRouteKeyByTopic']('oh.thisis.star'); + expect(res).toBe(rmqService['routeKeys'][1]); }); it('Pattern * - fail', async () => { - const res = rmqService['getRouteByTopic']('oh.this.is.star') + const res = rmqService['getRouteKeyByTopic']('oh.this.is.star'); expect(res).toBe(undefined); }); it('Pattern # - success start', async () => { - const res = rmqService['getRouteByTopic']('this.is.real.hash') - expect(res).toBe(rmqService['routes'][2]); + const res = rmqService['getRouteKeyByTopic']('this.is.real.hash'); + expect(res).toBe(rmqService['routeKeys'][2]); }); it('Pattern # - success end', async () => { - const res = rmqService['getRouteByTopic']('pattent.topic') - expect(res).toBe(rmqService['routes'][3]); + const res = rmqService['getRouteKeyByTopic']('pattent.topic'); + expect(res).toBe(rmqService['routeKeys'][3]); }); it('Pattern # - fail', async () => { - const res = rmqService['getRouteByTopic']('this.pattent.topic') + const res = rmqService['getRouteKeyByTopic']('this.pattent.topic'); expect(res).toBe(undefined); }); }); diff --git a/lib/rmq.service.ts b/lib/rmq.service.ts index e058d09..8e6e338 100644 --- a/lib/rmq.service.ts +++ b/lib/rmq.service.ts @@ -13,7 +13,7 @@ import { ERROR_TYPE, REPLY_QUEUE, DEFAULT_HEARTBEAT_TIME, - RMQ_MODULE_OPTIONS, INITIALIZATION_STEP_DELAY, ERROR_NO_QUEUE, + RMQ_MODULE_OPTIONS, INITIALIZATION_STEP_DELAY, ERROR_NO_QUEUE, DEFAULT_SERVICE_NAME, } from './constants'; import { EventEmitter } from 'events'; import { Channel, Message } from 'amqplib'; @@ -30,6 +30,8 @@ import { RMQMetadataAccessor } from './rmq-metadata.accessor'; import { RmqErrorService } from './rmq-error.service'; import { getUniqId } from './utils/get-uniq-id'; import { IRMQService } from './interfaces/rmq-service.interface'; +import { IRMQMessage } from './interfaces/rmq-message.interface'; +import { getRouteKey, getTopic } from './utils/get-route-key'; @Injectable() export class RMQService implements OnModuleInit, IRMQService { @@ -39,11 +41,11 @@ export class RMQService implements OnModuleInit, IRMQService { private options: IRMQServiceOptions; private sendResponseEmitter: EventEmitter = new EventEmitter(); private replyQueue: string = REPLY_QUEUE; - private routes: string[]; + private routeKeys: string[]; private logger: LoggerService; - private isConnected: boolean = false; private isInitialized: boolean = false; + public readonly name: string; constructor( @Inject(RMQ_MODULE_OPTIONS) options: IRMQServiceOptions, @@ -51,11 +53,16 @@ export class RMQService implements OnModuleInit, IRMQService { private readonly errorService: RmqErrorService ) { this.options = options; - this.logger = options.logger ? options.logger : new RQMColorLogger(this.options.logMessages); + this.name = this.options.name ?? DEFAULT_SERVICE_NAME; + this.logger = options.logger ? options.logger : new RQMColorLogger(this.options.logMessages, this.name); validateOptions(this.options, this.logger); } async onModuleInit() { + if (this.isInitialized) { + return; + } + await this.init(); this.isInitialized = true; } @@ -177,7 +184,7 @@ export class RMQService implements OnModuleInit, IRMQService { this.options.prefetchCount ?? DEFAULT_PREFETCH_COUNT, this.options.isGlobalPrefetchCount ?? false ); - if (this.options.queueName) { + if (typeof this.options.queueName === 'string') { this.listen(channel); } this.logConnected(); @@ -208,21 +215,29 @@ export class RMQService implements OnModuleInit, IRMQService { } private async listen(channel: Channel) { - await channel.assertQueue(this.options.queueName, { + const queue = await channel.assertQueue(this.options.queueName, { durable: this.options.isQueueDurable ?? true, + exclusive: this.options.isQueueExclusive ?? !this.options.queueName, arguments: this.options.queueArguments ?? {}, }); + this.options.queueName = queue.queue; + await this.bindRMQRoutes(channel); await channel.consume( this.options.queueName, async (msg: Message) => { - this.logger.debug(`Received ▼ [${msg.fields.routingKey}] ${msg.content}`); - const route = this.getRouteByTopic(msg.fields.routingKey); - if (route) { - msg = await this.useMiddleware(msg); - requestEmitter.emit(route, msg); + const message: IRMQMessage = { + ...msg, + serviceName: this.name + }; + + this.logger.debug(`Received ▼ [${message.fields.routingKey}] ${message.content}`); + const routeKey = this.getRouteKeyByTopic(message.fields.routingKey); + if (routeKey) { + msg = await this.useMiddleware(message); + requestEmitter.emit(routeKey, message); } else { - this.reply('', msg, new RMQError(ERROR_NO_ROUTE, ERROR_TYPE.TRANSPORT)); + this.reply('', message, new RMQError(ERROR_NO_ROUTE, ERROR_TYPE.TRANSPORT)); } }, { noAck: false } @@ -230,29 +245,39 @@ export class RMQService implements OnModuleInit, IRMQService { } private async bindRMQRoutes(channel: Channel): Promise { - this.routes = this.metadataAccessor.getAllRMQPaths(); - if (this.routes.length > 0) { - this.routes.map(async (r) => { + this.routeKeys = this.metadataAccessor.getAllRMQRouteKeys(this.name); + + if (this.routeKeys.length > 0) { + this.routeKeys.map(async (r) => { this.logger.log(`Mapped ${r}`, 'RMQRoute'); - await channel.bindQueue(this.options.queueName, this.options.exchangeName, r); + await channel.bindQueue(this.options.queueName, this.options.exchangeName, getTopic(r)); }); } } private detachEmitters(): void { - responseEmitter.removeAllListeners(); + responseEmitter.removeListener(getRouteKey(ResponseEmitterResult.success, this.name), this.onSuccessResponse); + responseEmitter.removeListener(getRouteKey(ResponseEmitterResult.error, this.name), this.onErrorResponse); + responseEmitter.removeListener(getRouteKey(ResponseEmitterResult.ack, this.name), this.onAcknowledgeResponse); } private attachEmitters(): void { - responseEmitter.on(ResponseEmitterResult.success, async (msg, result) => { - this.reply(result, msg); - }); - responseEmitter.on(ResponseEmitterResult.error, async (msg, err) => { - this.reply('', msg, err); - }); - responseEmitter.on(ResponseEmitterResult.ack, async (msg) => { - this.ack(msg); - }); + this.detachEmitters(); + responseEmitter.on(getRouteKey(ResponseEmitterResult.success, this.name), this.onSuccessResponse); + responseEmitter.on(getRouteKey(ResponseEmitterResult.error, this.name), this.onErrorResponse); + responseEmitter.on(getRouteKey(ResponseEmitterResult.ack, this.name), this.onAcknowledgeResponse); + } + + private onSuccessResponse = (msg: Message, result: any): void => { + this.reply(result, msg); + } + + private onErrorResponse = (msg: Message, err: Error | RMQError): void => { + this.reply('', msg, err); + } + + private onAcknowledgeResponse = (msg: Message): void => { + this.ack(msg); } private async reply(res: any, msg: Message, error: Error | RMQError = null) { @@ -266,22 +291,26 @@ export class RMQService implements OnModuleInit, IRMQService { this.logger.debug(`Sent ▲ [${msg.fields.routingKey}] ${JSON.stringify(res)}`); } - private getRouteByTopic(topic: string): string { - return this.routes.find((route) => { - if (route === topic) { + private getRouteKeyByTopic(topic: string): string { + const routeKey = getRouteKey(topic, this.name); + + return this.routeKeys.find((route) => { + if (route === routeKey) { return true; } const regexString = '^' + route.replace(/\*/g, '([^.]+)').replace(/#/g, '([^.]+\.?)+') + '$'; - return topic.search(regexString) !== -1; + return routeKey.search(regexString) !== -1; }); } - private async useMiddleware(msg: Message) { + private async useMiddleware(msg: IRMQMessage) { if (!this.options.middleware || this.options.middleware.length === 0) { return msg; } for (const middleware of this.options.middleware) { - msg = await new middleware(this.logger).transform(msg); + // to be backward compatible + msg = (await new middleware(this.logger).transform(msg)) as IRMQMessage; + msg.serviceName = this.name; } return msg; } @@ -310,7 +339,7 @@ export class RMQService implements OnModuleInit, IRMQService { private logConnected() { this.logger.log(CONNECTED_MESSAGE, 'RMQModule'); - if (!this.options.queueName && this.metadataAccessor.getAllRMQPaths().length > 0) { + if (!this.options.queueName && this.metadataAccessor.getAllRMQRouteKeys(this.name).length > 0) { this.logger.warn(ERROR_NO_QUEUE, 'RMQModule'); } } diff --git a/lib/utils/get-route-key.ts b/lib/utils/get-route-key.ts new file mode 100644 index 0000000..e5c4552 --- /dev/null +++ b/lib/utils/get-route-key.ts @@ -0,0 +1,14 @@ +import { DEFAULT_SERVICE_NAME } from '../constants'; + +export const getRouteKey = (pathOrTopic: string, serviceName: string = DEFAULT_SERVICE_NAME): string => { + return `${serviceName}:${pathOrTopic}`; +}; + +export const getTopic = (routeKey: string): string => { + const index = routeKey.indexOf(':'); + if (index < 0) { + return routeKey; + } + + return routeKey.substr(index + 1); +}; \ No newline at end of file diff --git a/lib/utils/get-service-token.ts b/lib/utils/get-service-token.ts new file mode 100644 index 0000000..af05c6a --- /dev/null +++ b/lib/utils/get-service-token.ts @@ -0,0 +1,24 @@ +import { Type } from '@nestjs/common'; +import { RMQService } from '../rmq.service'; +import { DEFAULT_SERVICE_NAME } from '../constants'; +import { RmqErrorService } from '../rmq-error.service'; + +export const getErrorServiceToken = (name: string = DEFAULT_SERVICE_NAME): string | Function | Type => { + if (isDefaultService(name)) { + return RmqErrorService; + } + + return `${name}RMQErrorService`; +}; + +export const getServiceToken = (name: string = DEFAULT_SERVICE_NAME): string | Function | Type => { + if (isDefaultService(name)) { + return RMQService; + } + + return `${name}RMQService`; +}; + +export const isDefaultService = (name: string = DEFAULT_SERVICE_NAME): boolean => { + return name === DEFAULT_SERVICE_NAME || !name; +}; \ No newline at end of file