diff --git a/apps/triggers/package.json b/apps/triggers/package.json index 591e2bd..1da03e5 100644 --- a/apps/triggers/package.json +++ b/apps/triggers/package.json @@ -49,6 +49,7 @@ "body-parser": "^1.20.3", "bull": "^4.16.5", "cheerio": "^1.0.0", + "expr-eval": "^2.0.2", "google-auth-library": "^9.12.0", "google-spreadsheet": "^4.1.2", "ioredis": "^5.4.2", diff --git a/apps/triggers/src/phases/dto/get.phase.dto.ts b/apps/triggers/src/phases/dto/get.phase.dto.ts index 6e40c5d..f54a2ae 100644 --- a/apps/triggers/src/phases/dto/get.phase.dto.ts +++ b/apps/triggers/src/phases/dto/get.phase.dto.ts @@ -43,6 +43,7 @@ export class GetPhaseByName { }) @IsEnum(Phases) @IsNotEmpty() + @IsOptional() phase?: Phases; @ApiProperty({ @@ -68,6 +69,7 @@ export class GetPhaseByName { @ApiProperty({ example: 'sfs-sfs-sfs-sfs-sfs', }) + @IsOptional() @IsString() appId?: string; } diff --git a/apps/triggers/src/sources-data/data-source-events.listener.ts b/apps/triggers/src/sources-data/data-source-events.listener.ts index 59a5846..b9d035a 100644 --- a/apps/triggers/src/sources-data/data-source-events.listener.ts +++ b/apps/triggers/src/sources-data/data-source-events.listener.ts @@ -1,26 +1,325 @@ import { Injectable, Logger } from '@nestjs/common'; import { OnEvent } from '@nestjs/event-emitter'; -import { DATA_SOURCE_EVENTS } from '@lib/core'; -import type { DataSourceEventPayload } from '@lib/core'; +import * as core from '@lib/core'; +import { DataSource, Prisma } from '@lib/database'; +import { TriggerStatement } from 'src/trigger/validation/trigger.schema'; +import { Parser } from 'expr-eval'; +import { TriggerService } from 'src/trigger/trigger.service'; + +type TriggerType = Prisma.TriggerGetPayload<{ + include: { + phase: true; + }; +}>; @Injectable() export class DataSourceEventsListener { private readonly logger = new Logger(DataSourceEventsListener.name); - constructor() {} + constructor(private readonly triggerService: TriggerService) {} + + 
@OnEvent(core.DATA_SOURCE_EVENTS.DHM.WATER_LEVEL) + async handleDhmWaterLevel(event: core.DataSourceEventPayload) { + const indicators: core.Indicator[] = event.indicators; + + this.logger.log( + `DHM WATER LEVEL EVENT RECEIVED ${indicators.length} indicators`, + ); + + if (indicators.length === 0) { + this.logger.warn(`indicators not found `); + return; + } + + const indicator = indicators[0].indicator; + + const triggers = await this.triggerService.findTriggersBySourceAndIndicator( + DataSource.DHM, + indicator, + ); + + if (!triggers.length) { + this.logger.log( + `No triggers found for DHM Water Level event for indicator ${indicator}`, + ); + return; + } + + const triggerMap: Record<string, TriggerType[]> = triggers.reduce( + (acc, trigger) => { + const statement = trigger.triggerStatement as TriggerStatement; + const stationId = statement.stationId; + if (!stationId) { + this.logger.warn( + `Station ID not found for trigger ${trigger.uuid} for WATER LEVEL TRIGGER`, + ); + return acc; + } + + if (!acc[stationId]) { + acc[stationId] = []; + } + acc[stationId].push(trigger); + return acc; + }, + {}, + ); + + for await (const indicator of indicators) { + const stationId = + indicator.location.type === 'BASIN' + ? 
indicator.location.seriesId + : undefined; + + const triggers = triggerMap[stationId]; + + if (!triggers) { + continue; + } + + await this.processAndEvaluateTriggers(triggers, indicator.value); + } + } + + @OnEvent(core.DATA_SOURCE_EVENTS.DHM.RAINFALL) + async handleDhmRainfall(event: core.DataSourceEventPayload) { + const indicators: core.Indicator[] = event.indicators; + this.logger.log( + `DHM RAIN FALL EVENT RECEIVED ${indicators.length} indicators`, + ); - @OnEvent(DATA_SOURCE_EVENTS.DHM.WATER_LEVEL) - handleDhmWaterLevel(event: DataSourceEventPayload) { - console.log('LOGGED EVENT'); + if (indicators.length === 0) { + this.logger.warn(`indicators not found `); + return; + } + + const indicator = indicators[0].indicator; + + const triggers = await this.triggerService.findTriggersBySourceAndIndicator( + DataSource.DHM, + indicator, + ); + + if (!triggers.length) { + this.logger.log('No triggers found for DHM Rainfall event'); + return; + } + + const triggerMap: Record<string, TriggerType[]> = triggers.reduce( + (acc, trigger) => { + const statement = trigger.triggerStatement as TriggerStatement; + const stationId = statement.stationId; + + if (!stationId) { + this.logger.warn( + `Station ID not found for trigger ${trigger.uuid} for RAINFALL TRIGGER`, + ); + return acc; + } + + if (!acc[stationId]) { + acc[stationId] = []; + } + acc[stationId].push(trigger); + return acc; + }, + {}, + ); + + for await (const indicator of indicators) { + const stationId = + indicator.location.type === 'BASIN' + ? 
indicator.location.seriesId + : undefined; + + const triggers = triggerMap[stationId]; + + if (!triggers) { + continue; + } + + await this.processAndEvaluateTriggers(triggers, indicator.value); + } } - @OnEvent(DATA_SOURCE_EVENTS.DHM.RAINFALL) - handleDhmRainfall(event: DataSourceEventPayload) { - console.log('LOGGED EVENT'); + @OnEvent(core.DATA_SOURCE_EVENTS.GLOFAS.WATER_LEVEL) + async handleGlofasWaterLevel(event: core.DataSourceEventPayload) { + const indicators = event.indicators; + + this.logger.log( + `GLOFAS WATER LEVEL EVENT RECEIVED ${indicators.length} indicators`, + ); + + if (indicators.length === 0) { + this.logger.warn(`indicators not found `); + return; + } + + const indicator = indicators[0].indicator; + + const triggers = await this.triggerService.findTriggersBySourceAndIndicator( + DataSource.GLOFAS, + indicator, + ); + + if (!triggers.length) { + this.logger.log('No triggers found for GLOFAS Water Level event'); + return; + } + + // we will create hash map with key as sourceSubType and values as triggers + const triggerMap = triggers.reduce((acc, trigger) => { + const statement = trigger.triggerStatement as TriggerStatement; + const sourceSubType = statement.sourceSubType; + if (!acc[sourceSubType]) { + acc[sourceSubType] = []; + } + acc[sourceSubType].push(trigger); + return acc; + }, {}); + + for await (const indicator of indicators) { + const [twoYearsMaxProb, fiveYearsMaxProb, twentyYearsMaxProb] = + indicator.value.toString().split('/'); + + const twoYearsMaxProbTriggers = triggerMap['two_years_max_prob']; + const fiveYearsMaxProbTriggers = triggerMap['five_years_max_prob']; + const twentyYearsMaxProbTriggers = triggerMap['twenty_years_max_prob']; + + await this.processAndEvaluateTriggers( + twoYearsMaxProbTriggers, + Number(twoYearsMaxProb?.trim()) || 0, + ); + + await this.processAndEvaluateTriggers( + fiveYearsMaxProbTriggers, + Number(fiveYearsMaxProb?.trim()) || 0, + ); + + await this.processAndEvaluateTriggers( + twentyYearsMaxProbTriggers, + 
Number(twentyYearsMaxProb?.trim()) || 0, + ); + } + } + + @OnEvent(core.DATA_SOURCE_EVENTS.GFH.WATER_LEVEL) + async handleGfsWaterLevel(event: core.DataSourceEventPayload) { + const indicators: core.Indicator[] = event.indicators; + + this.logger.log( + `GFS WATER LEVEL EVENT RECEIVED ${indicators.length} indicators`, + ); + + if (indicators.length === 0) { + this.logger.warn(`indicators not found `); + return; + } + const triggers = await this.triggerService.findTriggersBySourceAndIndicator( + DataSource.GFH, + indicators[0].indicator, + ); + + if (!triggers.length) { + this.logger.log('No triggers found for GFH Water Level event'); + return; + } + /** The indicators freshly emitted from transform() */ + + for (const trigger of triggers) { + const statement = trigger.triggerStatement as TriggerStatement; + const expression = statement.expression; + + // 2. Compute MEAN of all indicator values + const meanValue = + indicators.reduce((sum, ind) => sum + ind.value, 0) / indicators.length; + + const meetsThreshold = this.evaluateConditionExpression( + { + expression, + sourceSubType: statement.sourceSubType, + }, + meanValue, + ); + + if (meetsThreshold) { + this.logger.log(`Trigger ${trigger.uuid} MET threshold`); + // update trigger + await this.activateTriggers([trigger.uuid]); + } else { + this.logger.log(`Trigger ${trigger.uuid} NOT met`); + } + } + } + + private generateExpression(triggerStatement: TriggerStatement) { + return `${triggerStatement.sourceSubType} ${triggerStatement.operator} ${triggerStatement.value}`; + } + + private evaluateConditionExpression( + triggerStatement: { expression: string; sourceSubType: string }, + indicatorValue: number, + ): boolean { + try { + const parser = new Parser({ + operators: { + logical: true, + comparison: true, + }, + }); + + const variableName = triggerStatement.sourceSubType; + + const expression = parser.parse(triggerStatement.expression); + + const exprResult = expression.evaluate({ + 
[variableName]: indicatorValue, + }); + + return Boolean(exprResult); + } catch (error) { + this.logger.error( + `Failed to evaluate expression: ${triggerStatement.expression}`, + error, + ); + return false; + } + } + + private async processAndEvaluateTriggers( + triggers: TriggerType[] = [], + value: number, + ) { + const triggerUuids = []; + + for (const trigger of triggers) { + const statement = trigger.triggerStatement as TriggerStatement; + const expression = this.generateExpression(statement); + + const meetsThreshold = this.evaluateConditionExpression( + { + expression, + sourceSubType: statement.sourceSubType, + }, + value, + ); + + if (meetsThreshold) { + this.logger.log(`Trigger ${trigger.uuid} MET threshold`); + triggerUuids.push(trigger.uuid); + } + } + + if (triggerUuids.length > 0) { + this.logger.log( + `Activated ${triggerUuids.length} triggers for GLOFAS Subtype ${(triggers[0].triggerStatement as TriggerStatement).sourceSubType} + with value ${value} and triggers ${triggerUuids.join(', ')}`, + ); + await this.activateTriggers(triggerUuids); + } } - @OnEvent(DATA_SOURCE_EVENTS.GLOFAS.WATER_LEVEL) - handleGlofasWaterLevel(event: DataSourceEventPayload) { - console.log('LOGGED EVENT'); + private async activateTriggers(triggerUuids: string[]) { + await this.triggerService.activeAutomatedTriggers(triggerUuids); } } diff --git a/apps/triggers/src/sources-data/schedule-sources-data.service.ts b/apps/triggers/src/sources-data/schedule-sources-data.service.ts index 9721262..596b1da 100644 --- a/apps/triggers/src/sources-data/schedule-sources-data.service.ts +++ b/apps/triggers/src/sources-data/schedule-sources-data.service.ts @@ -115,7 +115,6 @@ export class ScheduleSourcesDataService @Cron('*/15 * * * *') async syncRainfallData() { const rainfallData = await this.dhmRainfallMonitored.execute(); - if (isErr(rainfallData)) { this.logger.warn(rainfallData.details); if (rainfallData.details instanceof AxiosError) { diff --git 
a/apps/triggers/src/sources-data/sources-data.module.ts b/apps/triggers/src/sources-data/sources-data.module.ts index 6d6fca6..51db138 100644 --- a/apps/triggers/src/sources-data/sources-data.module.ts +++ b/apps/triggers/src/sources-data/sources-data.module.ts @@ -1,4 +1,4 @@ -import { Module } from '@nestjs/common'; +import { forwardRef, Module } from '@nestjs/common'; import { SourcesDataService } from './sources-data.service'; import { SourcesDataController } from './sources-data.controller'; import { ScheduleSourcesDataService } from './schedule-sources-data.service'; @@ -13,6 +13,7 @@ import Redis from 'ioredis'; import { DataSourceEventsListener } from './data-source-events.listener'; import { HealthMonitoringService, HealthCacheService } from '@lib/core'; import { GlofasModule, GlofasServices } from '@lib/glofas-adapter'; +import { TriggerModule } from 'src/trigger/trigger.module'; import { GfhModule, GfhService } from '@lib/gfh-adapter'; @Module({ @@ -23,6 +24,7 @@ import { GfhModule, GfhService } from '@lib/gfh-adapter'; }), DhmModule.forRoot(), GlofasModule.forRoot(), + forwardRef(() => TriggerModule), GfhModule.forRoot(), ], controllers: [SourcesDataController], diff --git a/apps/triggers/src/trigger/trigger.service.ts b/apps/triggers/src/trigger/trigger.service.ts index 606aa6d..6c57e62 100644 --- a/apps/triggers/src/trigger/trigger.service.ts +++ b/apps/triggers/src/trigger/trigger.service.ts @@ -11,6 +11,7 @@ import { PaginatorTypes, PrismaService, DataSource, + Prisma, } from '@lib/database'; import { randomUUID } from 'crypto'; import { InjectQueue } from '@nestjs/bull'; @@ -22,6 +23,7 @@ import { AddTriggerJobDto, UpdateTriggerParamsJobDto } from 'src/common/dto'; import { catchError, lastValueFrom, of, timeout } from 'rxjs'; import { EventEmitter2 } from '@nestjs/event-emitter'; import { triggerPayloadSchema } from './validation/trigger.schema'; +import { tryCatch } from '@lib/core'; const paginate: PaginatorTypes.PaginateFunction = paginator({ 
perPage: 10 }); @@ -577,6 +579,122 @@ export class TriggerService { } } + async activeAutomatedTriggers(ids: string[]) { + try { + const triggerWhereArgs: Prisma.TriggerWhereInput = { + uuid: { + in: ids, + }, + source: { + not: DataSource.MANUAL, + }, + isTriggered: false, + isDeleted: false, + }; + + const triggers = await this.prisma.trigger.findMany({ + where: triggerWhereArgs, + }); + + if (triggers.length !== ids.length) { + const notfoundTriggers = ids.filter( + (id) => !triggers.some((trigger) => trigger.uuid === id), + ); + this.logger.warn( + `Some triggers not found to activate: ${notfoundTriggers.join(', ')}`, + ); + } + + const phases: Record< + string, + { mandatoryTriggers: number; optionalTriggers: number } + > = triggers.reduce((acc, trigger) => { + if (!acc[trigger.phaseId as string]) { + acc[trigger.phaseId] = { + mandatoryTriggers: 0, + optionalTriggers: 0, + }; + } + + if (trigger.isMandatory) { + acc[trigger.phaseId].mandatoryTriggers++; + } else { + acc[trigger.phaseId].optionalTriggers++; + } + + return acc; + }, {}); + + const updatedTriggers = await this.prisma.trigger.updateMany({ + where: triggerWhereArgs, + data: { + isTriggered: true, + triggeredAt: new Date(), + triggeredBy: 'System', + }, + }); + + this.logger.log(`Total ${updatedTriggers.count} triggers updated`); + + for (const phaseId in phases) { + await this.prisma.phase.update({ + where: { + uuid: phaseId, + }, + data: { + receivedMandatoryTriggers: { + increment: phases[phaseId].mandatoryTriggers, + }, + receivedOptionalTriggers: { + increment: phases[phaseId].optionalTriggers, + }, + }, + }); + } + + const jobs = triggers.map((trigger) => ({ + name: JOBS.TRIGGER.REACHED_THRESHOLD, + data: trigger, + opts: { + attempts: 3, + removeOnComplete: true, + backoff: { + type: 'exponential', + delay: 1000, + }, + }, + })); + + this.logger.log( + `Total ${jobs.length} triggers added to trigger threshold queue`, + ); + + await this.triggerQueue.addBulk(jobs); + + // TODO: Need to think about 
onchain queue update + + for (const phaseId in phases) { + const phase = await this.prisma.phase.findUnique({ + where: { + uuid: phaseId, + }, + }); + + this.eventEmitter.emit(EVENTS.NOTIFICATION.CREATE, { + payload: { + title: `Trigger Statement Met for ${phase.riverBasin}`, + description: `The trigger condition has been met for phase ${phase.name}, year ${phase.activeYear}, in the ${phase.riverBasin} river basin.`, + group: 'Trigger Statement', + notify: true, + }, + }); + } + } catch (error: any) { + this.logger.error(error); + throw new RpcException(error.message); + } + } + async activateTrigger(uuid: string, appId: string, payload: any) { this.logger.log(`Activating trigger with uuid: ${uuid}`); @@ -608,7 +726,7 @@ export class TriggerService { throw new RpcException('Trigger has already been activated.'); } - if (trigger.source !== DataSource.MANUAL) { + if (trigger.source !== DataSource.MANUAL) { this.logger.warn('Cannot activate an automated trigger.'); throw new RpcException('Cannot activate an automated trigger.'); } @@ -827,4 +945,27 @@ export class TriggerService { throw new RpcException(error.message); } } + + /** + * Find triggers for a specific source and indicator + */ + async findTriggersBySourceAndIndicator( + source: DataSource, + indicator: string, + ) { + return this.prisma.trigger.findMany({ + where: { + source, + isTriggered: false, + isDeleted: false, + triggerStatement: { + path: ['source'], + equals: indicator, + }, + }, + include: { + phase: true, + }, + }); + } } diff --git a/apps/triggers/src/trigger/validation/trigger.schema.ts b/apps/triggers/src/trigger/validation/trigger.schema.ts index b6ed788..6a29a67 100644 --- a/apps/triggers/src/trigger/validation/trigger.schema.ts +++ b/apps/triggers/src/trigger/validation/trigger.schema.ts @@ -38,6 +38,8 @@ const triggerStatementSchemaBase = z.object({ sourceSubType: z.string().min(1, 'sourceSubType is required'), operator: z.enum(operatorValues), value: numericValueSchema, + 
stationId: z.string().optional(), + stationName: z.string().optional(), expression: z .string() .trim() diff --git a/packages/core/src/types/data-source-event.type.ts b/packages/core/src/types/data-source-event.type.ts index 03f4641..ab10c77 100644 --- a/packages/core/src/types/data-source-event.type.ts +++ b/packages/core/src/types/data-source-event.type.ts @@ -11,6 +11,9 @@ export const DATA_SOURCE_EVENTS = { GLOFAS: { WATER_LEVEL: 'events.data-source.glofas.water-level', }, + GFH: { + WATER_LEVEL: 'events.data-source.gfh.water-level', + }, } as const; export type DataSourceEventName = NestedRecord< diff --git a/packages/gfh-adapter/src/gfh.adapter.ts b/packages/gfh-adapter/src/gfh.adapter.ts index 75af60f..d0bddd3 100644 --- a/packages/gfh-adapter/src/gfh.adapter.ts +++ b/packages/gfh-adapter/src/gfh.adapter.ts @@ -1,5 +1,7 @@ import { chainAsync, + DATA_SOURCE_EVENTS, + DataSourceEventPayload, Err, ExecutionContext, HealthMonitoringService, @@ -10,9 +12,10 @@ import { Result, SettingsService, } from "@lib/core"; -import { DataSource } from "@lib/database"; +import { DataSource, SourceType } from "@lib/database"; import { HttpService } from "@nestjs/axios"; -import { Inject, Injectable, Logger } from "@nestjs/common"; +import { Inject, Injectable, Logger, Optional } from "@nestjs/common"; +import { EventEmitter2 } from "@nestjs/event-emitter"; import { getFormattedDate } from "./utils"; import { BatchGetResponse, @@ -45,7 +48,10 @@ export class GfhAdapter extends ObservationAdapter { constructor( @Inject(HttpService) httpService: HttpService, @Inject(SettingsService) settingsService: SettingsService, - @Inject(HealthMonitoringService) healthService: HealthMonitoringService + @Inject(HealthMonitoringService) healthService: HealthMonitoringService, + @Optional() + @Inject(EventEmitter2) + private readonly eventEmitter?: EventEmitter2 ) { super(httpService, settingsService, { dataSource: DataSource.GFH, @@ -317,7 +323,7 @@ export class GfhAdapter extends 
ObservationAdapter { }, source: { key: "GFH", - metadata: { originalUnit: "" }, + metadata: { originalUnit: "m³/s" }, }, info: { ...stationDetails, @@ -329,13 +335,14 @@ export class GfhAdapter extends ObservationAdapter { results.push({ ...baseIndicator, - indicator: "prob_flood", - units: "", + indicator: "discharge_m3s", + units: "m³/s", value: (obs.stationData?.forecasts?.[0] as any)?.value || 0, }); return results; }); + this.emitDataSourceEvent(indicators); this.logger.log(`Transformed to ${indicators.length} indicators`); return Ok(indicators); @@ -727,4 +734,19 @@ export class GfhAdapter extends ObservationAdapter { return output; } + + private emitDataSourceEvent(indicators: Indicator[]): void { + if (!this.eventEmitter || indicators.length === 0) { + return; + } + + const payload: DataSourceEventPayload = { + dataSource: DataSource.GFH, + sourceType: SourceType.WATER_LEVEL, + indicators, + fetchedAt: new Date().toISOString(), + }; + + this.eventEmitter.emit(DATA_SOURCE_EVENTS.GFH.WATER_LEVEL, payload); + } } diff --git a/packages/glofas-adapter/src/glofas.adapter.ts b/packages/glofas-adapter/src/glofas.adapter.ts index a92618b..fa7a470 100644 --- a/packages/glofas-adapter/src/glofas.adapter.ts +++ b/packages/glofas-adapter/src/glofas.adapter.ts @@ -211,19 +211,22 @@ export class GlofasAdapter extends ObservationAdapter { basinId: obs.location, }, source: { - key: 'Glofas', - metadata: { originalUnit: 'mm' }, + key: obs.location, + metadata: { originalUnit: 'percentage' }, }, info: obs.data, }; + const pointForecastData = obs.data?.pointForecastData; + const maxProbability = pointForecastData?.maxProbability?.data; + const results: Indicator[] = []; results.push({ ...baseIndicator, indicator: 'prob_flood', - units: 'mm', - value: obs.data[0]?.value || 0, + units: 'percentage', + value: maxProbability || '0 / 0 / 0', }); return results;