@@ -6,19 +6,19 @@ import {
66 RequestTimeoutException ,
77 UnprocessableEntityException ,
88} from '@nestjs/common' ;
9- import { ModuleRef } from '@nestjs/core' ;
10- import { Document , ModifyResult , Query , Types } from 'mongoose' ;
11- import { AbstractQueueProcessor } from '~/_common/abstracts/abstract.queue.processor' ;
12- import { IdentityState } from '~/management/identities/_enums/states.enum' ;
13- import { Identities } from '~/management/identities/_schemas/identities.schema' ;
14- import { IdentitiesService } from '~/management/identities/identities.service' ;
15- import { JobState } from '../jobs/_enums/state.enum' ;
16- import { Jobs } from '../jobs/_schemas/jobs.schema' ;
17- import { JobsService } from '../jobs/jobs.service' ;
18- import { Tasks } from '../tasks/_schemas/tasks.schema' ;
19- import { TasksService } from '../tasks/tasks.service' ;
20- import { ActionType } from './_enum/action-type.enum' ;
21- import { ExecuteJobOptions } from './_interfaces/execute-job-options.interface' ;
9+ import { ModuleRef } from '@nestjs/core' ;
10+ import { Document , ModifyResult , Query , Types } from 'mongoose' ;
11+ import { AbstractQueueProcessor } from '~/_common/abstracts/abstract.queue.processor' ;
12+ import { IdentityState } from '~/management/identities/_enums/states.enum' ;
13+ import { Identities } from '~/management/identities/_schemas/identities.schema' ;
14+ import { IdentitiesService } from '~/management/identities/identities.service' ;
15+ import { JobState } from '../jobs/_enums/state.enum' ;
16+ import { Jobs } from '../jobs/_schemas/jobs.schema' ;
17+ import { JobsService } from '../jobs/jobs.service' ;
18+ import { Tasks } from '../tasks/_schemas/tasks.schema' ;
19+ import { TasksService } from '../tasks/tasks.service' ;
20+ import { ActionType } from './_enum/action-type.enum' ;
21+ import { ExecuteJobOptions } from './_interfaces/execute-job-options.interface' ;
2222
2323const DEFAULT_SYNC_TIMEOUT = 30_000 ;
2424
@@ -45,7 +45,7 @@ export class BackendsService extends AbstractQueueProcessor {
4545 const jobsCompleted = await this . _queue . getCompleted ( ) ;
4646 for ( const job of jobsCompleted ) {
4747 const isSyncedJob = await this . jobsService . model . findOneAndUpdate < Jobs > (
48- { jobId : job . id , state : { $ne : JobState . COMPLETED } } ,
48+ { jobId : job . id , state : { $nin : [ JobState . COMPLETED , JobState . FAILED ] } } ,
4949 {
5050 $set : {
5151 state : JobState . COMPLETED ,
@@ -105,11 +105,17 @@ export class BackendsService extends AbstractQueueProcessor {
105105 } ) ;
106106
107107 this . queueEvents . on ( 'completed' , async ( payload ) => {
108+ let jState = JobState . COMPLETED ;
109+ let iState = IdentityState . SYNCED ;
110+ if ( payload . returnvalue . status !== 0 ) {
111+ jState = JobState . FAILED ;
112+ iState = IdentityState . ON_ERROR ;
113+ }
108114 const completedJob = await this . jobsService . model . findOneAndUpdate < Jobs > (
109- { jobId : payload . jobId , state : { $ne : JobState . COMPLETED } } ,
115+ { jobId : payload . jobId } ,
110116 {
111117 $set : {
112- state : JobState . COMPLETED ,
118+ state : jState ,
113119 finishedAt : new Date ( ) ,
114120 result : payload . returnvalue ,
115121 } ,
@@ -119,10 +125,14 @@ export class BackendsService extends AbstractQueueProcessor {
119125
120126 await this . identitiesService . model . findByIdAndUpdate ( completedJob ?. concernedTo ?. id , {
121127 $set : {
122- state : IdentityState . SYNCED ,
128+ state : iState ,
123129 } ,
124130 } ) ;
125- this . logger . log ( `Job completed... Syncing [${ payload . jobId } ]` ) ;
131+ if ( jState === JobState . COMPLETED ) {
132+ this . logger . log ( `Job completed... Syncing [${ payload . jobId } ]` ) ;
133+ } else {
134+ this . logger . error ( `Job FAILED... Syncing [${ payload . jobId } ]` ) ;
135+ }
126136 } ) ;
127137 }
128138
0 commit comments