Skip to content

Commit 0c9cfd8

Browse files
committed
add tasks and jobs
1 parent 33a4367 commit 0c9cfd8

File tree

7 files changed

+65
-16
lines changed

7 files changed

+65
-16
lines changed

src/_common/abstracts/schemas/abstract.schema.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ import { Types, Document } from 'mongoose';
33
import { MetadataPart, MetadataPartSchema } from '~/_common/abstracts/schemas/parts/metadata.part.schema';
44

55
export abstract class AbstractSchema extends Document {
6-
public readonly _id: Types.ObjectId | any // eslint-disable-line
6+
public readonly _id?: Types.ObjectId | any // eslint-disable-line
77

88
@Prop({ type: MetadataPartSchema })
9-
public metadata: MetadataPart;
9+
public metadata?: MetadataPart;
1010
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import { JobsOptions } from 'bullmq';
2+
import { Types } from 'mongoose';
23

34
export interface ExecuteJobOptions {
45
job?: JobsOptions;
56
async?: boolean;
67
syncTimeout?: number;
78
timeoutDiscard?: boolean;
89
comment?: string;
10+
task?: Types.ObjectId;
911
}

src/core/backends/backends.controller.ts

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import { Public } from '~/_common/decorators/public.decorator';
1919
import { ExecuteJobDto } from './_dto/execute-job.dto';
2020
import { BackendsService } from './backends.service';
2121
import { SyncIdentitiesDto } from './_dto/sync-identities.dto';
22+
import { Types } from 'mongoose';
2223

2324
function fireMessage(observer: Subscriber<MessageEvent>, channel: string, message: any, loggername: string) {
2425
try {
@@ -72,11 +73,16 @@ export class BackendsController {
7273
): Promise<Response> {
7374
const async = /true|on|yes|1/i.test(asyncQuery);
7475
const timeoutDiscard = /true|on|yes|1/i.test(timeoutDiscardQuery);
75-
const [job, response] = await this.backendsService.executeJob(body.action, body.id, body.payload, {
76-
async,
77-
syncTimeout,
78-
timeoutDiscard,
79-
});
76+
const [job, response] = await this.backendsService.executeJob(
77+
body.action,
78+
new Types.ObjectId(body.id),
79+
body.payload,
80+
{
81+
async,
82+
syncTimeout,
83+
timeoutDiscard,
84+
},
85+
);
8086
return res.status(HttpStatus.ACCEPTED).json({ async, job, response });
8187
}
8288

src/core/backends/backends.service.ts

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { JobsService } from '../jobs/jobs.service';
1111
import { TasksService } from '../tasks/tasks.service';
1212
import { ActionType } from './_enum/action-type.enum';
1313
import { ExecuteJobOptions } from './_interfaces/execute-job-options.interface';
14+
import { Tasks } from '../tasks/_schemas/tasks.schema';
1415

1516
const DEFAULT_SYNC_TIMEOUT = 30_000;
1617

@@ -28,7 +29,6 @@ export class BackendsService extends AbstractQueueProcessor {
2829
public async onModuleInit() {
2930
await super.onModuleInit();
3031

31-
//TODO: resync failed + completed
3232
const jobsCompleted = await this._queue.getCompleted();
3333
for (const job of jobsCompleted) {
3434
const isSyncedJob = await this.jobsService.model.findOneAndUpdate<Jobs>(
@@ -43,6 +43,11 @@ export class BackendsService extends AbstractQueueProcessor {
4343
{ new: true },
4444
);
4545
if (isSyncedJob) {
46+
await this.identitiesService.model.findByIdAndUpdate(isSyncedJob.concernedTo, {
47+
$set: {
48+
state: IdentityState.SYNCED,
49+
},
50+
});
4651
this.logger.warn(`Job already completed, syncing... [${job.id}::COMPLETED]`);
4752
}
4853
}
@@ -64,7 +69,7 @@ export class BackendsService extends AbstractQueueProcessor {
6469

6570
this.queueEvents.on('failed', async (payload) => {
6671
this.logger.debug(`Job failed ! [${payload.jobId}]`);
67-
await this.jobsService.model.findOneAndUpdate<Jobs>(
72+
const failedJob = await this.jobsService.model.findOneAndUpdate<Jobs>(
6873
{ jobId: payload.jobId, state: { $ne: JobState.COMPLETED } },
6974
{
7075
$set: {
@@ -75,10 +80,15 @@ export class BackendsService extends AbstractQueueProcessor {
7580
},
7681
{ new: true },
7782
);
83+
await this.identitiesService.model.findByIdAndUpdate(failedJob.concernedTo, {
84+
$set: {
85+
state: IdentityState.ON_ERROR,
86+
},
87+
});
7888
});
7989

8090
this.queueEvents.on('completed', async (payload) => {
81-
await this.jobsService.model.findOneAndUpdate<Jobs>(
91+
const completedJob = await this.jobsService.model.findOneAndUpdate<Jobs>(
8292
{ jobId: payload.jobId, state: { $ne: JobState.COMPLETED } },
8393
{
8494
$set: {
@@ -87,8 +97,13 @@ export class BackendsService extends AbstractQueueProcessor {
8797
result: payload.returnvalue,
8898
},
8999
},
90-
{ upsert: true },
100+
{ upsert: true, new: true },
91101
);
102+
await this.identitiesService.model.findByIdAndUpdate(completedJob.concernedTo, {
103+
$set: {
104+
state: IdentityState.SYNCED,
105+
},
106+
});
92107
this.logger.log(`Job completed... Syncing [${payload.jobId}]`);
93108
});
94109
}
@@ -114,9 +129,21 @@ export class BackendsService extends AbstractQueueProcessor {
114129
});
115130
}
116131

132+
const task: Tasks = await this.tasksService.create<Tasks>({
133+
jobs: identities.map((identity) => identity.identity._id),
134+
});
135+
117136
const result = {};
118137
for (const identity of identities) {
119-
const [executedJob] = await this.executeJob(identity.action, identity.identity._id, { identity }, options);
138+
const [executedJob] = await this.executeJob(
139+
identity.action,
140+
identity.identity._id,
141+
{ identity },
142+
{
143+
...options,
144+
task: task._id,
145+
},
146+
);
120147
result[identity.identity._id] = executedJob;
121148
}
122149
return result;
@@ -148,6 +175,7 @@ export class BackendsService extends AbstractQueueProcessor {
148175
params: payload,
149176
concernedTo: concernedTo,
150177
comment: options?.comment,
178+
task: options?.task,
151179
state: JobState.CREATED,
152180
...optionals,
153181
});
@@ -163,6 +191,11 @@ export class BackendsService extends AbstractQueueProcessor {
163191
result: response,
164192
},
165193
});
194+
await this.identitiesService.model.findByIdAndUpdate(concernedTo, {
195+
$set: {
196+
state: IdentityState.SYNCED,
197+
},
198+
});
166199
return [jobStoreUpdated as unknown as Jobs, response];
167200
} catch (err) {
168201
error = err;
@@ -174,6 +207,11 @@ export class BackendsService extends AbstractQueueProcessor {
174207
result: error,
175208
},
176209
});
210+
await this.identitiesService.model.findByIdAndUpdate(concernedTo, {
211+
$set: {
212+
state: IdentityState.ON_ERROR,
213+
},
214+
});
177215
if (options?.timeoutDiscard !== false) {
178216
job.discard();
179217
throw new BadRequestException({

src/core/jobs/_schemas/jobs.schema.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ export class Jobs extends AbstractSchema {
2525
})
2626
public concernedTo?: Types.ObjectId;
2727

28+
@Prop({ type: Types.ObjectId })
29+
public task?: Types.ObjectId;
30+
2831
@Prop({ type: String })
2932
public comment?: string;
3033

src/core/tasks/tasks.service.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import { Injectable } from '@nestjs/common';
22
import { InjectModel } from '@nestjs/mongoose';
33
import { Model } from 'mongoose';
4-
import { AbstractService } from '~/_common/abstracts/abstract.service';
4+
import { AbstractServiceSchema } from '~/_common/abstracts/abstract.service.schema';
55
import { Tasks } from './_schemas/tasks.schema';
66

77
@Injectable()
8-
export class TasksService extends AbstractService {
9-
constructor(@InjectModel(Tasks.name) protected _model: Model<Tasks>) {
8+
export class TasksService extends AbstractServiceSchema {
9+
public constructor(@InjectModel(Tasks.name) protected _model: Model<Tasks>) {
1010
super();
1111
}
1212
}

src/management/passwd/passwd.service.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ export class PasswdService extends AbstractService {
3030
const iv = crypto.randomBytes(12).toString('base64');
3131
const key = crypto.randomBytes(16).toString('hex');
3232
const cipher = crypto.createCipheriv('aes-256-gcm', key, iv);
33-
const dataStruct = { uid: askToken.uid, mail: askToken.mail };
33+
const dataStruct = { uid: askToken.id, mail: askToken.mail };
3434
let ciphertext = cipher.update(JSON.stringify(dataStruct), 'utf8', 'base64');
3535
ciphertext += cipher.final('base64');
3636
const tag = cipher.getAuthTag();

0 commit comments

Comments
 (0)