diff --git a/apps/triggers/package.json b/apps/triggers/package.json index 4f0611c..591e2bd 100644 --- a/apps/triggers/package.json +++ b/apps/triggers/package.json @@ -40,6 +40,8 @@ "@lib/database": "workspace:*", "@lib/dhm-adapter": "workspace:*", "@lib/glofas-adapter": "workspace:*", + "@lib/gfh-adapter": "workspace:*", + "@nestjs/event-emitter": "3.0.0", "@nestjs/mapped-types": "*", "@rumsan/connect": "^1.0.6", "@rumsan/sdk": "^0.0.47", diff --git a/apps/triggers/src/sources-data/schedule-sources-data.service.ts b/apps/triggers/src/sources-data/schedule-sources-data.service.ts index 681a30f..32e5754 100644 --- a/apps/triggers/src/sources-data/schedule-sources-data.service.ts +++ b/apps/triggers/src/sources-data/schedule-sources-data.service.ts @@ -15,6 +15,7 @@ import { RainfallStationData, } from '@lib/dhm-adapter'; import { GlofasAdapter, GlofasServices } from '@lib/glofas-adapter'; +import { GfhAdapter, GfhService } from '@lib/gfh-adapter'; import { Indicator, isErr, @@ -34,6 +35,7 @@ export class ScheduleSourcesDataService private dhmWaterMonitored: HealthMonitoredAdapter; private dhmRainfallMonitored: HealthMonitoredAdapter; private glofasMonitored: HealthMonitoredAdapter; + private gfhMonitored: HealthMonitoredAdapter; constructor( @Inject(HealthCacheService) @@ -41,8 +43,10 @@ export class ScheduleSourcesDataService private readonly dhmWaterLevelAdapter: DhmWaterLevelAdapter, private readonly dhmRainfallLevelAdapter: DhmRainfallAdapter, private readonly glofasAdapter: GlofasAdapter, + private readonly gfhAdapter: GfhAdapter, private readonly dhmService: DhmService, private readonly glofasServices: GlofasServices, + private readonly gfhService: GfhService, private readonly healthService: HealthMonitoringService, ) { this.dhmWaterMonitored = this.wrapWithHealthMonitoring( @@ -52,6 +56,7 @@ export class ScheduleSourcesDataService this.dhmRainfallLevelAdapter, ); this.glofasMonitored = this.wrapWithHealthMonitoring(this.glofasAdapter); + this.gfhMonitored = this.wrapWithHealthMonitoring(this.gfhAdapter); } onModuleInit() { @@ -60,6 +65,7 @@ export class ScheduleSourcesDataService this.dhmWaterLevelAdapter, this.dhmRainfallLevelAdapter, this.glofasAdapter, + this.gfhAdapter, ].forEach((adapter) => adapter.setHealthService(this.healthService)); } @@ -67,6 +73,7 @@ export class ScheduleSourcesDataService this.syncRiverWaterData(); this.syncRainfallData(); this.synchronizeGlofas(); + this.syncGfhData(); } private wrapWithHealthMonitoring( @@ -255,4 +262,29 @@ export class ScheduleSourcesDataService ); }); } + + //run every 24 hours + @Cron('0 0 * * *') + async syncGfhData() { + const gfhResult = await this.gfhMonitored.execute(); + + if (isErr(gfhResult)) { + this.logger.warn(gfhResult.details); + if (gfhResult.details instanceof AxiosError) { + const errorMessage = `HTTP Error: ${gfhResult.details.response?.status} ${gfhResult.details.response?.statusText} - Data: ${JSON.stringify(gfhResult.details.response?.data)} - Config: ${JSON.stringify(gfhResult.details.response?.config)}`; + this.logger.warn(errorMessage); + } else { + this.logger.warn(gfhResult.details); + } + return; + } + + gfhResult.data.forEach(async (indicator) => { + await this.gfhService.saveDataInGfh( + SourceType.WATER_LEVEL, + (indicator.location as any).basinId, + indicator, + ); + }); + } } diff --git a/apps/triggers/src/sources-data/sources-data.module.ts b/apps/triggers/src/sources-data/sources-data.module.ts index 9793377..6d6fca6 100644 --- a/apps/triggers/src/sources-data/sources-data.module.ts +++ b/apps/triggers/src/sources-data/sources-data.module.ts @@ -9,11 +9,11 @@ import { GlofasService } from './glofas.service'; import { ConfigService } from '@nestjs/config'; import { BullModule } from '@nestjs/bull'; import { BQUEUE } from 'src/constant'; -import { GfhService } from './gfh.service'; import Redis from 'ioredis'; import { DataSourceEventsListener } from './data-source-events.listener'; import { HealthMonitoringService, HealthCacheService } from '@lib/core'; import { GlofasModule, GlofasServices } from '@lib/glofas-adapter'; +import { GfhModule, GfhService } from '@lib/gfh-adapter'; @Module({ imports: [ @@ -23,6 +23,7 @@ import { GlofasModule, GlofasServices } from '@lib/glofas-adapter'; }), DhmModule.forRoot(), GlofasModule.forRoot(), + GfhModule.forRoot(), ], controllers: [SourcesDataController], providers: [ @@ -37,6 +38,7 @@ import { GlofasModule, GlofasServices } from '@lib/glofas-adapter'; HealthMonitoringService, DhmServiceLib, GlofasServices, + GfhService, { provide: 'REDIS_CLIENT', useFactory: (configService: ConfigService) => { diff --git a/packages/core/src/types/indicator.type.ts b/packages/core/src/types/indicator.type.ts index f5b8297..686cf66 100644 --- a/packages/core/src/types/indicator.type.ts +++ b/packages/core/src/types/indicator.type.ts @@ -13,7 +13,7 @@ export interface Indicator { kind: 'OBSERVATION' | 'FORECAST'; indicator: IndicatorType; value: number; - info?: T[]; + info?: T | T[]; units: string; issuedAt: string; location: LocationType; diff --git a/packages/core/src/types/result.type.ts b/packages/core/src/types/result.type.ts index 7fb539e..fb94b95 100644 --- a/packages/core/src/types/result.type.ts +++ b/packages/core/src/types/result.type.ts @@ -9,10 +9,7 @@ export type Result = executionContext?: ExecutionContext; }; -export function Ok( - data: T, - executionContext?: ExecutionContext, -): Result { +export function Ok(data: T, executionContext?: ExecutionContext): Result { return { success: true, data, executionContext }; } @@ -24,9 +21,7 @@ export function Err( return { success: false, error, details, executionContext }; } -export function isErr( - result: Result, -): result is { +export function isErr(result: Result): result is { success: false; error: string; details?: unknown; @@ -60,13 +55,20 @@ export function chain( export async function chainAsync( result: Result | Promise>, - fn: (data: T) => Promise> | Result, + fn: ( + data: T, + executionContext?: ExecutionContext, + ) => Promise> | Result, ): Promise> { const resolvedResult = await result; if (!resolvedResult.success) { return resolvedResult; } - const nextResult = await fn(resolvedResult.data); + + const nextResult = await fn( + resolvedResult.data, + resolvedResult.executionContext, + ); if (nextResult.success && resolvedResult.executionContext) { return { ...nextResult, diff --git a/packages/database/prisma/seeds/seed-gfh-config.ts b/packages/database/prisma/seeds/seed-gfh-config.ts new file mode 100644 index 0000000..7144a32 --- /dev/null +++ b/packages/database/prisma/seeds/seed-gfh-config.ts @@ -0,0 +1,69 @@ +import { PrismaClient } from '../../generated/prisma'; +import { GfhApiKeyType } from '../../src/nestjs'; + +const prisma = new PrismaClient(); +const config: GfhApiKeyType = { + name: 'GFHAPIKEY', + value: { + API_KEY: 'AIzaSyC_5LUv3k3A', + }, + isPrivate: false, +}; + +const main = async () => { + console.log('#'.repeat(30)); + console.log('Seeding GFH API KEY'); + console.log('#'.repeat(30)); + + try { + const gfhApiKey = await prisma.setting.findUnique({ + where: { name: config.name }, + }); + + if (gfhApiKey) { + console.log('GFH API KEY already exists'); + await prisma.setting.delete({ + where: { name: config.name }, + }); + console.log('Old GFH API KEY deleted'); + } + + await prisma.setting.create({ + data: { + name: config.name, + value: config.value, + isPrivate: config.isPrivate, + dataType: 'OBJECT', + requiredFields: [], + isReadOnly: false, + }, + }); + + console.log('GFH API KEY seeded successfully'); + } catch (error: any) { + console.error('Error seeding GFH API KEY:', error); + await prisma.setting.create({ + data: { + name: config.name, + value: config.value, + isPrivate: config.isPrivate, + dataType: 'OBJECT', + requiredFields: [], + isReadOnly: false, + }, + }); + } +}; + +main() + .then(async () => {}) + .catch(async (error) => { + console.log(error); + }) + .finally(async () => { + console.log('#'.repeat(30)); + console.log('Seeding completed'); + console.log('#'.repeat(30)); + console.log('\n'); + await prisma.$disconnect(); + }); diff --git a/packages/database/prisma/seeds/seed.ts b/packages/database/prisma/seeds/seed.ts index 10a9db0..adbaeb2 100644 --- a/packages/database/prisma/seeds/seed.ts +++ b/packages/database/prisma/seeds/seed.ts @@ -62,7 +62,7 @@ const config: DataSourceType = { { STATION_NAME: 'Sarda River Basin', RIVER_NAME: 'doda', - STATION_ID: 'G10165', + STATION_ID: 'G10166', POINT_ID: 'SI002576', LISFLOOD_DRAINAGE_AREA: 432, 'LISFLOOD_X_(DEG)': 80.422917, diff --git a/packages/database/src/nestjs/types/gfh-apiKey.type.ts b/packages/database/src/nestjs/types/gfh-apiKey.type.ts new file mode 100644 index 0000000..9224c10 --- /dev/null +++ b/packages/database/src/nestjs/types/gfh-apiKey.type.ts @@ -0,0 +1,7 @@ +export interface GfhApiKeyType { + name: string; + value: { + API_KEY: string; + }; + isPrivate: boolean; +} diff --git a/packages/database/src/nestjs/types/index.ts b/packages/database/src/nestjs/types/index.ts index d055aaa..08b1acf 100644 --- a/packages/database/src/nestjs/types/index.ts +++ b/packages/database/src/nestjs/types/index.ts @@ -1,2 +1,3 @@ export * from './datasource.type'; export * from './datasource-config.type'; +export * from './gfh-apiKey.type'; diff --git a/packages/gfh-adapter/eslint.config.mjs b/packages/gfh-adapter/eslint.config.mjs new file mode 100644 index 0000000..bb0b0e5 --- /dev/null +++ b/packages/gfh-adapter/eslint.config.mjs @@ -0,0 +1,5 @@ +/** @type {import("eslint").Linter.Config[]} */ + +import { nestjs } from "@workspace/eslint-config/nest-js"; + +export default [...nestjs]; diff --git a/packages/gfh-adapter/index.ts b/packages/gfh-adapter/index.ts new file mode 100644 index 0000000..3bd16e1 --- /dev/null +++ b/packages/gfh-adapter/index.ts @@ -0,0 +1 @@ +export * from "./src"; diff --git a/packages/gfh-adapter/package.json b/packages/gfh-adapter/package.json new file mode 100644 index 0000000..f8ba0e1 --- /dev/null +++ b/packages/gfh-adapter/package.json @@ -0,0 +1,27 @@ +{ + "name": "@lib/gfh-adapter", + "version": "1.0.0", + "main": "./dist/index.js", + "types": "./dist/index.d.ts", + "scripts": { + "build": "tsc", + "test": "jest" + }, + "dependencies": { + "@lib/core": "workspace:*", + "@lib/database": "workspace:*", + "axios": "^1.12.2", + "@nestjs/axios": "^4.0.1", + "cheerio": "^1.1.2" + }, + "devDependencies": { + "@types/jest": "^27.0.2", + "@workspace/eslint-config": "workspace:*", + "@workspace/typescript-config": "workspace:*", + "jest": "^27.0.6", + "typescript": "5.9.2" + }, + "exports": { + ".": "./dist/index.js" + } +} diff --git a/packages/gfh-adapter/src/gfh.adapter.ts b/packages/gfh-adapter/src/gfh.adapter.ts index e69de29..75af60f 100644 --- a/packages/gfh-adapter/src/gfh.adapter.ts +++ b/packages/gfh-adapter/src/gfh.adapter.ts @@ -0,0 +1,730 @@ +import { + chainAsync, + Err, + ExecutionContext, + HealthMonitoringService, + Indicator, + ItemError, + ObservationAdapter, + Ok, + Result, + SettingsService, +} from "@lib/core"; +import { DataSource } from "@lib/database"; +import { HttpService } from "@nestjs/axios"; +import { Inject, Injectable, Logger } from "@nestjs/common"; +import { getFormattedDate } from "./utils"; +import { + BatchGetResponse, + Forecast, + Gauge, + GaugeData, + GaugeInfo, + GfhFetchResponse, + GfhObservation, + GfhStationDetails, + IApiKeyData, + Point, + ProcessedForecast, + QueryForecastsResponse, + SearchGaugesRequest, + SearchGaugesResponse, + StationLoacationDetails, + StationResult, +} from "types"; + +@Injectable() +export class GfhAdapter extends ObservationAdapter { + private readonly logger = new Logger(GfhAdapter.name); + private readonly regionCode = "NP"; + private readonly pageSize = 1000; + private readonly matchRadiusKm = 12; + private apiKey: string = ""; + private gauges: Gauge[] = []; + + constructor( + @Inject(HttpService) httpService: HttpService, + @Inject(SettingsService) settingsService: SettingsService, + @Inject(HealthMonitoringService) healthService: HealthMonitoringService + ) { + super(httpService, settingsService, { + dataSource: DataSource.GFH, + }); + this.setHealthService(healthService); + } + + getAdapterId(): string { + return "GFH"; + } + + async init() { + this.logger.log("GFH Adapter Initialization"); + + this.registerHealthConfig({ + adapterId: this.getAdapterId(), + name: "Gfh API", + dataSource: DataSource.GFH, + sourceUrl: this.getUrl() || "", + fetchIntervalMinutes: 60, + staleThresholdMultiplier: 1.1, + }); + const GfhApiData = SettingsService.get("GFHAPIKEY") as IApiKeyData; + this.apiKey = GfhApiData.API_KEY || ""; + } + + /** + * Fetch raw data from GFH API + */ + async fetch(): Promise> { + const itemErrors: ItemError[] = []; + const successfulResults: GfhFetchResponse[] = []; + + try { + this.logger.log("Fetching GFH observations..."); + + const baseUrl = this.getUrl(); + if (!baseUrl || !this.apiKey) { + this.logger.error("GFH URL or api key is not configured"); + return Err("GFH URL or api key is not configured"); + } + + if (this.gauges.length === 0) { + this.gauges = await this.fetchGauges(); + } + if (this.gauges.length === 0) { + this.logger.error("No gauges found"); + return Err("No gauges found"); + } + + const config: GfhStationDetails[] = this.getConfig(); + + const allStationDetails = config.flatMap( + (gfhStationDetails) => gfhStationDetails.STATION_LOCATIONS_DETAILS + ); + + const results = await Promise.allSettled( + config.flatMap((gfhStationDetails) => { + return gfhStationDetails.STATION_LOCATIONS_DETAILS.map( + async (stationDetails): Promise => { + const [stationGaugeMapping, uniqueGaugeIds] = + this.matchStationToGauge(this.gauges, stationDetails); + + return { + data: await this.processGaugeData(uniqueGaugeIds), + location: gfhStationDetails.RIVER_BASIN, + stationId: stationDetails.STATION_ID, + stationGaugeMapping, + }; + } + ); + }) + ); + + results.forEach((result, index) => { + const stationDetails = allStationDetails[index]; + if (!stationDetails) { + return; + } + if (result.status === "fulfilled") { + successfulResults.push(result.value); + } else { + itemErrors.push({ + itemId: stationDetails.STATION_ID, + itemName: stationDetails.STATION_NAME, + stage: "fetch" as const, + code: "FETCH_FAILED", + message: result.reason?.message || "Unknown error", + timestamp: new Date().toISOString(), + }); + this.logger.warn( + `Failed to fetch data for station ${stationDetails.STATION_ID}: ${result.reason?.message}` + ); + } + }); + + if (successfulResults.length === 0) { + return Err("All stations failed", null, { + totalItems: allStationDetails.length, + successfulItems: 0, + failedItems: allStationDetails.length, + itemErrors, + }); + } + + if (itemErrors.length > 0) { + return Ok(successfulResults, { + totalItems: allStationDetails.length, + successfulItems: successfulResults.length, + failedItems: itemErrors.length, + itemErrors, + }); + } + + return Ok(successfulResults, { + totalItems: allStationDetails.length, + successfulItems: successfulResults.length, + itemErrors, + failedItems: 0, + }); + } catch (error: any) { + this.logger.error("Failed to fetch GFH data", error); + return Err("Failed to fetch GFH observations", error); + } + } + + /** + * Parse and extract meaningful observation data + */ + aggregate( + rawDatas: GfhFetchResponse[], + executionContext?: ExecutionContext + ): Result { + const itemErrors: ItemError[] = executionContext?.itemErrors || []; + + try { + const observations: GfhObservation[] = []; + + const config: GfhStationDetails[] = this.getConfig(); + const stationDetailsMap = new Map(); + + config.forEach((gfhStationDetails) => { + gfhStationDetails.STATION_LOCATIONS_DETAILS.forEach( + (stationDetails) => { + stationDetailsMap.set(stationDetails.STATION_ID, stationDetails); + } + ); + }); + + for (const rawData of rawDatas) { + const stationDetails = stationDetailsMap.get(rawData.stationId); + if (!stationDetails) { + this.logger.warn( + `Station details not found for station ${rawData.stationId}` + ); + continue; + } + + const output = this.buildFinalOutput( + rawData.stationGaugeMapping, + rawData.data + ); + + const [stationKey, stationData] = Object.entries(output)[0] || []; + if (!stationKey || !stationData) { + this.logger.warn( + `No data found for station ${stationDetails.STATION_NAME}` + ); + if ( + executionContext?.itemErrors + ?.map((item) => item.itemId) + .includes(stationDetails.STATION_ID) + ) { + continue; + } else { + itemErrors.push({ + itemId: stationDetails.STATION_ID, + itemName: stationDetails.STATION_NAME, + stage: "aggregate" as const, + code: "GFH_NO_DATA", + message: `No data found for station ${stationDetails.STATION_NAME}`, + timestamp: new Date().toISOString(), + }); + } + continue; + } + + observations.push({ + stationData: stationData as StationResult, + stationName: stationDetails.STATION_NAME, + riverBasin: rawData.location, + }); + } + + this.logger.log(`Aggregated ${observations.length} GFH observations`); + + if (observations.length === 0) { + return Err("No data found for any station", null, { + totalItems: executionContext?.totalItems || rawDatas.length, + successfulItems: 0, + failedItems: itemErrors.length, + itemErrors, + }); + } + + if (itemErrors.length > 0) { + return Ok(observations, { + totalItems: executionContext?.totalItems || rawDatas.length, + successfulItems: observations.length, + failedItems: itemErrors.length, + itemErrors, + }); + } + + return Ok(observations); + } catch (error: any) { + this.logger.error("Failed to aggregate GFH data", error); + return Err("Failed to parse GFH data", error); + } + } + + /** + * Transform GFH observations to standard Indicators + */ + transform(aggregatedData: GfhObservation[]): Result { + try { + const observations = aggregatedData as GfhObservation[]; + + const { dateString } = getFormattedDate(); + + const indicators: Indicator[] = observations.flatMap((obs) => { + const stationDetails = { + riverBasin: obs.riverBasin, + forecastDate: dateString, + source: obs.stationData?.source || "", + latitude: obs.stationData?.gaugeLocation?.latitude?.toFixed(6) || "", + longitude: + obs.stationData?.gaugeLocation?.longitude?.toFixed(6) || "", + stationName: obs.stationName || "", + warningLevel: + obs.stationData?.model_metadata?.thresholds?.warningLevel?.toFixed( + 3 + ) || "", + dangerLevel: + obs.stationData?.model_metadata?.thresholds?.dangerLevel?.toFixed( + 3 + ) || "", + extremeDangerLevel: + obs.stationData?.model_metadata?.thresholds?.extremeDangerLevel?.toFixed( + 3 + ) || "", + basinSize: + obs.stationData?.model_metadata?.thresholds?.basinSize || 0, + riverGaugeId: obs.stationData?.gaugeId || "", + }; + + const history = (obs.stationData?.forecasts || []).map((forecast) => ({ + value: (forecast as any).value || 0, + datetime: + forecast.timeRange?.startTime || obs.stationData?.issuedTime || "", + })); + + const baseIndicator = { + kind: "OBSERVATION" as const, + issuedAt: new Date().toISOString(), + location: { + type: "BASIN" as const, + basinId: obs.riverBasin, + }, + source: { + key: "GFH", + metadata: { originalUnit: "" }, + }, + info: { + ...stationDetails, + history, + }, + }; + + const results: Indicator[] = []; + + results.push({ + ...baseIndicator, + indicator: "prob_flood", + units: "", + value: (obs.stationData?.forecasts?.[0] as any)?.value || 0, + }); + + return results; + }); + + this.logger.log(`Transformed to ${indicators.length} indicators`); + return Ok(indicators); + } catch (error: any) { + this.logger.error("Failed to transform GFH data", error); + return Err("Failed to transform to indicators", error); + } + } + + /** + * Main pipeline execution - chains fetch → aggregate → transform + * Using functional composition - no if-else needed! + */ + async execute(): Promise> { + return chainAsync( + this.fetch(), + (rawData: GfhFetchResponse[], executionContext?: ExecutionContext) => + chainAsync( + this.aggregate(rawData, executionContext), + (observations: GfhObservation[]) => this.transform(observations) + ) + ); + } + + private async fetchGauges() { + const requestData: SearchGaugesRequest = { + regionCode: this.regionCode, + pageSize: this.pageSize, + includeNonQualityVerified: true, + }; + + const allGauges: Gauge[] = []; + try { + while (true) { + const response = await this.makeRequest( + "gauges:searchGaugesByArea", + "POST", + undefined, + requestData + ); + + if (!response) { + break; + } + const gauges = response.gauges || []; + allGauges.push(...gauges); + + const nextPageToken = response.nextPageToken; + if (!nextPageToken) { + break; + } + requestData.pageToken = nextPageToken; + } + return allGauges; + } catch (error) { + this.logger.error("Failed to fetch gauges", error); + return []; + } + } + + private matchStationToGauge( + gauges: Gauge[], + station: StationLoacationDetails + ): [Record, Set] { + this.logger.log( + `Matching station ${station.STATION_ID} to gauges within ${this.matchRadiusKm}km...` + ); + + const validGauges = this.filterValidGauges(gauges); + const stationGaugeMapping: Record = {}; + const uniqueGaugeIds = new Set(); + + try { + if (station.RIVER_GAUGE_ID) { + // If the station has a specific gauge ID, prioritize it + this.logger.log( + `Station ${station.STATION_ID} has specific gauge ID ${station.RIVER_GAUGE_ID}` + ); + + const matchedGauge = validGauges.find( + (g) => g.gaugeId === station.RIVER_GAUGE_ID + ); + if (matchedGauge) { + const stationPoint = this.createPoint( + station["LISFLOOD_X_(DEG)"], + station["LISFLOOD_Y_[DEG]"] + ); + + const gaugePoint = this.createPoint( + matchedGauge.location.longitude, + matchedGauge.location.latitude + ); + + const distance = this.haversineKm(stationPoint, gaugePoint); + + stationGaugeMapping[station.STATION_ID] = { + gaugeId: matchedGauge.gaugeId, + distance, + source: matchedGauge.source || "", + gaugeLocation: matchedGauge.location, + qualityVerified: matchedGauge.qualityVerified || false, + }; + uniqueGaugeIds.add(matchedGauge.gaugeId); + this.logger.log( + `Station ${station.STATION_ID} matched to gauge ${matchedGauge.gaugeId} (${distance.toFixed(2)}km)` + ); + + return [stationGaugeMapping, uniqueGaugeIds]; + } + } + + const stationPoint = this.createPoint( + station["LISFLOOD_X_(DEG)"], + station["LISFLOOD_Y_[DEG]"] + ); + + // Calculate distances to all gauges + const gaugeDistances = validGauges.map((gauge) => { + const gaugePoint = this.createPoint( + gauge.location.longitude, + gauge.location.latitude + ); + return { + gauge, + distance: this.haversineKm(stationPoint, gaugePoint), + }; + }); + + // Find nearby gauges + const nearbyGauges = gaugeDistances.filter( + (gd) => gd.distance <= this.matchRadiusKm + ); + + if (nearbyGauges.length === 0) { + stationGaugeMapping[station.STATION_ID] = null; + this.logger.warn( + `No gauges found within ${this.matchRadiusKm}km for station ${station.STATION_ID}` + ); + } else { + // Find closest gauge + const bestGauge = nearbyGauges.reduce((min, current) => + current.distance < min.distance ? current : min + ); + + const gaugeId = bestGauge.gauge.gaugeId; + uniqueGaugeIds.add(gaugeId); + + stationGaugeMapping[station.STATION_ID] = { + gaugeId, + distance: bestGauge.distance, + source: bestGauge.gauge.source || "", + gaugeLocation: bestGauge.gauge.location, + qualityVerified: bestGauge.gauge.qualityVerified || false, + }; + + this.logger.log( + `Station ${station.STATION_ID} matched to gauge ${gaugeId} ` + + `(${bestGauge.distance.toFixed(2)}km)` + ); + } + } catch (error) { + this.logger.error( + `Error matching station ${station.STATION_ID}: ${error}` + ); + stationGaugeMapping[station.STATION_ID] = null; + } + + return [stationGaugeMapping, uniqueGaugeIds]; + } + + private filterValidGauges(gauges: Gauge[]): Gauge[] { + return gauges.filter( + (g) => + g.location && + typeof g.location.latitude === "number" && + typeof g.location.longitude === "number" + ); + } + + private createPoint(x: number, y: number): Point { + return { x, y }; + } + + private haversineKm(pt1: Point, pt2: Point): number { + const R = 6371; // Earth's radius in km + const dLat = this.toRadians(pt2.y - pt1.y); + const dLon = this.toRadians(pt2.x - pt1.x); + + const a = + Math.sin(dLat / 2) * Math.sin(dLat / 2) + + Math.cos(this.toRadians(pt1.y)) * + Math.cos(this.toRadians(pt2.y)) * + Math.sin(dLon / 2) * + Math.sin(dLon / 2); + const c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a)); + + const test = R * c; + return test; + } + + private toRadians(degrees: number): number { + return degrees * (Math.PI / 180); + } + + private async processGaugeData( + uniqueGaugeIds: Set + ): Promise> { + this.logger.log( + `Processing data for ${uniqueGaugeIds.size} unique gauges...` + ); + + const gaugeDataCache: Record = {}; + + for (const gaugeId of uniqueGaugeIds) { + // Fetch metadata + const metadata = await this.fetchGaugeMetadata(gaugeId); + + // Fetch forecasts + const forecasts = await this.fetchGaugeForecasts(gaugeId); + + // Process latest forecast + let latestForecast: ProcessedForecast | null = null; + if (forecasts.length > 0) { + const latest = forecasts.reduce((max, current) => + new Date(current.issuedTime) > new Date(max.issuedTime) + ? current + : max + ); + + // Extract first forecast range for summary + const firstRange = latest.forecastRanges?.[0] || {}; + + latestForecast = { + issuedTime: latest.issuedTime, + forecastTimeRange: firstRange.timeRange || {}, + forecastTrend: firstRange.trend || "UNKNOWN", + severity: firstRange.severity || "UNKNOWN", + forecastRanges: latest.forecastRanges || [], + }; + } + + gaugeDataCache[gaugeId] = { + model_metadata: metadata, + all_forecasts: forecasts, + latest_forecast: latestForecast, + }; + } + + return gaugeDataCache; + } + + private async fetchGaugeMetadata(gaugeId: string): Promise { + const endpoint = "gaugeModels:batchGet"; + const params = { names: `gaugeModels/${gaugeId}` }; + + const response = await this.makeRequest( + endpoint, + "GET", + params + ); + + if (response && response.gaugeModels) { + return response.gaugeModels[0] || {}; + } + + return {}; + } + + private async makeRequest( + endpoint: string, + method: "GET" | "POST" = "GET", + params?: Record, + jsonData?: any + ): Promise { + const url = new URL(`${this.getUrl()}/${endpoint}`); + + const options: RequestInit = { + method, + headers: { + "Content-Type": "application/json", + }, + }; + + if (method === "POST") { + if (jsonData) { + options.body = JSON.stringify(jsonData); + } + url.searchParams.append("key", this.apiKey); + } else { + // GET request + if (params) { + Object.entries(params).forEach(([key, value]) => { + if (Array.isArray(value)) { + value.forEach((v) => url.searchParams.append(key, v)); + } else { + url.searchParams.append(key, String(value)); + } + }); + } + url.searchParams.append("key", this.apiKey); + } + + const response = await fetch(url.toString(), options); + + if (!response.ok) { + throw new Error(`HTTP error! status: ${response.status}`); + } + + return await response.json(); + } + + private async fetchGaugeForecasts( + gaugeId: string, + daysBack: number = 7 + ): Promise { + const now = new Date(); + const startDate = new Date(now.getTime() - daysBack * 24 * 60 * 60 * 1000); + + const endpoint = "gauges:queryGaugeForecasts"; + const params = { + gaugeIds: [gaugeId], + issuedTimeStart: startDate.toISOString().split("T")[0], + issuedTimeEnd: now.toISOString().split("T")[0], + }; + + const response = await this.makeRequest( + endpoint, + "GET", + params + ); + + if (response && response.forecasts) { + const gaugeForecasts = response.forecasts[gaugeId]; + return gaugeForecasts?.forecasts || []; + } + + return []; + } + + private buildFinalOutput( + stationGaugeMapping: Record, + gaugeDataCache: Record + ): Record { + this.logger.log("Building final output..."); + + const output: Record = {}; + + for (const [stationId, gaugeInfo] of Object.entries(stationGaugeMapping)) { + if (gaugeInfo === null) { + output[stationId] = null; + continue; + } + + const gaugeId = gaugeInfo.gaugeId; + const cachedData: any = gaugeDataCache[gaugeId] || {}; + + // Build base result + const result: StationResult = { + gaugeId, + distance_km: Math.round(gaugeInfo.distance * 100) / 100, + source: gaugeInfo.source, + gaugeLocation: gaugeInfo.gaugeLocation, + qualityVerified: gaugeInfo.qualityVerified, + model_metadata: cachedData.model_metadata || {}, + issuedTime: null, + forecastTimeRange: {}, + forecastTrend: "UNKNOWN", + severity: "UNKNOWN", + forecasts: [], + total_forecasts_available: 0, + }; + + // Add forecast information + const latestForecast = cachedData.latest_forecast; + if (latestForecast) { + result.issuedTime = latestForecast.issuedTime; + result.forecastTimeRange = latestForecast.forecastTimeRange; + result.forecastTrend = latestForecast.forecastTrend; + result.severity = latestForecast.severity; + result.forecasts = latestForecast.forecastRanges; + result.total_forecasts_available = + cachedData.all_forecasts?.length || 0; + } else { + result.message = "No forecasts available"; + } + + output[stationId] = result; + } + + return output; + } +} diff --git a/packages/gfh-adapter/src/gfh.module.ts b/packages/gfh-adapter/src/gfh.module.ts index e69de29..ae40d1e 100644 --- a/packages/gfh-adapter/src/gfh.module.ts +++ b/packages/gfh-adapter/src/gfh.module.ts @@ -0,0 +1,33 @@ +import { Module, DynamicModule } from '@nestjs/common'; +import { HttpModule } from '@nestjs/axios'; +import { GfhAdapter } from './gfh.adapter'; +import { + HealthMonitoringService, + SettingsModule, + SettingsService, +} from '@lib/core'; + +const adapters = [GfhAdapter]; + +@Module({}) +export class GfhModule { + static forRoot(): DynamicModule { + return { + global: true, + module: GfhModule, + imports: [HttpModule, SettingsModule], + providers: [...adapters, SettingsService, HealthMonitoringService], + exports: [...adapters], + }; + } + + static forFeature(): DynamicModule { + return { + module: GfhModule, + imports: [SettingsModule], + providers: [...adapters, HealthMonitoringService], + exports: [...adapters], + }; + } +} + diff --git a/packages/gfh-adapter/src/gfh.service.ts b/packages/gfh-adapter/src/gfh.service.ts new file mode 100644 index 0000000..566e28e --- /dev/null +++ b/packages/gfh-adapter/src/gfh.service.ts @@ -0,0 +1,82 @@ +import { DataSource, PrismaService, SourceType } from "@lib/database"; +import { Inject, Injectable, Logger } from "@nestjs/common"; + +@Injectable() +export class GfhService { + private readonly logger = new Logger(GfhService.name); + constructor(@Inject(PrismaService) private readonly prisma: PrismaService) {} + + async saveDataInGfh( + type: SourceType, + riverBasin: string, + payload: any + ): Promise { + try { + return await this.prisma.$transaction(async (tx) => { + // have to check if old formatted date exit if exit update that. + const existingRecord = await tx.sourcesData.findFirst({ + where: { + dataSource: DataSource.GFH, + source: { + riverBasin, + }, + AND: [ + { + info: { + path: ["info", "stationName"], + equals: payload.info.stationName, + }, + }, + { + info: { + path: ["info", "forecastDate"], + equals: payload.info.forecastDate, + }, + }, + ], + }, + }); + + if (existingRecord) { + this.logger.log( + `Updating existing record with new data for ${payload?.info?.stationName}` + ); + return await tx.sourcesData.update({ + where: { id: existingRecord.id }, + data: { + info: { + ...JSON.parse(JSON.stringify(payload)), + }, + updatedAt: new Date(), + }, + }); + } else { + this.logger.log( + `Creating new record for ${payload?.info?.stationName}` + ); + return await tx.sourcesData.create({ + data: { + type, + dataSource: DataSource.GFH, + info: JSON.parse(JSON.stringify(payload)), + source: { + connectOrCreate: { + where: { + riverBasin, + }, + create: { + source: [DataSource.GFH], + riverBasin, + }, + }, + }, + }, + }); + } + }); + } catch (err) { + this.logger.error(`Error saving data for ${riverBasin}:`, err); + throw err; + } + } +} diff --git a/packages/gfh-adapter/src/index.ts b/packages/gfh-adapter/src/index.ts index e69de29..f91a000 100644 --- a/packages/gfh-adapter/src/index.ts +++ b/packages/gfh-adapter/src/index.ts @@ -0,0 +1,4 @@ +export * from "./gfh.adapter"; +export * from "./gfh.module"; +export * from "./gfh.service"; +export * from "./types"; diff --git a/packages/gfh-adapter/src/package.json b/packages/gfh-adapter/src/package.json deleted file mode 100644 index e69de29..0000000 diff --git a/packages/gfh-adapter/src/types/gfh-observation.type.ts b/packages/gfh-adapter/src/types/gfh-observation.type.ts index e69de29..7d57e6f 100644 --- a/packages/gfh-adapter/src/types/gfh-observation.type.ts +++ b/packages/gfh-adapter/src/types/gfh-observation.type.ts @@ -0,0 +1,152 @@ +export type GfhStationDetails = { + RIVER_BASIN: string; + STATION_LOCATIONS_DETAILS: StationLoacationDetails[]; +}; + +export interface StationLoacationDetails { + LATITUDE: number; + POINT_ID: string; + RIVER_GAUGE_ID?: string; + LONGITUDE: number; + RIVER_NAME: string; + STATION_ID: string; + STATION_NAME: string; + "LISFLOOD_X_(DEG)": number; + "LISFLOOD_Y_[DEG]": number; + LISFLOOD_DRAINAGE_AREA: number; +} + +export interface SearchGaugesRequest { + regionCode: string; + pageSize: number; + includeNonQualityVerified: boolean; + pageToken?: string; +} + +export interface Gauge { + gaugeId: string; + location: Location; + source?: string; + qualityVerified?: boolean; + [key: string]: any; +} + +interface Location { + latitude: number; + longitude: number; +} + +export interface SearchGaugesResponse { + gauges: Gauge[]; + nextPageToken?: string; +} + +export interface IApiKeyData { + API_KEY: string; +} + +export interface GaugeInfo { + gaugeId: string; + distance: number; + source: string; + gaugeLocation: Location; + qualityVerified: boolean; +} + +export interface Point { + x: number; + y: number; +} +interface ForecastRange { + timeRange?: { + startTime?: string; + endTime?: string; + }; + trend?: string; + severity?: string; + [key: string]: any; +} + +export interface Forecast { + issuedTime: string; + forecastRanges: ForecastRange[]; + [key: string]: any; +} + +export interface ProcessedForecast { + issuedTime: string; + forecastTimeRange: any; + forecastTrend: string; + severity: string; + forecastRanges: ForecastRange[]; +} +export interface GaugeData { + model_metadata: any; + all_forecasts: Forecast[]; + latest_forecast: ProcessedForecast | null; +} + +export interface BatchGetResponse { + gaugeModels: any[]; +} + +export interface QueryForecastsResponse { + forecasts: Record; +} + +export interface ProcessedForecast { + issuedTime: string; + forecastTimeRange: any; + forecastTrend: string; + severity: string; + forecastRanges: ForecastRange[]; +} + +export interface StationResult { + gaugeId: string; + distance_km: number; + source: string; + gaugeLocation: Location; + qualityVerified: boolean; + model_metadata: any; + issuedTime: string | null; + forecastTimeRange: any; + forecastTrend: string; + severity: string; + forecasts: ForecastRange[]; + total_forecasts_available: number; + message?: string; +} + +export interface GfhFetchResponse { + data: Record; + location: string; + stationId: string; + stationGaugeMapping: Record; +} + +export interface GfhObservation { + stationData: StationResult; + stationName: string; + riverBasin: string; +} + +export interface GfhHistoryItem { + value: string | number; + datetime: string; +} + +export interface GfhTransformedResult { + riverBasin: string; + source: string; + latitude: string; + longitude: string; + riverGaugeId: string; + stationName: string; + warningLevel: string; + dangerLevel: string; + extremeDangerLevel: string; + basinSize: number; + forecastDate: string; + history: GfhHistoryItem[]; +} diff --git a/packages/gfh-adapter/src/types/index.ts b/packages/gfh-adapter/src/types/index.ts new file mode 100644 index 0000000..060f24e --- /dev/null +++ b/packages/gfh-adapter/src/types/index.ts @@ -0,0 +1 @@ +export * from "./gfh-observation.type"; diff --git a/packages/gfh-adapter/src/utils/index.ts b/packages/gfh-adapter/src/utils/index.ts new file mode 100644 index 0000000..ff53349 --- /dev/null +++ b/packages/gfh-adapter/src/utils/index.ts @@ -0,0 +1,10 @@ +export const getFormattedDate = (date: Date = new Date()) => { + const year = date.getFullYear(); + const month = String(date.getMonth() + 1).padStart(2, "0"); // getMonth() returns 0-based month, hence add 1 + const day = String(date.getDate()).padStart(2, "0"); + + const dateTimeString = `${year}-${month}-${day}T00:00:00`; + const dateString = `${year}-${month}-${day}`; + + return { dateString, dateTimeString }; +}; diff --git a/packages/gfh-adapter/tsconfig.json b/packages/gfh-adapter/tsconfig.json new file mode 100644 index 0000000..f2a296b --- /dev/null +++ b/packages/gfh-adapter/tsconfig.json @@ -0,0 +1,12 @@ +{ + "extends": "@workspace/typescript-config/base.json", + "compilerOptions": { + "outDir": "./dist", + "baseUrl": "./src", + "module": "NodeNext", + "moduleResolution": "nodenext", + "experimentalDecorators": true, + }, + "include": ["src", "index.ts"], + "exclude": ["node_modules", "test", "dist", "**/*spec.ts"] +} \ No newline at end of file diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 85dd234..263f321 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -190,6 +190,9 @@ importers: '@lib/dhm-adapter': specifier: workspace:* version: link:../../packages/dhm-adapter + '@lib/gfh-adapter': + specifier: workspace:* + version: link:../../packages/gfh-adapter '@lib/glofas-adapter': specifier: workspace:* version: link:../../packages/glofas-adapter @@ -438,6 +441,40 @@ importers: specifier: ^8.15.0 version: 8.46.4(eslint@9.39.1)(typescript@5.9.2) + packages/gfh-adapter: + dependencies: + '@lib/core': + specifier: workspace:* + version: link:../core + '@lib/database': + specifier: workspace:* + version: link:../database + '@nestjs/axios': + specifier: ^4.0.1 + version: 4.0.1(@nestjs/common@10.4.20)(axios@1.13.2)(rxjs@7.8.2) + axios: + specifier: ^1.12.2 + version: 1.13.2 + cheerio: + specifier: ^1.1.2 + version: 1.1.2 + devDependencies: + '@types/jest': + specifier: ^27.0.2 + version: 27.5.2 + '@workspace/eslint-config': + specifier: workspace:* + version: link:../eslint-config + '@workspace/typescript-config': + specifier: workspace:* + version: link:../typescript-config + jest: + specifier: ^27.0.6 + version: 27.5.1(ts-node@10.9.2) + typescript: + specifier: 5.9.2 + version: 5.9.2 + packages/glofas-adapter: dependencies: '@lib/core':