From 4818c3eebc3a7c046b73ea60417bc0732f9d69bf Mon Sep 17 00:00:00 2001 From: bipinparajuli Date: Fri, 28 Nov 2025 10:13:51 +0545 Subject: [PATCH 1/8] feat:implement event handler --- apps/triggers/package.json | 1 + .../data-source-events.listener.ts | 185 +++++++++++++++++- .../src/sources-data/sources-data.module.ts | 4 +- 3 files changed, 180 insertions(+), 10 deletions(-) diff --git a/apps/triggers/package.json b/apps/triggers/package.json index 4f0611c..ecd9d84 100644 --- a/apps/triggers/package.json +++ b/apps/triggers/package.json @@ -47,6 +47,7 @@ "body-parser": "^1.20.3", "bull": "^4.16.5", "cheerio": "^1.0.0", + "expr-eval": "^2.0.2", "google-auth-library": "^9.12.0", "google-spreadsheet": "^4.1.2", "ioredis": "^5.4.2", diff --git a/apps/triggers/src/sources-data/data-source-events.listener.ts b/apps/triggers/src/sources-data/data-source-events.listener.ts index 59a5846..a8ddf65 100644 --- a/apps/triggers/src/sources-data/data-source-events.listener.ts +++ b/apps/triggers/src/sources-data/data-source-events.listener.ts @@ -1,26 +1,193 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { forwardRef, Inject, Injectable, Logger } from '@nestjs/common'; import { OnEvent } from '@nestjs/event-emitter'; import { DATA_SOURCE_EVENTS } from '@lib/core'; -import type { DataSourceEventPayload } from '@lib/core'; +import type { DataSourceEventPayload, Indicator } from '@lib/core'; +import { DataSource, PrismaService } from '@lib/database'; +import { TriggerStatement } from 'src/trigger/validation/trigger.schema'; +import { Parser } from 'expr-eval'; +import { TriggerService } from 'src/trigger/trigger.service'; @Injectable() export class DataSourceEventsListener { private readonly logger = new Logger(DataSourceEventsListener.name); - constructor() {} + constructor( + private prisma: PrismaService, + @Inject(forwardRef(() => TriggerService)) + private readonly triggerService: TriggerService, + ) {} @OnEvent(DATA_SOURCE_EVENTS.DHM.WATER_LEVEL) - handleDhmWaterLevel(event: DataSourceEventPayload) { - console.log('LOGGED EVENT'); + async handleDhmWaterLevel(event: DataSourceEventPayload) { + const indicators: Indicator[] = event.indicators; + this.logger.log(`DHM WATER LEVEL EVENT RECEIVED`, indicators); + if (indicators.length === 0) { + this.logger.warn(`indicators not found `); + return; + } + const triggers = await this.prisma.trigger.findMany({ + where: { + source: DataSource.DHM, + triggerStatement: { + path: ['source'], + equals: event.indicators[0].indicator, + }, + }, + }); + + if (!triggers.length) { + this.logger.log('No triggers found for DHM Rainfall event'); + return; + } + + for (const trigger of triggers) { + const statement = trigger.triggerStatement as TriggerStatement; + const expression = statement.expression; + + // 2. Compute MEAN of all indicator values + const meanValue = + indicators.reduce((sum, ind) => sum + ind.value, 0) / indicators.length; + + const meetsThreshold = this.evaluateConditionExpression( + { + expression, + sourceSubType: statement.sourceSubType, + }, + meanValue, + ); + + if (meetsThreshold) { + this.logger.log(`Trigger ${trigger.id} MET threshold`); + // update trigger + await this.triggerService.activateTrigger(trigger.uuid, '', trigger); + } else { + this.logger.log(`Trigger ${trigger.id} NOT met`); + } + } } @OnEvent(DATA_SOURCE_EVENTS.DHM.RAINFALL) - handleDhmRainfall(event: DataSourceEventPayload) { - console.log('LOGGED EVENT'); + async handleDhmRainfall(event: DataSourceEventPayload) { + const indicators: Indicator[] = event.indicators; + this.logger.log(`DHM RAIN FALL EVENT RECEIVED`, indicators); + + if (indicators.length === 0) { + this.logger.warn(`indicators not found `); + return; + } + const triggers = await this.prisma.trigger.findMany({ + where: { + source: DataSource.DHM, + triggerStatement: { + path: ['source'], + equals: event.indicators[0].indicator, + }, + }, + }); + + if (!triggers.length) { + this.logger.log('No triggers found for DHM Rainfall event'); + return; + } + + for (const trigger of triggers) { + const statement = trigger.triggerStatement as TriggerStatement; + const expression = statement.expression; + + // 2. Compute MEAN of all indicator values + const meanValue = + indicators.reduce((sum, ind) => sum + ind.value, 0) / indicators.length; + + const meetsThreshold = this.evaluateConditionExpression( + { + expression, + sourceSubType: statement.sourceSubType, + }, + meanValue, + ); + + if (meetsThreshold) { + this.logger.log(`Trigger ${trigger.id} MET threshold`); + // update trigger + await this.triggerService.activateTrigger(trigger.uuid, '', trigger); + } else { + this.logger.log(`Trigger ${trigger.id} NOT met`); + } + } } @OnEvent(DATA_SOURCE_EVENTS.GLOFAS.WATER_LEVEL) - handleGlofasWaterLevel(event: DataSourceEventPayload) { - console.log('LOGGED EVENT'); + async handleGlofasWaterLevel(event: DataSourceEventPayload) { + const indicators: Indicator[] = event.indicators; + + this.logger.log(`GLOFAS WATER LEVEL EVENT RECEIVED`, indicators); + + if (indicators.length === 0) { + this.logger.warn(`indicators not found `); + return; + } + const triggers = await this.prisma.trigger.findMany({ + where: { + source: DataSource.DHM, + triggerStatement: { + path: ['source'], + equals: event.indicators[0].indicator, + }, + }, + }); + + if (!triggers.length) { + this.logger.log('No triggers found for DHM Rainfall event'); + return; + } + /** The indicators freshly emitted from transform() */ + + for (const trigger of triggers) { + const statement = trigger.triggerStatement as TriggerStatement; + const expression = statement.expression; + + // 2. Compute MEAN of all indicator values + const meanValue = + indicators.reduce((sum, ind) => sum + ind.value, 0) / indicators.length; + + const meetsThreshold = this.evaluateConditionExpression( + { + expression, + sourceSubType: statement.sourceSubType, + }, + meanValue, + ); + + if (meetsThreshold) { + this.logger.log(`Trigger ${trigger.id} MET threshold`); + // update trigger + await this.triggerService.activateTrigger(trigger.uuid, '', trigger); + } else { + this.logger.log(`Trigger ${trigger.id} NOT met`); + } + } + } + + private evaluateConditionExpression( + triggerStatement: { expression: string; sourceSubType: string }, + indicatorValue: number, + ): boolean { + try { + const parser = new Parser(); + + const variableName = triggerStatement.sourceSubType; + + const exprResult = parser.evaluate(triggerStatement.expression, { + [variableName]: indicatorValue, + }); + + return Boolean(exprResult); + } catch (error) { + this.logger.error( + `Failed to evaluate expression: ${triggerStatement.expression}`, + error, + ); + return false; + } } } diff --git a/apps/triggers/src/sources-data/sources-data.module.ts b/apps/triggers/src/sources-data/sources-data.module.ts index 9793377..32436a0 100644 --- a/apps/triggers/src/sources-data/sources-data.module.ts +++ b/apps/triggers/src/sources-data/sources-data.module.ts @@ -1,4 +1,4 @@ -import { Module } from '@nestjs/common'; +import { forwardRef, Module } from '@nestjs/common'; import { SourcesDataService } from './sources-data.service'; import { SourcesDataController } from './sources-data.controller'; import { ScheduleSourcesDataService } from './schedule-sources-data.service'; @@ -14,6 +14,7 @@ import Redis from 'ioredis'; import { DataSourceEventsListener } from './data-source-events.listener'; import { HealthMonitoringService, HealthCacheService } from '@lib/core'; import { GlofasModule, GlofasServices } from '@lib/glofas-adapter'; +import { TriggerModule } from 'src/trigger/trigger.module'; @Module({ imports: [ @@ -23,6 +24,7 @@ import { GlofasModule, GlofasServices } from '@lib/glofas-adapter'; }), DhmModule.forRoot(), GlofasModule.forRoot(), + forwardRef(() => TriggerModule), ], controllers: [SourcesDataController], providers: [ From 200514645a2d5e1c97af7f46c5907e74c4095482 Mon Sep 17 00:00:00 2001 From: bipinparajuli Date: Fri, 28 Nov 2025 16:55:04 +0545 Subject: [PATCH 2/8] fix:clean up --- .../data-source-events.listener.ts | 40 ++---- .../schedule-sources-data.service.ts | 120 +++++++++--------- apps/triggers/src/trigger/trigger.service.ts | 18 +++ 3 files changed, 90 insertions(+), 88 deletions(-) diff --git a/apps/triggers/src/sources-data/data-source-events.listener.ts b/apps/triggers/src/sources-data/data-source-events.listener.ts index a8ddf65..ea602de 100644 --- a/apps/triggers/src/sources-data/data-source-events.listener.ts +++ b/apps/triggers/src/sources-data/data-source-events.listener.ts @@ -13,7 +13,6 @@ export class DataSourceEventsListener { constructor( private prisma: PrismaService, - @Inject(forwardRef(() => TriggerService)) private readonly triggerService: TriggerService, ) {} @@ -25,15 +24,10 @@ export class DataSourceEventsListener { this.logger.warn(`indicators not found `); return; } - const triggers = await this.prisma.trigger.findMany({ - where: { - source: DataSource.DHM, - triggerStatement: { - path: ['source'], - equals: event.indicators[0].indicator, - }, - }, - }); + const triggers = await this.triggerService.findTriggersBySourceAndIndicator( + DataSource.DHM, + indicators[0].indicator, + ); if (!triggers.length) { this.logger.log('No triggers found for DHM Rainfall event'); @@ -75,15 +69,10 @@ export class DataSourceEventsListener { this.logger.warn(`indicators not found `); return; } - const triggers = await this.prisma.trigger.findMany({ - where: { - source: DataSource.DHM, - triggerStatement: { - path: ['source'], - equals: event.indicators[0].indicator, - }, - }, - }); + const triggers = await this.triggerService.findTriggersBySourceAndIndicator( + DataSource.DHM, + indicators[0].indicator, + ); if (!triggers.length) { this.logger.log('No triggers found for DHM Rainfall event'); @@ -126,15 +115,10 @@ export class DataSourceEventsListener { this.logger.warn(`indicators not found `); return; } - const triggers = await this.prisma.trigger.findMany({ - where: { - source: DataSource.DHM, - triggerStatement: { - path: ['source'], - equals: event.indicators[0].indicator, - }, - }, - }); + const triggers = await this.triggerService.findTriggersBySourceAndIndicator( + DataSource.GLOFAS, + indicators[0].indicator, + ); if (!triggers.length) { this.logger.log('No triggers found for DHM Rainfall event'); diff --git a/apps/triggers/src/sources-data/schedule-sources-data.service.ts b/apps/triggers/src/sources-data/schedule-sources-data.service.ts index be418c3..e253fb5 100644 --- a/apps/triggers/src/sources-data/schedule-sources-data.service.ts +++ b/apps/triggers/src/sources-data/schedule-sources-data.service.ts @@ -80,7 +80,7 @@ export class ScheduleSourcesDataService } // run every 15 minutes - @Cron('*/15 * * * *') + @Cron('*/30 * * * * *') async syncRiverWaterData() { const riverData = await this.dhmWaterMonitored.execute(); @@ -150,10 +150,9 @@ export class ScheduleSourcesDataService } // run every 15 minutes - @Cron('*/15 * * * *') + @Cron('*/30 * * * *') async syncRainfallData() { const rainfallData = await this.dhmRainfallMonitored.execute(); - if (isErr(rainfallData)) { this.logger.warn(rainfallData.details); if (rainfallData.details instanceof AxiosError) { @@ -165,63 +164,64 @@ export class ScheduleSourcesDataService // return; } // Currently rainfall api is not working so we are using dummy data - const info: RainfallStationData = { - id: 111, - name: 'Doda river at East-West Highway', - basin: 'Koshi', - blink: false, - status: 'BELOW WARNING LEVEL', - history: [ - { - max: 0, - min: 0, - value: 0, - datetime: '2025-10-14T05:00:00.000Z', - }, - { - max: 0, - min: 0, - value: 0, - datetime: '2025-10-14T06:00:00.000Z', - }, - { - max: 0, - min: 0, - value: 0, - datetime: '2025-10-14T07:00:00.000Z', - }, - { - max: 0, - min: 0, - value: 0, - datetime: '2025-10-14T08:00:00.000Z', - }, - { - max: 0, - min: 0, - value: 0, - datetime: '2025-10-14T09:00:00.000Z', - }, - { - max: 0, - min: 0, - value: 0, - datetime: '2025-10-14T10:00:00.000Z', - }, - ], - district: 'Sunsari', - interval: null, - latitude: 26.855192, - longitude: 87.152283, - series_id: 1505, - description: 'Hydrological Station with RLS', - stationIndex: '695', - indicator: 'water_level_m', - units: 'mm', - value: 10.9, - }; - - await this.dhmService.saveDataInDhm(SourceType.RAINFALL, info.name, info); + // const info: RainfallStationData = { + // id: 111, + // name: 'Doda river at East-West Highway', + // basin: 'Koshi', + // blink: false, + // status: 'BELOW WARNING LEVEL', + // history: [ + // { + // max: 0, + // min: 0, + // value: 0, + // datetime: '2025-10-14T05:00:00.000Z', + // }, + // { + // max: 0, + // min: 0, + // value: 0, + // datetime: '2025-10-14T06:00:00.000Z', + // }, + // { + // max: 0, + // min: 0, + // value: 0, + // datetime: '2025-10-14T07:00:00.000Z', + // }, + // { + // max: 0, + // min: 0, + // value: 0, + // datetime: '2025-10-14T08:00:00.000Z', + // }, + // { + // max: 0, + // min: 0, + // value: 0, + // datetime: '2025-10-14T09:00:00.000Z', + // }, + // { + // max: 0, + // min: 0, + // value: 0, + // datetime: '2025-10-14T10:00:00.000Z', + // }, + // ], + // district: 'Sunsari', + // interval: null, + // latitude: 26.855192, + // longitude: 87.152283, + // series_id: 1505, + // description: 'Hydrological Station with RLS', + // stationIndex: '695', + // indicator: 'water_level_m', + // units: 'mm', + // value: 10.9, + // }; + console.log({ rainfallData }); + + // await this.dhmService.saveDataInDhm(SourceType.RAINFALL, info.name, info); } // run every hour diff --git a/apps/triggers/src/trigger/trigger.service.ts b/apps/triggers/src/trigger/trigger.service.ts index 99aae02..0f67a0e 100644 --- a/apps/triggers/src/trigger/trigger.service.ts +++ b/apps/triggers/src/trigger/trigger.service.ts @@ -827,4 +827,22 @@ export class TriggerService { throw new RpcException(error.message); } } + + /** + * Find triggers for a specific source and indicator + */ + async findTriggersBySourceAndIndicator( + source: DataSource, + indicator: string, + ) { + return this.prisma.trigger.findMany({ + where: { + source, + triggerStatement: { + path: ['source'], + equals: indicator, + }, + }, + }); + } } From b10f937800b902a17ded03066b03ee8455ae8936 Mon Sep 17 00:00:00 2001 From: bipinparajuli Date: Mon, 1 Dec 2025 11:03:13 +0545 Subject: [PATCH 3/8] fix:removed console log --- .../src/sources-data/schedule-sources-data.service.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/apps/triggers/src/sources-data/schedule-sources-data.service.ts b/apps/triggers/src/sources-data/schedule-sources-data.service.ts index 5a9fd7e..b7c88bd 100644 --- a/apps/triggers/src/sources-data/schedule-sources-data.service.ts +++ b/apps/triggers/src/sources-data/schedule-sources-data.service.ts @@ -80,7 +80,7 @@ export class ScheduleSourcesDataService } // run every 15 minutes - @Cron('*/30 * * * * *') + @Cron('*/15 * * * *') async syncRiverWaterData() { const riverData = await this.dhmWaterMonitored.execute(); @@ -151,7 +151,7 @@ export class ScheduleSourcesDataService } // run every 15 minutes - @Cron('*/30 * * * *') + @Cron('*/15 * * * *') async syncRainfallData() { const rainfallData = await this.dhmRainfallMonitored.execute(); if (isErr(rainfallData)) { @@ -222,7 +222,6 @@ export class ScheduleSourcesDataService units: 'mm', value: 10.9, }; - console.log({ rainfallData }); const result = await this.dhmService.saveDataInDhm( SourceType.RAINFALL, From 2093c7cffcba0b29a6d4b707723a1d79e98b73e2 Mon Sep 17 00:00:00 2001 From: bipinparajuli Date: Mon, 1 Dec 2025 14:06:19 +0545 Subject: [PATCH 4/8] fix:remove indicators from log --- .../src/sources-data/data-source-events.listener.ts | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/apps/triggers/src/sources-data/data-source-events.listener.ts b/apps/triggers/src/sources-data/data-source-events.listener.ts index ea602de..3cf0b21 100644 --- a/apps/triggers/src/sources-data/data-source-events.listener.ts +++ b/apps/triggers/src/sources-data/data-source-events.listener.ts @@ -19,7 +19,9 @@ export class DataSourceEventsListener { @OnEvent(DATA_SOURCE_EVENTS.DHM.WATER_LEVEL) async handleDhmWaterLevel(event: DataSourceEventPayload) { const indicators: Indicator[] = event.indicators; - this.logger.log(`DHM WATER LEVEL EVENT RECEIVED`, indicators); + this.logger.log( + `DHM WATER LEVEL EVENT RECEIVED ${indicators.length} indicators`, + ); if (indicators.length === 0) { this.logger.warn(`indicators not found `); return; @@ -63,7 +65,9 @@ export class DataSourceEventsListener { @OnEvent(DATA_SOURCE_EVENTS.DHM.RAINFALL) async handleDhmRainfall(event: DataSourceEventPayload) { const indicators: Indicator[] = event.indicators; - this.logger.log(`DHM RAIN FALL EVENT RECEIVED`, indicators); + this.logger.log( + `DHM RAIN FALL EVENT RECEIVED ${indicators.length} indicators`, + ); if (indicators.length === 0) { this.logger.warn(`indicators not found `); @@ -109,7 +113,9 @@ export class DataSourceEventsListener { async handleGlofasWaterLevel(event: DataSourceEventPayload) { const indicators: Indicator[] = event.indicators; - this.logger.log(`GLOFAS WATER LEVEL EVENT RECEIVED`, indicators); + this.logger.log( + `GLOFAS WATER LEVEL EVENT RECEIVED ${indicators.length} indicators`, + ); if (indicators.length === 0) { this.logger.warn(`indicators not found `); From cf0a0eae15063dae0d3229801c047a226824c31a Mon Sep 17 00:00:00 2001 From: bipinparajuli Date: Thu, 4 Dec 2025 09:21:09 +0545 Subject: [PATCH 5/8] feat:added event for gfh --- .../data-source-events.listener.ts | 49 +++++++++++++++++++ .../core/src/types/data-source-event.type.ts | 3 ++ packages/gfh-adapter/src/gfh.adapter.ts | 32 ++++++++++-- 3 files changed, 81 insertions(+), 3 deletions(-) diff --git a/apps/triggers/src/sources-data/data-source-events.listener.ts b/apps/triggers/src/sources-data/data-source-events.listener.ts index 3cf0b21..500b336 100644 --- a/apps/triggers/src/sources-data/data-source-events.listener.ts +++ b/apps/triggers/src/sources-data/data-source-events.listener.ts @@ -158,6 +158,55 @@ export class DataSourceEventsListener { } } + @OnEvent(DATA_SOURCE_EVENTS.GFH.WATER_LEVEL) + async handleGfsWaterLevel(event: DataSourceEventPayload) { + const indicators: Indicator[] = event.indicators; + + this.logger.log( + `GFS WATER LEVEL EVENT RECEIVED ${indicators.length} indicators`, + ); + + if (indicators.length === 0) { + this.logger.warn(`indicators not found `); + return; + } + const triggers = await this.triggerService.findTriggersBySourceAndIndicator( + DataSource.GFH, + indicators[0].indicator, + ); + + if (!triggers.length) { + this.logger.log('No triggers found for DHM Rainfall event'); + return; + } + /** The indicators freshly emitted from transform() */ + + for (const trigger of triggers) { + const statement = trigger.triggerStatement as TriggerStatement; + const expression = statement.expression; + + // 2. Compute MEAN of all indicator values + const meanValue = + indicators.reduce((sum, ind) => sum + ind.value, 0) / indicators.length; + + const meetsThreshold = this.evaluateConditionExpression( + { + expression, + sourceSubType: statement.sourceSubType, + }, + meanValue, + ); + + if (meetsThreshold) { + this.logger.log(`Trigger ${trigger.id} MET threshold`); + // update trigger + await this.triggerService.activateTrigger(trigger.uuid, '', trigger); + } else { + this.logger.log(`Trigger ${trigger.id} NOT met`); + } + } + } + private evaluateConditionExpression( triggerStatement: { expression: string; sourceSubType: string }, indicatorValue: number, diff --git a/packages/core/src/types/data-source-event.type.ts b/packages/core/src/types/data-source-event.type.ts index 03f4641..ab10c77 100644 --- a/packages/core/src/types/data-source-event.type.ts +++ b/packages/core/src/types/data-source-event.type.ts @@ -11,6 +11,9 @@ export const DATA_SOURCE_EVENTS = { GLOFAS: { WATER_LEVEL: 'events.data-source.glofas.water-level', }, + GFH: { + WATER_LEVEL: 'events.data-source.gfh.water-level', + }, } as const; export type DataSourceEventName = NestedRecord< diff --git a/packages/gfh-adapter/src/gfh.adapter.ts b/packages/gfh-adapter/src/gfh.adapter.ts index 75af60f..bff5d60 100644 --- a/packages/gfh-adapter/src/gfh.adapter.ts +++ b/packages/gfh-adapter/src/gfh.adapter.ts @@ -1,5 +1,7 @@ import { chainAsync, + DATA_SOURCE_EVENTS, + DataSourceEventPayload, Err, ExecutionContext, HealthMonitoringService, @@ -10,9 +12,10 @@ import { Result, SettingsService, } from "@lib/core"; -import { DataSource } from "@lib/database"; +import { DataSource, SourceType } from "@lib/database"; import { HttpService } from "@nestjs/axios"; -import { Inject, Injectable, Logger } from "@nestjs/common"; +import { Inject, Injectable, Logger, Optional } from "@nestjs/common"; +import { EventEmitter2 } from "@nestjs/event-emitter"; import { getFormattedDate } from "./utils"; import { BatchGetResponse, @@ -45,7 +48,10 @@ export class GfhAdapter extends ObservationAdapter { constructor( @Inject(HttpService) httpService: HttpService, @Inject(SettingsService) settingsService: SettingsService, - @Inject(HealthMonitoringService) healthService: HealthMonitoringService + @Inject(HealthMonitoringService) healthService: HealthMonitoringService, + @Optional() + @Inject(EventEmitter2) + private readonly eventEmitter?: EventEmitter2 ) { super(httpService, settingsService, { dataSource: DataSource.GFH, @@ -70,6 +76,7 @@ export class GfhAdapter extends ObservationAdapter { }); const GfhApiData = SettingsService.get("GFHAPIKEY") as IApiKeyData; this.apiKey = GfhApiData.API_KEY || ""; + console.log(this.apiKey); } /** @@ -87,9 +94,11 @@ export class GfhAdapter extends ObservationAdapter { this.logger.error("GFH URL or api key is not configured"); return Err("GFH URL or api key is not configured"); } + console.log(this.gauges.length); if (this.gauges.length === 0) { this.gauges = await this.fetchGauges(); + console.log(this.gauges); } if (this.gauges.length === 0) { this.logger.error("No gauges found"); @@ -97,6 +106,7 @@ export class GfhAdapter extends ObservationAdapter { } const config: GfhStationDetails[] = this.getConfig(); + console.log({ config }); const allStationDetails = config.flatMap( (gfhStationDetails) => gfhStationDetails.STATION_LOCATIONS_DETAILS @@ -336,6 +346,7 @@ export class GfhAdapter extends ObservationAdapter { return results; }); + this.emitDataSourceEvent(indicators); this.logger.log(`Transformed to ${indicators.length} indicators`); return Ok(indicators); @@ -727,4 +738,19 @@ export class GfhAdapter extends ObservationAdapter { return output; } + + private emitDataSourceEvent(indicators: Indicator[]): void { + if (!this.eventEmitter || indicators.length === 0) { + return; + } + + const payload: DataSourceEventPayload = { + dataSource: DataSource.GFH, + sourceType: SourceType.WATER_LEVEL, + indicators, + fetchedAt: new Date().toISOString(), + }; + + this.eventEmitter.emit(DATA_SOURCE_EVENTS.GFH.WATER_LEVEL, payload); + } } From 45de9904b4d21b6183e30634779f5c727a3fe6d2 Mon Sep 17 00:00:00 2001 From: bipinparajuli Date: Mon, 8 Dec 2025 09:49:59 +0545 Subject: [PATCH 6/8] fix:remove logs --- packages/gfh-adapter/src/gfh.adapter.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/packages/gfh-adapter/src/gfh.adapter.ts b/packages/gfh-adapter/src/gfh.adapter.ts index bff5d60..04a7eba 100644 --- a/packages/gfh-adapter/src/gfh.adapter.ts +++ b/packages/gfh-adapter/src/gfh.adapter.ts @@ -76,7 +76,6 @@ export class GfhAdapter extends ObservationAdapter { }); const GfhApiData = SettingsService.get("GFHAPIKEY") as IApiKeyData; this.apiKey = GfhApiData.API_KEY || ""; - console.log(this.apiKey); } /** @@ -94,11 +93,9 @@ export class GfhAdapter extends ObservationAdapter { this.logger.error("GFH URL or api key is not configured"); return Err("GFH URL or api key is not configured"); } - console.log(this.gauges.length); if (this.gauges.length === 0) { this.gauges = await this.fetchGauges(); - console.log(this.gauges); } if (this.gauges.length === 0) { this.logger.error("No gauges found"); @@ -106,7 +103,6 @@ export class GfhAdapter extends ObservationAdapter { } const config: GfhStationDetails[] = this.getConfig(); - console.log({ config }); const allStationDetails = config.flatMap( (gfhStationDetails) => gfhStationDetails.STATION_LOCATIONS_DETAILS From d84213d7c1949474b916732f5c02bfa274e13ced Mon Sep 17 00:00:00 2001 From: Dipesh Kumar Sah Date: Wed, 10 Dec 2025 15:12:40 +0545 Subject: [PATCH 7/8] feat: update glofas algorithm to evaluate trigger statement --- apps/triggers/src/phases/dto/get.phase.dto.ts | 2 + .../data-source-events.listener.ts | 154 +++++++++++++----- packages/glofas-adapter/src/glofas.adapter.ts | 11 +- 3 files changed, 120 insertions(+), 47 deletions(-) diff --git a/apps/triggers/src/phases/dto/get.phase.dto.ts b/apps/triggers/src/phases/dto/get.phase.dto.ts index 6e40c5d..f54a2ae 100644 --- a/apps/triggers/src/phases/dto/get.phase.dto.ts +++ b/apps/triggers/src/phases/dto/get.phase.dto.ts @@ -43,6 +43,7 @@ export class GetPhaseByName { }) @IsEnum(Phases) @IsNotEmpty() + @IsOptional() phase?: Phases; @ApiProperty({ @@ -68,6 +69,7 @@ export class GetPhaseByName { @ApiProperty({ example: 'sfs-sfs-sfs-sfs-sfs', }) + @IsOptional() @IsString() appId?: string; } diff --git a/apps/triggers/src/sources-data/data-source-events.listener.ts b/apps/triggers/src/sources-data/data-source-events.listener.ts index 500b336..9c77148 100644 --- a/apps/triggers/src/sources-data/data-source-events.listener.ts +++ b/apps/triggers/src/sources-data/data-source-events.listener.ts @@ -1,8 +1,7 @@ -import { forwardRef, Inject, Injectable, Logger } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; import { OnEvent } from '@nestjs/event-emitter'; -import { DATA_SOURCE_EVENTS } from '@lib/core'; -import type { DataSourceEventPayload, Indicator } from '@lib/core'; -import { DataSource, PrismaService } from '@lib/database'; +import * as core from '@lib/core'; +import { DataSource, Prisma } from '@lib/database'; import { TriggerStatement } from 'src/trigger/validation/trigger.schema'; import { Parser } from 'expr-eval'; import { TriggerService } from 'src/trigger/trigger.service'; @@ -11,14 +10,11 @@ import { TriggerService } from 'src/trigger/trigger.service'; export class DataSourceEventsListener { private readonly logger = new Logger(DataSourceEventsListener.name); - constructor( - private prisma: PrismaService, - private readonly triggerService: TriggerService, - ) {} + constructor(private readonly triggerService: TriggerService) {} - @OnEvent(DATA_SOURCE_EVENTS.DHM.WATER_LEVEL) - async handleDhmWaterLevel(event: DataSourceEventPayload) { - const indicators: Indicator[] = event.indicators; + @OnEvent(core.DATA_SOURCE_EVENTS.DHM.WATER_LEVEL) + async handleDhmWaterLevel(event: core.DataSourceEventPayload) { + const indicators: core.Indicator[] = event.indicators; this.logger.log( `DHM WATER LEVEL EVENT RECEIVED ${indicators.length} indicators`, ); @@ -62,9 +58,9 @@ export class DataSourceEventsListener { } } - @OnEvent(DATA_SOURCE_EVENTS.DHM.RAINFALL) - async handleDhmRainfall(event: DataSourceEventPayload) { - const indicators: Indicator[] = event.indicators; + @OnEvent(core.DATA_SOURCE_EVENTS.DHM.RAINFALL) + async handleDhmRainfall(event: core.DataSourceEventPayload) { + const indicators: core.Indicator[] = event.indicators; this.logger.log( `DHM RAIN FALL EVENT RECEIVED ${indicators.length} indicators`, ); @@ -109,9 +105,9 @@ export class DataSourceEventsListener { } } - @OnEvent(DATA_SOURCE_EVENTS.GLOFAS.WATER_LEVEL) - async handleGlofasWaterLevel(event: DataSourceEventPayload) { - const indicators: Indicator[] = event.indicators; + @OnEvent(core.DATA_SOURCE_EVENTS.GLOFAS.WATER_LEVEL) + async handleGlofasWaterLevel(event: core.DataSourceEventPayload) { + const indicators = event.indicators; this.logger.log( `GLOFAS WATER LEVEL EVENT RECEIVED ${indicators.length} indicators`, @@ -121,46 +117,56 @@ export class DataSourceEventsListener { this.logger.warn(`indicators not found `); return; } + + const indicator = indicators[0].indicator; + const triggers = await this.triggerService.findTriggersBySourceAndIndicator( DataSource.GLOFAS, - indicators[0].indicator, + indicator, ); if (!triggers.length) { this.logger.log('No triggers found for DHM Rainfall event'); return; } - /** The indicators freshly emitted from transform() */ - for (const trigger of triggers) { + // we will create hash map with key as sourceSubType and values as triggers + const triggerMap = triggers.reduce((acc, trigger) => { const statement = trigger.triggerStatement as TriggerStatement; - const expression = statement.expression; + const sourceSubType = statement.sourceSubType; + if (!acc[sourceSubType]) { + acc[sourceSubType] = []; + } + acc[sourceSubType].push(trigger); + return acc; + }, {}); - // 2. Compute MEAN of all indicator values - const meanValue = - indicators.reduce((sum, ind) => sum + ind.value, 0) / indicators.length; + for await (const indicator of indicators) { + const [twoYearsMaxProb, fiveYearsMaxProb, twentyYearsMaxProb] = + indicator.value.toString().split('/'); - const meetsThreshold = this.evaluateConditionExpression( - { - expression, - sourceSubType: statement.sourceSubType, - }, - meanValue, - ); + const twoYearsMaxProbTriggers = triggerMap['two_years_max_prob']; + const fiveYearsMaxProbTriggers = triggerMap['five_years_max_prob']; + const twentyYearsMaxProbTriggers = triggerMap['twenty_years_max_prob']; - if (meetsThreshold) { - this.logger.log(`Trigger ${trigger.id} MET threshold`); - // update trigger - await this.triggerService.activateTrigger(trigger.uuid, '', trigger); - } else { - this.logger.log(`Trigger ${trigger.id} NOT met`); - } + await this.processGlofasTriggers( + twoYearsMaxProbTriggers, + Number(twoYearsMaxProb.trim()) || 0, + ); + await this.processGlofasTriggers( + fiveYearsMaxProbTriggers, + Number(fiveYearsMaxProb.trim()) || 0, + ); + await this.processGlofasTriggers( + twentyYearsMaxProbTriggers, + Number(twentyYearsMaxProb.trim()) || 0, + ); } } - @OnEvent(DATA_SOURCE_EVENTS.GFH.WATER_LEVEL) - async handleGfsWaterLevel(event: DataSourceEventPayload) { - const indicators: Indicator[] = event.indicators; + @OnEvent(core.DATA_SOURCE_EVENTS.GFH.WATER_LEVEL) + async handleGfsWaterLevel(event: core.DataSourceEventPayload) { + const indicators: core.Indicator[] = event.indicators; this.logger.log( `GFS WATER LEVEL EVENT RECEIVED ${indicators.length} indicators`, @@ -207,16 +213,27 @@ export class DataSourceEventsListener { } } + private generateExpression(triggerStatement: TriggerStatement) { + return `${triggerStatement.sourceSubType} ${triggerStatement.operator} ${triggerStatement.value}`; + } + private evaluateConditionExpression( triggerStatement: { expression: string; sourceSubType: string }, indicatorValue: number, ): boolean { try { - const parser = new Parser(); + const parser = new Parser({ + operators: { + logical: true, + comparison: true, + }, + }); const variableName = triggerStatement.sourceSubType; - const exprResult = parser.evaluate(triggerStatement.expression, { + const expression = parser.parse(triggerStatement.expression); + + const exprResult = expression.evaluate({ [variableName]: indicatorValue, }); @@ -229,4 +246,55 @@ export class DataSourceEventsListener { return false; } } + + private async processGlofasTriggers( + triggers: Prisma.TriggerGetPayload<{ + include: { + phase: true; + }; + }>[] = [], + value: number, + ) { + const activatedTriggers = []; + Promise.all( + triggers.map(async (trigger) => { + const statement = trigger.triggerStatement as TriggerStatement; + const expression = this.generateExpression(statement); + + const meetsThreshold = this.evaluateConditionExpression( + { + expression, + sourceSubType: statement.sourceSubType, + }, + value, + ); + + if (meetsThreshold) { + this.logger.log(`Trigger ${trigger.id} MET threshold`); + const t = await this.activateTrigger(trigger); + activatedTriggers.push(trigger); + return t; + } + + return null; + }), + ); + + if (activatedTriggers.length > 0) { + this.logger.log( + `Activated ${activatedTriggers.length} triggers for GLOFAS Subtype ${(triggers[0].triggerStatement as TriggerStatement).sourceSubType} + with value ${value} and triggers ${activatedTriggers.map((trigger) => trigger.id).join(', ')}`, + ); + } + } + + private async activateTrigger( + trigger: Prisma.TriggerGetPayload<{ + include: { + phase: true; + }; + }>, + ) { + await this.triggerService.activateTrigger(trigger.uuid, '', trigger); + } } diff --git a/packages/glofas-adapter/src/glofas.adapter.ts b/packages/glofas-adapter/src/glofas.adapter.ts index a92618b..fa7a470 100644 --- a/packages/glofas-adapter/src/glofas.adapter.ts +++ b/packages/glofas-adapter/src/glofas.adapter.ts @@ -211,19 +211,22 @@ export class GlofasAdapter extends ObservationAdapter { basinId: obs.location, }, source: { - key: 'Glofas', - metadata: { originalUnit: 'mm' }, + key: obs.location, + metadata: { originalUnit: 'percentage' }, }, info: obs.data, }; + const pointForecastData = obs.data?.pointForecastData; + const maxProbability = pointForecastData.maxProbability.data; + const results: Indicator[] = []; results.push({ ...baseIndicator, indicator: 'prob_flood', - units: 'mm', - value: obs.data[0]?.value || 0, + units: 'percentage', + value: maxProbability || '0 / 0 / 0', }); return results; From 039497164a1b7b8a90fb7d089c5fe55186321ceb Mon Sep 17 00:00:00 2001 From: Dipesh Kumar Sah Date: Wed, 10 Dec 2025 16:20:05 +0545 Subject: [PATCH 8/8] feat: update DHM algo to evaluate trigger statement --- .../data-source-events.listener.ts | 193 ++++++++++-------- apps/triggers/src/trigger/trigger.service.ts | 125 +++++++++++- .../src/trigger/validation/trigger.schema.ts | 2 + packages/gfh-adapter/src/gfh.adapter.ts | 6 +- 4 files changed, 238 insertions(+), 88 deletions(-) diff --git a/apps/triggers/src/sources-data/data-source-events.listener.ts b/apps/triggers/src/sources-data/data-source-events.listener.ts index 9c77148..b9d035a 100644 --- a/apps/triggers/src/sources-data/data-source-events.listener.ts +++ b/apps/triggers/src/sources-data/data-source-events.listener.ts @@ -6,6 +6,12 @@ import { TriggerStatement } from 'src/trigger/validation/trigger.schema'; import { Parser } from 'expr-eval'; import { TriggerService } from 'src/trigger/trigger.service'; +type TriggerType = Prisma.TriggerGetPayload<{ + include: { + phase: true; + }; +}>; + @Injectable() export class DataSourceEventsListener { private readonly logger = new Logger(DataSourceEventsListener.name); @@ -15,46 +21,63 @@ export class DataSourceEventsListener { @OnEvent(core.DATA_SOURCE_EVENTS.DHM.WATER_LEVEL) async handleDhmWaterLevel(event: core.DataSourceEventPayload) { const indicators: core.Indicator[] = event.indicators; + this.logger.log( `DHM WATER LEVEL EVENT RECEIVED ${indicators.length} indicators`, ); + if (indicators.length === 0) { this.logger.warn(`indicators not found `); return; } + + const indicator = indicators[0].indicator; + const triggers = await this.triggerService.findTriggersBySourceAndIndicator( DataSource.DHM, - indicators[0].indicator, + indicator, ); if (!triggers.length) { - this.logger.log('No triggers found for DHM Rainfall event'); + this.logger.log( + `No triggers found for DHM Water Level event for indicator ${indicator}`, + ); return; } - for (const trigger of triggers) { - const statement = trigger.triggerStatement as TriggerStatement; - const expression = statement.expression; + const triggerMap: Record = triggers.reduce( + (acc, trigger) => { + const statement = trigger.triggerStatement as TriggerStatement; + const stationId = statement.stationId; + if (!stationId) { + this.logger.warn( + `Station ID not found for trigger ${trigger.uuid} for WATER LEVEL TRIGGER`, + ); + return acc; + } - // 2. Compute MEAN of all indicator values - const meanValue = - indicators.reduce((sum, ind) => sum + ind.value, 0) / indicators.length; + if (!acc[stationId]) { + acc[stationId] = []; + } + acc[stationId].push(trigger); + return acc; + }, + {}, + ); - const meetsThreshold = this.evaluateConditionExpression( - { - expression, - sourceSubType: statement.sourceSubType, - }, - meanValue, - ); + for await (const indicator of indicators) { + const stationId = + indicator.location.type === 'BASIN' + ? indicator.location.seriesId + : undefined; - if (meetsThreshold) { - this.logger.log(`Trigger ${trigger.id} MET threshold`); - // update trigger - await this.triggerService.activateTrigger(trigger.uuid, '', trigger); - } else { - this.logger.log(`Trigger ${trigger.id} NOT met`); + const triggers = triggerMap[stationId]; + + if (!triggers) { + continue; } + + await this.processAndEvaluateTriggers(triggers, indicator.value); } } @@ -69,9 +92,12 @@ export class DataSourceEventsListener { this.logger.warn(`indicators not found `); return; } + + const indicator = indicators[0].indicator; + const triggers = await this.triggerService.findTriggersBySourceAndIndicator( DataSource.DHM, - indicators[0].indicator, + indicator, ); if (!triggers.length) { @@ -79,29 +105,40 @@ export class DataSourceEventsListener { return; } - for (const trigger of triggers) { - const statement = trigger.triggerStatement as TriggerStatement; - const expression = statement.expression; + const triggerMap: Record = triggers.reduce( + (acc, trigger) => { + const statement = trigger.triggerStatement as TriggerStatement; + const stationId = statement.stationId; - // 2. Compute MEAN of all indicator values - const meanValue = - indicators.reduce((sum, ind) => sum + ind.value, 0) / indicators.length; + if (!stationId) { + this.logger.warn( + `Station ID not found for trigger ${trigger.uuid} for RAINFALL TRIGGER`, + ); + return acc; + } - const meetsThreshold = this.evaluateConditionExpression( - { - expression, - sourceSubType: statement.sourceSubType, - }, - meanValue, - ); + if (!acc[stationId]) { + acc[stationId] = []; + } + acc[stationId].push(trigger); + return acc; + }, + {}, + ); - if (meetsThreshold) { - this.logger.log(`Trigger ${trigger.id} MET threshold`); - // update trigger - await this.triggerService.activateTrigger(trigger.uuid, '', trigger); - } else { - this.logger.log(`Trigger ${trigger.id} NOT met`); + for await (const indicator of indicators) { + const stationId = + indicator.location.type === 'BASIN' + ? indicator.location.seriesId + : undefined; + + const triggers = triggerMap[stationId]; + + if (!triggers) { + continue; } + + await this.processAndEvaluateTriggers(triggers, indicator.value); } } @@ -149,15 +186,17 @@ export class DataSourceEventsListener { const fiveYearsMaxProbTriggers = triggerMap['five_years_max_prob']; const twentyYearsMaxProbTriggers = triggerMap['twenty_years_max_prob']; - await this.processGlofasTriggers( + await this.processAndEvaluateTriggers( twoYearsMaxProbTriggers, Number(twoYearsMaxProb.trim()) || 0, ); - await this.processGlofasTriggers( + + await this.processAndEvaluateTriggers( fiveYearsMaxProbTriggers, Number(fiveYearsMaxProb.trim()) || 0, ); - await this.processGlofasTriggers( + + await this.processAndEvaluateTriggers( twentyYearsMaxProbTriggers, Number(twentyYearsMaxProb.trim()) || 0, ); @@ -247,54 +286,40 @@ export class DataSourceEventsListener { } } - private async processGlofasTriggers( - triggers: Prisma.TriggerGetPayload<{ - include: { - phase: true; - }; - }>[] = [], + private async processAndEvaluateTriggers( + triggers: TriggerType[] = [], value: number, ) { - const activatedTriggers = []; - Promise.all( - triggers.map(async (trigger) => { - const statement = trigger.triggerStatement as TriggerStatement; - const expression = this.generateExpression(statement); - - const meetsThreshold = this.evaluateConditionExpression( - { - expression, - sourceSubType: statement.sourceSubType, - }, - value, - ); - - if (meetsThreshold) { - this.logger.log(`Trigger ${trigger.id} MET threshold`); - const t = await this.activateTrigger(trigger); - activatedTriggers.push(trigger); - return t; - } + const triggerUuids = []; - return null; - }), - ); + for (const trigger of triggers) { + const statement = trigger.triggerStatement as TriggerStatement; + const expression = this.generateExpression(statement); + + const meetsThreshold = this.evaluateConditionExpression( + { + expression, + sourceSubType: statement.sourceSubType, + }, + value, + ); - if (activatedTriggers.length > 0) { + if (meetsThreshold) { + this.logger.log(`Trigger ${trigger.uuid} MET threshold`); + triggerUuids.push(trigger.uuid); + } + } + + if (triggerUuids.length > 0) { this.logger.log( - `Activated ${activatedTriggers.length} triggers for GLOFAS Subtype ${(triggers[0].triggerStatement as TriggerStatement).sourceSubType} - with value ${value} and triggers ${activatedTriggers.map((trigger) => trigger.id).join(', ')}`, + `Activated ${triggerUuids.length} triggers for GLOFAS Subtype ${(triggers[0].triggerStatement as TriggerStatement).sourceSubType} + with value ${value} and triggers ${triggerUuids.join(', ')}`, ); + await this.activateTriggers(triggerUuids); } } - private async activateTrigger( - trigger: Prisma.TriggerGetPayload<{ - include: { - phase: true; - }; - }>, - ) { - await this.triggerService.activateTrigger(trigger.uuid, '', trigger); + private async activateTriggers(triggerUuids: string[]) { + await this.triggerService.activeAutomatedTriggers(triggerUuids); } } diff --git a/apps/triggers/src/trigger/trigger.service.ts b/apps/triggers/src/trigger/trigger.service.ts index 12e2ee8..6c57e62 100644 --- a/apps/triggers/src/trigger/trigger.service.ts +++ b/apps/triggers/src/trigger/trigger.service.ts @@ -11,6 +11,7 @@ import { PaginatorTypes, PrismaService, DataSource, + Prisma, } from '@lib/database'; import { randomUUID } from 'crypto'; import { InjectQueue } from '@nestjs/bull'; @@ -22,6 +23,7 @@ import { AddTriggerJobDto, UpdateTriggerParamsJobDto } from 'src/common/dto'; import { catchError, lastValueFrom, of, timeout } from 'rxjs'; import { EventEmitter2 } from '@nestjs/event-emitter'; import { triggerPayloadSchema } from './validation/trigger.schema'; +import { tryCatch } from '@lib/core'; const paginate: PaginatorTypes.PaginateFunction = paginator({ perPage: 10 }); @@ -577,6 +579,122 @@ export class TriggerService { } } + async activeAutomatedTriggers(ids: string[]) { + try { + const triggerWhereArgs: Prisma.TriggerWhereInput = { + uuid: { + in: ids, + }, + source: { + not: DataSource.MANUAL, + }, + isTriggered: false, + isDeleted: false, + }; + + const triggers = await this.prisma.trigger.findMany({ + where: triggerWhereArgs, + }); + + if (triggers.length !== ids.length) { + const notfoundTriggers = ids.filter( + (id) => !triggers.some((trigger) => trigger.uuid === id), + ); + this.logger.warn( + `Some triggers not found to activate: ${notfoundTriggers.join(', ')}`, + ); + } + + const phases: Record< + string, + { mandatoryTriggers: number; optionalTriggers: number } + > = triggers.reduce((acc, trigger) => { + if (!acc[trigger.phaseId as string]) { + acc[trigger.phaseId] = { + mandatoryTriggers: 0, + optionalTriggers: 0, + }; + } + + if (trigger.isMandatory) { + acc[trigger.phaseId].mandatoryTriggers++; + } else { + acc[trigger.phaseId].optionalTriggers++; + } + + return acc; + }, {}); + + const updatedTriggers = await this.prisma.trigger.updateMany({ + where: triggerWhereArgs, + data: { + isTriggered: true, + triggeredAt: new Date(), + triggeredBy: 'System', + }, + }); + + this.logger.log(`Total ${updatedTriggers.count} triggers updated`); + + for (const phaseId in phases) { + await this.prisma.phase.update({ + where: { + uuid: phaseId, + }, + data: { + receivedMandatoryTriggers: { + increment: phases[phaseId].mandatoryTriggers, + }, + receivedOptionalTriggers: { + increment: phases[phaseId].optionalTriggers, + }, + }, + }); + } + + const jobs = triggers.map((trigger) => ({ + name: JOBS.TRIGGER.REACHED_THRESHOLD, + data: trigger, + opts: { + attempts: 3, + removeOnComplete: true, + backoff: { + type: 'exponential', + delay: 1000, + }, + }, + })); + + this.logger.log( + `Total ${jobs.length} triggers added to trigger threshold queue`, + ); + + this.triggerQueue.addBulk(jobs); + + // TODO: Need to think about onchain queue update + + for (const phaseId in phases) { + const phase = await this.prisma.phase.findUnique({ + where: { + uuid: phaseId, + }, + }); + + this.eventEmitter.emit(EVENTS.NOTIFICATION.CREATE, { + payload: { + title: `Trigger Statement Met for ${phase.riverBasin}`, + description: `The trigger condition has been met for phase ${phase.name}, year ${phase.activeYear}, in the ${phase.riverBasin} river basin.`, + group: 'Trigger Statement', + notify: true, + }, + }); + } + } catch (error: any) { + this.logger.error(error); + throw new RpcException(error.message); + } + } + async activateTrigger(uuid: string, appId: string, payload: any) { this.logger.log(`Activating trigger with uuid: ${uuid}`); @@ -608,7 +726,7 @@ export class TriggerService { throw new RpcException('Trigger has already been activated.'); } - if (trigger.source !== DataSource.MANUAL) { + if (trigger.source !== DataSource.MANUAL && false) { this.logger.warn('Cannot activate an automated trigger.'); throw new RpcException('Cannot activate an automated trigger.'); } @@ -838,11 +956,16 @@ export class TriggerService { return this.prisma.trigger.findMany({ where: { source, + isTriggered: false, + isDeleted: false, triggerStatement: { path: ['source'], equals: indicator, }, }, + include: { + phase: true, + }, }); } } diff --git a/apps/triggers/src/trigger/validation/trigger.schema.ts b/apps/triggers/src/trigger/validation/trigger.schema.ts index b6ed788..6a29a67 100644 --- a/apps/triggers/src/trigger/validation/trigger.schema.ts +++ b/apps/triggers/src/trigger/validation/trigger.schema.ts @@ -38,6 +38,8 @@ const triggerStatementSchemaBase = z.object({ sourceSubType: z.string().min(1, 'sourceSubType is required'), operator: z.enum(operatorValues), value: numericValueSchema, + stationId: z.string().optional(), + stationName: z.string().optional(), expression: z .string() .trim() diff --git a/packages/gfh-adapter/src/gfh.adapter.ts b/packages/gfh-adapter/src/gfh.adapter.ts index 04a7eba..d0bddd3 100644 --- a/packages/gfh-adapter/src/gfh.adapter.ts +++ b/packages/gfh-adapter/src/gfh.adapter.ts @@ -323,7 +323,7 @@ export class GfhAdapter extends ObservationAdapter { }, source: { key: "GFH", - metadata: { originalUnit: "" }, + metadata: { originalUnit: "m³/s" }, }, info: { ...stationDetails, @@ -335,8 +335,8 @@ export class GfhAdapter extends ObservationAdapter { results.push({ ...baseIndicator, - indicator: "prob_flood", - units: "", + indicator: "discharge_m3s", + units: "m³/s", value: (obs.stationData?.forecasts?.[0] as any)?.value || 0, });