-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathklyft.js
More file actions
172 lines (141 loc) · 5.26 KB
/
klyft.js
File metadata and controls
172 lines (141 loc) · 5.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
const fork = require('child_process').fork
const rndStr = require('randomstring').generate
const debugLog = require('./lib/helper.js')
// Module-wide logging switches. Both are handed to debugLog() as its first argument
// (presumably gating whether the message is printed — confirm in lib/helper.js) and are
// forwarded to the forked queue process in the 'init-queue' message.
// debugEnabled: used for routine messages (worker initialisation, job queueing).
// Set by enableDebug(): true unless it is called with a truthy `onlyImportant`.
let debugEnabled = false
// debugImportant: used for worker lifecycle messages (pid at start, termination).
// Set to true by every call to enableDebug().
let debugImportant = false
// TODO: Allow "piping" the return value of a finished job into the next job on the same thread. One way to do this: let queue() accept an array of jobs (instead of only a single job that is automatically assigned to some thread) and run those jobs, in the given order, on the same thread.
/**
 * Parent-side handle for a job queue that runs in a forked child process
 * (`lib/queue.js`).
 *
 * The constructor forks the queue process and sends it an `init-queue` message.
 * Every message coming back from the child is re-emitted on `this.emitter`
 * (event name `message`), except the `queue-completed` status when `killIfIdle`
 * is set, which terminates the child instead.
 */
class Worker {
  /**
   * @param {string} moduleName - forwarded to the queue process as `module`
   * @param {string} [logName='klyft'] - label passed to debugLog and to the queue process
   * @param {number} threads - forwarded as `jobsToRunParallel`; also the number of
   *   `kill-queue` confirmations that kill() waits for
   * @param {boolean} allowDuplicates - forwarded unchanged to the queue process
   * @param {boolean} [killIfIdle=false] - when truthy, the child process is killed as
   *   soon as it reports the status `queue-completed`
   */
  constructor(moduleName, logName, threads, allowDuplicates, killIfIdle) {
    this.logName = logName || 'klyft'
    debugLog(debugEnabled, this.logName, `initializing worker for ${moduleName}`)
    this.jobQueueHandler = fork(__dirname + '/lib/queue.js')
    debugLog(debugImportant, this.logName, `worker live at pid ${this.jobQueueHandler.pid}`)

    // Local emitter used to fan child messages out to the per-job listeners of queue().
    const EventEmitter = require('events')
    class WorkerEmitter extends EventEmitter {}
    this.emitter = new WorkerEmitter()

    this.jobQueueHandler.send({
      type: 'init-queue',
      module: moduleName,
      debugImportant: debugImportant,
      debugEnabled: debugEnabled,
      logName: this.logName,
      jobsToRunParallel: threads,
      allowDuplicates: allowDuplicates,
      killIfIdle: killIfIdle,
      pid: this.jobQueueHandler.pid
    })

    this.jobQueueHandler.on('message', m => {
      if (killIfIdle && m.type === 'status' && m.data === 'queue-completed') {
        debugLog(debugImportant, this.logName, `terminating worker ${this.jobQueueHandler.pid}`)
        this.jobQueueHandler.kill()
      } else {
        this.emitter.emit('message', m)
      }
    })

    this.threadCount = threads
    // IDs of jobs that were queued and have not reported `job-done` yet.
    this.inProgress = []
    this.killIfIdle = killIfIdle || false
  }

  /**
   * Send a job to the queue process.
   *
   * @param {string} jobName - name of the job to run
   * @param {*} args - payload handed to the job
   * @returns {Promise<*>} resolves with the `data` field of the `job-done` message
   *   whose `id` matches this job; rejects when the child process is not connected
   *   or when sending the request throws
   */
  queue(jobName, args) {
    // ID = random prefix + the digits of the current timestamp from index 8 onwards.
    const dateString = Date.now().toString().slice(8)
    const ID = rndStr(8) + dateString
    debugLog(debugEnabled, this.logName, `queueing ${jobName} (${ID})`)
    this.inProgress.push(ID)
    // Every pending job owns one listener, so raise the limit to the number of pending
    // jobs to avoid the emitter's listener warning. Suboptimal, but whoever pushes
    // 900 jobs at once should expect problems.
    this.emitter.setMaxListeners(this.inProgress.length)

    return new Promise((resolve, reject) => {
      const onMessage = msg => {
        if (msg.type !== 'job-done' || msg.id !== ID) {
          return
        }
        // Detach only this job's listener; other pending jobs still need theirs.
        this.emitter.removeListener('message', onMessage)
        this._updateWorker(ID)
        resolve(msg.data)
      }
      this.emitter.on('message', onMessage)

      try {
        if (!this.jobQueueHandler.connected) {
          throw new Error(`Job ${ID} could not be executed, worker ${this.jobQueueHandler.pid} is not connected.`)
        }
        this.jobQueueHandler.send({
          type: 'init-job',
          id: ID,
          // Since only this will be accessible for the Job, the ID is passed here as well
          data: {
            name: jobName,
            args: args,
            id: ID
          }
        })
      } catch (err) {
        // The job can never complete: undo the bookkeeping and tell the caller
        // instead of leaving the promise pending forever.
        this.emitter.removeListener('message', onMessage)
        this._updateWorker(ID)
        reject(err)
      }
    })
  }

  /**
   * Ask the queue process to shut down. Once `threadCount` messages of the form
   * `{ type: 'kill-queue', data: 'success' }` have come back, the child process
   * itself is killed.
   */
  kill() {
    this.jobQueueHandler.send({
      type: 'kill-queue'
    })
    let killedThreads = 0
    this.jobQueueHandler.on('message', m => {
      if (m.type === 'kill-queue' && m.data === 'success') {
        killedThreads++
        // Number() keeps numeric-string thread counts working with a strict comparison.
        if (killedThreads === Number(this.threadCount)) {
          debugLog(debugImportant, this.logName, `terminating worker ${this.jobQueueHandler.pid}`)
          this.jobQueueHandler.kill()
        }
      }
    })
  }

  /**
   * Remove a job ID from `inProgress`, so the list only holds jobs that are still pending.
   *
   * @param {string} ID - ID of the job that is no longer pending
   */
  _updateWorker(ID) {
    this.inProgress = this.inProgress.filter(id => id !== ID)
  }
}
/**
 * A named task living inside a child process. Creating a Job immediately
 * subscribes it to the process' `message` event.
 */
class Job {
  /**
   * @param {string} name - requests whose `name` equals this value are handled by this job
   * @param {Function} callback - invoked as `callback(args, done)`; the task reports
   *   its result by calling `done(result)`
   */
  constructor(name, callback) {
    this.name = name
    this.callback = callback
    this.listen()
  }

  /**
   * Wait for job requests addressed to this job. For each one, acknowledge it with a
   * `status`/`starting` message, run the callback, and send the outcome back as a
   * `result` message carrying the request's id.
   */
  listen() {
    process.on('message', request => {
      // Requests for other jobs are ignored.
      if (request.name !== this.name) {
        return
      }

      // Acknowledge receipt so the queue knows that the requested job exists.
      process.send({
        type: 'status',
        data: 'starting',
        id: request.id
      })

      const finished = new Promise(settle => {
        // Run the actual task. The completion callback must stay a regular
        // function — do NOT turn it into an arrow function.
        this.callback(request.args, function (outcome) {
          settle(outcome)
        })
      })

      finished.then(outcome => {
        // undefined and null are replaced by string markers before being sent.
        let payload = outcome
        if (outcome === undefined) {
          payload = 'K_undef'
        } else if (outcome === null) {
          payload = 'K_null'
        }
        process.send({
          type: 'result',
          data: payload,
          id: request.id
        })
      })
    })
  }
}
/**
 * Switch on debug logging for this module.
 * Important messages are always enabled by this call; routine messages are
 * enabled as well unless `onlyImportant` is truthy.
 *
 * @param {boolean} [onlyImportant] - truthy to keep routine debug messages off
 */
const enableDebug = onlyImportant => {
  debugImportant = true // always on
  debugEnabled = !onlyImportant
}

module.exports = { Worker, Job, enableDebug }