Skip to content

Commit 33a4367

Browse files
committed
add jobs and tasks
1 parent 8a94957 commit 33a4367

24 files changed

+428
-83
lines changed

src/_common/abstracts/abstract.queue.processor.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,5 @@ export abstract class AbstractQueueProcessor extends AbstractService implements
3131
this.queueEvents = new QueueEvents(this.config.get<string>('application.nameQueue'), {
3232
connection: this.redis,
3333
});
34-
35-
this.queueEvents.on('failed', (errors) => {
36-
this.logger.warn('Queue failed', errors);
37-
});
3834
}
3935
}

src/core/backends/_dto/execute-job.dto.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { IsEnum, IsNotEmpty, IsObject, IsOptional } from 'class-validator';
1+
import { IsEnum, IsMongoId, IsNotEmpty, IsObject, IsOptional } from 'class-validator';
22
import { ActionType } from '../_enum/action-type.enum';
33
import { ApiProperty } from '@nestjs/swagger';
44

@@ -8,6 +8,10 @@ export class ExecuteJobDto {
88
@ApiProperty({ type: String })
99
public action: ActionType;
1010

11+
@IsMongoId()
12+
@ApiProperty({ example: 'paul.bismuth', description: 'User object id', type: String })
13+
public id: string;
14+
1115
@IsOptional()
1216
@IsObject()
1317
@ApiProperty({ type: Object })
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
import { ApiProperty } from '@nestjs/swagger';
2+
import { IsArray } from 'class-validator';
3+
4+
export class SyncIdentitiesDto {
5+
@IsArray()
6+
@ApiProperty({ type: Array })
7+
public payload: string[];
8+
}

src/core/backends/_interfaces/execute-job-options.interface.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ export interface ExecuteJobOptions {
55
async?: boolean;
66
syncTimeout?: number;
77
timeoutDiscard?: boolean;
8+
comment?: string;
89
}

src/core/backends/backends.controller.ts

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import { Observable, Subscriber } from 'rxjs';
1818
import { Public } from '~/_common/decorators/public.decorator';
1919
import { ExecuteJobDto } from './_dto/execute-job.dto';
2020
import { BackendsService } from './backends.service';
21+
import { SyncIdentitiesDto } from './_dto/sync-identities.dto';
2122

2223
function fireMessage(observer: Subscriber<MessageEvent>, channel: string, message: any, loggername: string) {
2324
try {
@@ -40,17 +41,38 @@ export class BackendsController {
4041
@InjectRedis() protected readonly redis: Redis,
4142
) {}
4243

44+
@Post('sync')
45+
public async syncIdentities(
46+
@Res() res: Response,
47+
@Body() body: SyncIdentitiesDto,
48+
@Query('async') asyncQuery: string,
49+
) {
50+
const async = /true|on|yes|1/i.test(asyncQuery);
51+
const data = await this.backendsService.syncIdentities(body.payload, {
52+
async,
53+
});
54+
55+
return res.status(HttpStatus.ACCEPTED).json({ async, data });
56+
}
57+
4358
@Post('execute')
4459
public async executeJob(
4560
@Res() res: Response,
4661
@Body() body: ExecuteJobDto,
4762
@Query('async') asyncQuery: string,
4863
@Query('timeoutDiscard') timeoutDiscardQuery: string,
49-
@Query('syncTimeout', new ParseIntPipe({ optional: true, errorHttpStatusCode: 406 })) syncTimeout?: number,
64+
@Query(
65+
'syncTimeout',
66+
new ParseIntPipe({
67+
optional: true,
68+
errorHttpStatusCode: HttpStatus.NOT_ACCEPTABLE,
69+
}),
70+
)
71+
syncTimeout?: number,
5072
): Promise<Response> {
5173
const async = /true|on|yes|1/i.test(asyncQuery);
5274
const timeoutDiscard = /true|on|yes|1/i.test(timeoutDiscardQuery);
53-
const [job, response] = await this.backendsService.executeJob(body.action, body.payload, {
75+
const [job, response] = await this.backendsService.executeJob(body.action, body.id, body.payload, {
5476
async,
5577
syncTimeout,
5678
timeoutDiscard,

src/core/backends/backends.module.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@ import { Module } from '@nestjs/common';
22
import { BackendsService } from './backends.service';
33
import { BackendsController } from './backends.controller';
44
import { ConfigModule } from '@nestjs/config';
5+
import { IdentitiesModule } from '~/management/identities/identities.module';
6+
import { JobsModule } from '../jobs/jobs.module';
7+
import { TasksModule } from '../tasks/tasks.module';
58
@Module({
6-
imports: [ConfigModule],
9+
imports: [ConfigModule, IdentitiesModule, JobsModule, TasksModule],
710
controllers: [BackendsController],
811
providers: [BackendsService],
912
})
Lines changed: 154 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,195 @@
11
import { BadRequestException, HttpStatus, Injectable, RequestTimeoutException } from '@nestjs/common';
22
import { ModuleRef } from '@nestjs/core';
3-
import { Job } from 'bullmq';
3+
import { Types } from 'mongoose';
44
import { AbstractQueueProcessor } from '~/_common/abstracts/abstract.queue.processor';
5+
import { IdentityState } from '~/management/identities/_enums/states.enum';
6+
import { Identities } from '~/management/identities/_schemas/identities.schema';
7+
import { IdentitiesService } from '~/management/identities/identities.service';
8+
import { JobState } from '../jobs/_enums/state.enum';
9+
import { Jobs } from '../jobs/_schemas/jobs.schema';
10+
import { JobsService } from '../jobs/jobs.service';
11+
import { TasksService } from '../tasks/tasks.service';
512
import { ActionType } from './_enum/action-type.enum';
613
import { ExecuteJobOptions } from './_interfaces/execute-job-options.interface';
714

815
const DEFAULT_SYNC_TIMEOUT = 30_000;
916

1017
@Injectable()
1118
export class BackendsService extends AbstractQueueProcessor {
12-
public constructor(protected moduleRef: ModuleRef) {
19+
public constructor(
20+
protected moduleRef: ModuleRef,
21+
protected identitiesService: IdentitiesService,
22+
protected jobsService: JobsService,
23+
protected tasksService: TasksService,
24+
) {
1325
super({ moduleRef });
1426
}
1527

28+
public async onModuleInit() {
29+
await super.onModuleInit();
30+
31+
//TODO: resync failed + completed
32+
const jobsCompleted = await this._queue.getCompleted();
33+
for (const job of jobsCompleted) {
34+
const isSyncedJob = await this.jobsService.model.findOneAndUpdate<Jobs>(
35+
{ jobId: job.id, state: { $ne: JobState.COMPLETED } },
36+
{
37+
$set: {
38+
state: JobState.COMPLETED,
39+
finishedAt: new Date(),
40+
result: job.returnvalue,
41+
},
42+
},
43+
{ new: true },
44+
);
45+
if (isSyncedJob) {
46+
this.logger.warn(`Job already completed, syncing... [${job.id}::COMPLETED]`);
47+
}
48+
}
49+
50+
this.queueEvents.on('waiting', (payload) => this.logger.debug(`Job is now waiting... [${payload.jobId}]`));
51+
this.queueEvents.on('active', async (payload) => {
52+
this.logger.debug(`Job is now active... [${payload.jobId}]`);
53+
await this.jobsService.model.findOneAndUpdate<Jobs>(
54+
{ jobId: payload.jobId, state: { $ne: JobState.COMPLETED } },
55+
{
56+
$set: {
57+
state: JobState.IN_PROGRESS,
58+
processedAt: new Date(),
59+
},
60+
},
61+
{ new: true },
62+
);
63+
});
64+
65+
this.queueEvents.on('failed', async (payload) => {
66+
this.logger.debug(`Job failed ! [${payload.jobId}]`);
67+
await this.jobsService.model.findOneAndUpdate<Jobs>(
68+
{ jobId: payload.jobId, state: { $ne: JobState.COMPLETED } },
69+
{
70+
$set: {
71+
state: JobState.FAILED,
72+
finishedAt: new Date(),
73+
result: payload.failedReason,
74+
},
75+
},
76+
{ new: true },
77+
);
78+
});
79+
80+
this.queueEvents.on('completed', async (payload) => {
81+
await this.jobsService.model.findOneAndUpdate<Jobs>(
82+
{ jobId: payload.jobId, state: { $ne: JobState.COMPLETED } },
83+
{
84+
$set: {
85+
state: JobState.COMPLETED,
86+
finishedAt: new Date(),
87+
result: payload.returnvalue,
88+
},
89+
},
90+
{ upsert: true },
91+
);
92+
this.logger.log(`Job completed... Syncing [${payload.jobId}]`);
93+
});
94+
}
95+
96+
public async syncIdentities(payload: string[], options?: ExecuteJobOptions): Promise<any> {
97+
const identities: {
98+
action: ActionType;
99+
identity: Identities;
100+
}[] = [];
101+
102+
for (const key of payload) {
103+
const identity = await this.identitiesService.findById<Identities>(key);
104+
if (identity.state !== IdentityState.TO_SYNC) {
105+
throw new BadRequestException({
106+
status: HttpStatus.BAD_REQUEST,
107+
message: `Identity ${key} is not in state TO_SYNC`,
108+
identity,
109+
});
110+
}
111+
identities.push({
112+
action: ActionType.IDENTITY_UPDATE,
113+
identity,
114+
});
115+
}
116+
117+
const result = {};
118+
for (const identity of identities) {
119+
const [executedJob] = await this.executeJob(identity.action, identity.identity._id, { identity }, options);
120+
result[identity.identity._id] = executedJob;
121+
}
122+
return result;
123+
}
124+
16125
public async executeJob(
17126
actionType: ActionType,
127+
concernedTo: Types.ObjectId,
18128
// eslint-disable-next-line @typescript-eslint/no-unused-vars
19129
payload?: Record<string | number, any>,
20130
options?: ExecuteJobOptions,
21-
): Promise<[Job, any]> {
22-
const job = await this.queue.add(actionType, payload, options?.job);
131+
): Promise<[Jobs, any]> {
132+
const job = await this.queue.add(
133+
actionType,
134+
{
135+
concernedTo,
136+
payload,
137+
},
138+
options?.job,
139+
);
140+
const optionals = {};
141+
if (!options?.async) {
142+
optionals['processedAt'] = new Date();
143+
optionals['state'] = JobState.IN_PROGRESS;
144+
}
145+
const jobStore = await this.jobsService.create<Jobs>({
146+
jobId: job.id,
147+
action: actionType,
148+
params: payload,
149+
concernedTo: concernedTo,
150+
comment: options?.comment,
151+
state: JobState.CREATED,
152+
...optionals,
153+
});
23154
if (!options?.async) {
24155
let error: Error;
25156
try {
26157
const response = await job.waitUntilFinished(this.queueEvents, options.syncTimeout || DEFAULT_SYNC_TIMEOUT);
27-
return [job, response];
158+
const jobStoreUpdated = await this.jobsService.update<Jobs>(jobStore._id, {
159+
$set: {
160+
state: JobState.COMPLETED,
161+
processedAt: new Date(),
162+
finishedAt: new Date(),
163+
result: response,
164+
},
165+
});
166+
return [jobStoreUpdated as unknown as Jobs, response];
28167
} catch (err) {
29168
error = err;
30169
}
170+
const jobFailed = await this.jobsService.update<Jobs>(jobStore._id, {
171+
$set: {
172+
state: JobState.FAILED,
173+
finishedAt: new Date(),
174+
result: error,
175+
},
176+
});
31177
if (options?.timeoutDiscard !== false) {
32178
job.discard();
33179
throw new BadRequestException({
34180
status: HttpStatus.BAD_REQUEST,
35181
message: `Sync job ${job.id} failed to finish in time`,
36182
error,
37-
job,
183+
job: jobFailed as unknown as Jobs,
38184
});
39185
}
40186
throw new RequestTimeoutException({
41187
status: HttpStatus.REQUEST_TIMEOUT,
42188
message: `Job now continue to run in background ${job.id}, timeout wait until finish reached`,
43189
error,
44-
job,
190+
job: jobFailed as unknown as Jobs,
45191
});
46192
}
47-
return [job, null];
193+
return [jobStore.toObject(), null];
48194
}
49195
}

src/core/core.module.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
import { DynamicModule, Module } from '@nestjs/common';
22
import { RouterModule } from '@nestjs/core';
3+
import { AgentsModule } from './agents/agents.module';
34
import { AuthModule } from './auth/auth.module';
4-
import { CoreService } from './core.service';
5-
import { CoreController } from './core.controller';
65
import { BackendsModule } from './backends/backends.module';
7-
import { LoggerModule } from './logger/logger.module';
6+
import { CoreController } from './core.controller';
7+
import { CoreService } from './core.service';
8+
import { JobsModule } from './jobs/jobs.module';
89
import { KeyringsModule } from './keyrings/keyrings.module';
9-
import { AgentsModule } from './agents/agents.module';
10+
import { LoggerModule } from './logger/logger.module';
11+
import { TasksModule } from './tasks/tasks.module';
1012

1113
@Module({
12-
imports: [AuthModule, BackendsModule, LoggerModule, KeyringsModule, AgentsModule],
14+
imports: [AuthModule, BackendsModule, LoggerModule, KeyringsModule, AgentsModule, JobsModule, TasksModule],
1315
providers: [CoreService],
1416
controllers: [CoreController],
1517
})

src/core/jobs/_enums/state.enum.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
export enum JobState {
2+
COMPLETED = 9,
3+
IN_PROGRESS = 1,
4+
CREATED = 0,
5+
FAILED = -1,
6+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import { Prop, Schema, SchemaFactory } from '@nestjs/mongoose';
2+
import { Document, Types } from 'mongoose';
3+
import { AbstractSchema } from '~/_common/abstracts/schemas/abstract.schema';
4+
import { JobState } from '../_enums/state.enum';
5+
6+
export type JobsDocument = Jobs & Document;
7+
8+
@Schema({ versionKey: false })
9+
export class Jobs extends AbstractSchema {
10+
@Prop({
11+
type: String,
12+
required: true,
13+
})
14+
public jobId: string;
15+
16+
@Prop({
17+
type: String,
18+
required: true,
19+
})
20+
public action: string;
21+
22+
@Prop({
23+
type: Types.ObjectId,
24+
required: true,
25+
})
26+
public concernedTo?: Types.ObjectId;
27+
28+
@Prop({ type: String })
29+
public comment?: string;
30+
31+
@Prop({ type: Object, default: {} })
32+
public params?: object;
33+
34+
@Prop({ type: Object, default: {} })
35+
public result?: object;
36+
37+
@Prop({ type: Date })
38+
public processedAt?: Date;
39+
40+
@Prop({ type: Date })
41+
public finishedAt?: Date;
42+
43+
@Prop({ type: Number, enum: JobState, default: JobState.CREATED })
44+
state: JobState;
45+
}
46+
47+
export const JobsSchema = SchemaFactory.createForClass(Jobs);

0 commit comments

Comments
 (0)