|
1 | 1 | import { Injectable, OnApplicationBootstrap, OnModuleInit } from '@nestjs/common'; |
2 | 2 | import { OnEvent } from '@nestjs/event-emitter'; |
3 | 3 | import { InjectModel } from '@nestjs/mongoose'; |
4 | | -import { FilterOptions } from '~/_common/restools'; |
| 4 | +import { SchedulerRegistry } from '@nestjs/schedule'; |
| 5 | +import { plainToInstance } from 'class-transformer'; |
| 6 | +import { validateOrReject, ValidationError } from 'class-validator'; |
| 7 | +import { CronJob } from 'cron'; |
5 | 8 | import { Model, Query, Types } from 'mongoose'; |
6 | | -import { AbstractServiceSchema } from '~/_common/abstracts/abstract.service.schema'; |
7 | | -import { Identities } from '../identities/_schemas/identities.schema'; |
8 | | -import { Lifecycle, LifecycleRefId } from './_schemas/lifecycle.schema'; |
9 | 9 | import { readdirSync, readFileSync, writeFileSync } from 'node:fs'; |
10 | 10 | import { parse } from 'yaml'; |
11 | | -import { plainToInstance } from 'class-transformer'; |
12 | | -import { ConfigObjectIdentitiesDTO, ConfigObjectSchemaDTO } from './_dto/config.dto'; |
13 | | -import { validateOrReject, ValidationError } from 'class-validator'; |
14 | | -import { omit } from 'radash'; |
| 11 | +import { AbstractServiceSchema } from '~/_common/abstracts/abstract.service.schema'; |
| 12 | +import { FilterOptions } from '~/_common/restools'; |
| 13 | +import { Identities } from '../identities/_schemas/identities.schema'; |
15 | 14 | import { IdentitiesCrudService } from '../identities/identities-crud.service'; |
16 | | -import { SchedulerRegistry } from '@nestjs/schedule'; |
17 | | -import { CronJob } from 'cron'; |
| 15 | +import { ConfigObjectIdentitiesDTO, ConfigObjectSchemaDTO } from './_dto/config.dto'; |
| 16 | +import { Lifecycle, LifecycleRefId } from './_schemas/lifecycle.schema'; |
18 | 17 |
|
19 | 18 | interface LifecycleSource { |
20 | 19 | [source: string]: Partial<ConfigObjectIdentitiesDTO>[]; |
21 | 20 | } |
22 | 21 |
|
23 | | -type ConfigObjectIdentitiesDTOWithSource = Omit<ConfigObjectIdentitiesDTO, 'sources'> & { |
24 | | - source: string; |
25 | | -}; |
26 | | - |
27 | 22 | @Injectable() |
28 | 23 | export class LifecycleService extends AbstractServiceSchema implements OnApplicationBootstrap, OnModuleInit { |
29 | 24 | protected lifecycleSources: LifecycleSource = {}; |
@@ -84,136 +79,65 @@ export class LifecycleService extends AbstractServiceSchema implements OnApplica |
84 | 79 |
|
85 | 80 | const lifecycleRules = await this.loadLifecycleRules(); |
86 | 81 |
|
87 | | - /** |
88 | | - * Create a map of lifecycle sources |
89 | | - * This map will help to quickly find which identities are associated with each lifecycle source. |
90 | | - * It will be used to optimize the lifecycle processing logic. |
91 | | - * The structure will be: |
92 | | - * { |
93 | | - * 'source1': [identityRule1, identityRule2, ...], |
94 | | - * 'source2': [identityRule3, identityRule4, ...], |
95 | | - * ... |
96 | | - * } |
97 | | - * This will allow us to quickly access all identity rules associated with a specific lifecycle source. |
98 | | - */ |
99 | | - for (const lfr of lifecycleRules) { |
100 | | - for (const idRule of lfr.identities) { |
101 | | - for (const source of idRule.sources) { |
102 | | - if (!this.lifecycleSources[source]) { |
103 | | - this.lifecycleSources[source] = []; |
104 | | - } |
105 | | - |
106 | | - const rule = omit(idRule, ['sources']); |
107 | | - if (rule.trigger) { |
108 | | - this.logger.log(`Trigger found for source <${source}>: ${-rule.trigger}, installing cron job !`); |
109 | | - |
110 | | - if (this.schedulerRegistry.doesExist('cron', `lifecycle-trigger-${source}`)) { |
111 | | - this.logger.warn(`Cron job for source <${source}> already exists, skipping creation.`); |
112 | | - continue; |
113 | | - } |
114 | | - |
115 | | - const cronExpression = this.convertSecondsToCron(-rule.trigger); |
116 | | - this.logger.debug(`Creating cron job with pattern: ${cronExpression}`); |
117 | | - |
118 | | - const job = new CronJob(cronExpression, this.runJob.bind(this, { |
119 | | - source, // Pass the source to the job for context |
120 | | - ...rule, |
121 | | - })); |
122 | | - |
123 | | - this.schedulerRegistry.addCronJob(`lifecycle-trigger-${source}`, job); |
124 | | - job.start(); |
125 | | - } |
126 | | - |
127 | | - this.lifecycleSources[source].push(rule); |
128 | | - } |
129 | | - } |
130 | | - } |
| 82 | + const job = new CronJob('*/5 * * * * *', this.handleCron.bind(this, { lifecycleRules })); |
| 83 | + this.schedulerRegistry.addCronJob(`lifecycle-trigger`, job); |
| 84 | + job.start(); |
131 | 85 |
|
132 | 86 | this.logger.log('LifecycleService bootstraped'); |
133 | 87 | } |
134 | 88 |
|
135 | | - protected async runJob(rule: ConfigObjectIdentitiesDTOWithSource): Promise<void> { |
136 | | - this.logger.debug(`Running LifecycleService job: <${JSON.stringify(rule)}>`); |
137 | | - |
138 | | - try { |
139 | | - const identities = await this.identitiesService.model.find({ |
140 | | - ...rule.rules, |
141 | | - lifecycle: rule.source, |
142 | | - ignoreLifecycle: { $ne: true }, |
143 | | - }); |
144 | | - |
145 | | - this.logger.log(`Found ${identities.length} identities to process for trigger in source <${rule.source}>`); |
| 89 | + private async handleCron({ lifecycleRules }: { lifecycleRules: ConfigObjectSchemaDTO[] }): Promise<void> { |
| 90 | + this.logger.debug(`Running lifecycle trigger cron job...`); |
146 | 91 |
|
147 | | - for (const identity of identities) { |
148 | | - const updated = await this.identitiesService.model.findOneAndUpdate( |
149 | | - { _id: identity._id }, |
150 | | - { $set: { lifecycle: rule.target } }, |
151 | | - { new: true } |
152 | | - ); |
153 | | - |
154 | | - if (updated) { |
155 | | - await this.create({ |
156 | | - refId: identity._id, |
157 | | - lifecycle: rule.target, |
158 | | - date: new Date(), |
159 | | - }); |
| 92 | + for (const lfr of lifecycleRules) { |
| 93 | + for (const idRule of lfr.identities) { |
| 94 | + if (idRule.trigger) { |
| 95 | + const dateKey = idRule.dateKey || 'lastSync'; |
| 96 | + |
| 97 | + try { |
| 98 | + const identities = await this.identitiesService.model.find({ |
| 99 | + ...idRule.rules, |
| 100 | + lifecycle: { |
| 101 | + $in: idRule.sources, |
| 102 | + }, |
| 103 | + ignoreLifecycle: { $ne: true }, |
| 104 | + [dateKey]: { |
| 105 | + $lte: new Date(Date.now() - (idRule.trigger * 1000)), |
| 106 | + }, |
| 107 | + }); |
| 108 | + this.logger.log(`Found ${identities.length} identities to process for trigger in source <${idRule.sources}>`); |
| 109 | + |
| 110 | + for (const identity of identities) { |
| 111 | + const updated = await this.identitiesService.model.findOneAndUpdate( |
| 112 | + { _id: identity._id }, |
| 113 | + { |
| 114 | + $set: { |
| 115 | + ...idRule.mutation, |
| 116 | + lifecycle: idRule.target, |
| 117 | + }, |
| 118 | + }, |
| 119 | + { new: true }, |
| 120 | + ); |
| 121 | + |
| 122 | + if (updated) { |
| 123 | + await this.create({ |
| 124 | + refId: identity._id, |
| 125 | + lifecycle: idRule.target, |
| 126 | + date: new Date(), |
| 127 | + }); |
| 128 | + |
| 129 | + this.logger.log(`Identity <${identity._id}> updated to lifecycle <${idRule.target}> by trigger from source <${idRule.sources}>`); |
| 130 | + } |
| 131 | + } |
160 | 132 |
|
161 | | - this.logger.log(`Identity <${identity._id}> updated to lifecycle <${rule.target}> by trigger from source <${rule.source}>`); |
| 133 | + } catch (error) { |
| 134 | + this.logger.error(`Error in lifecycle trigger job for source <${idRule.sources}>:`, error.message, error.stack); |
| 135 | + } |
162 | 136 | } |
163 | 137 | } |
164 | | - } catch (error) { |
165 | | - this.logger.error(`Error in lifecycle trigger job for source <${rule.source}>:`, error.message, error.stack); |
166 | | - } |
167 | | - } |
168 | | - |
169 | | - /** |
170 | | - * Convert seconds to a proper cron expression |
171 | | - * This method converts a duration in seconds to the most appropriate cron expression. |
172 | | - * It optimizes for readability and performance by using the largest possible time unit. |
173 | | - * |
174 | | - * @param seconds - The number of seconds for the interval |
175 | | - * @returns A cron expression string in the format "second minute hour day month dayOfWeek" |
176 | | - */ |
177 | | - private convertSecondsToCron(seconds: number): string { |
178 | | - // Ensure we have a valid positive number |
179 | | - const intervalSeconds = Math.max(1, Math.floor(seconds)); |
180 | | - |
181 | | - // If it's less than 60 seconds, use seconds |
182 | | - if (intervalSeconds < 60) { |
183 | | - return `*/${intervalSeconds} * * * * *`; |
184 | 138 | } |
185 | 139 |
|
186 | | - // If it's exactly divisible by 60 and less than 3600, use minutes |
187 | | - const minutes = intervalSeconds / 60; |
188 | | - if (intervalSeconds % 60 === 0 && minutes < 60) { |
189 | | - return `0 */${Math.floor(minutes)} * * * *`; |
190 | | - } |
191 | | - |
192 | | - // If it's exactly divisible by 3600 and less than 86400, use hours |
193 | | - const hours = intervalSeconds / 3600; |
194 | | - if (intervalSeconds % 3600 === 0 && hours < 24) { |
195 | | - return `0 0 */${Math.floor(hours)} * * *`; |
196 | | - } |
197 | | - |
198 | | - // If it's exactly divisible by 86400, use days |
199 | | - const days = intervalSeconds / 86400; |
200 | | - if (intervalSeconds % 86400 === 0 && days <= 30) { |
201 | | - return `0 0 0 */${Math.floor(days)} * *`; |
202 | | - } |
203 | | - |
204 | | - // For very large intervals or non-standard intervals, fall back to the most appropriate unit |
205 | | - if (intervalSeconds >= 3600) { |
206 | | - // Use hours for intervals >= 1 hour |
207 | | - const hourInterval = Math.max(1, Math.floor(intervalSeconds / 3600)); |
208 | | - return `0 0 */${hourInterval} * * *`; |
209 | | - } else if (intervalSeconds >= 60) { |
210 | | - // Use minutes for intervals >= 1 minute |
211 | | - const minuteInterval = Math.max(1, Math.floor(intervalSeconds / 60)); |
212 | | - return `0 */${minuteInterval} * * * *`; |
213 | | - } else { |
214 | | - // Fall back to seconds |
215 | | - return `*/${intervalSeconds} * * * * *`; |
216 | | - } |
| 140 | + this.logger.log(`Lifecycle trigger cron job completed.`); |
217 | 141 | } |
218 | 142 |
|
219 | 143 | /** |
|
0 commit comments