forked from pushkin-consortium-deprecated/deprecated2
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathindex.js
More file actions
112 lines (108 loc) · 4.68 KB
/
index.js
File metadata and controls
112 lines (108 loc) · 4.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
const amqp = require('amqplib');
const Worker = require('./worker');
const DB_WRITE_QUEUE = '_db_write';
const DB_RPC_WORKER = '_rpc_worker';
const fs = require('fs');
const path = require('path');
const logger = require('./logger');
const quizzes = fs
.readdirSync(path.resolve(__dirname, './models'))
.filter(folder =>
fs.lstatSync(path.resolve(__dirname, './models', folder)).isDirectory()
);
// create the connection
amqp
.connect(process.env.AMPQ_ADDRESS)
.then(conn => {
// create seperate channel for this instance of the worker
return conn.createChannel().then(ch => {
logger.info('channel created');
// on SIGINT ensure the channel is closed
process.once('SIGINT', function() {
conn.close();
});
// create a rpc listener on both quizzed
return Promise.all(
quizzes.map(quizName => {
const rpc_queue = quizName + DB_RPC_WORKER;
const write_queue = quizName + DB_WRITE_QUEUE;
logger.info('in promise all');
// name of the queue
// this queue is not marked as durable, as it is an rpc
// if rabbitmq goes down, theres really nothing to replyTo
let durable = false;
return ch
.assertQueue(rpc_queue, { durable })
.then(() => {
logger.log('queue asserted', rpc_queue);
// set the maximum amount of messages that can be waiting before this channel accepts more
// more info here http://www.squaremobius.net/amqp.node/channel_api.html#channel_prefetch
ch.prefetch(1);
// consume on the specific queue
return ch.consume(rpc_queue, msg => {
// parse the message into a javascript object
const rpc = JSON.parse(msg.content.toString('utf8'));
const method = `${quizName}.${rpc.method}`;
logger.log('db_rpc_worker', { rpc });
logger.log('correlation_id', msg.correlationId);
logger.log('routing_key', msg.routingKey);
logger.log('reply_to', msg.replyTo);
logger.log('method', method);
// check that this method is defined on the Worker
if (typeof Worker[method] === 'undefined') {
logger.error(rpc);
// throw an error otherwise
// TODO: put this in a seperate queue of bad messages.
// possible dead letter exchange: http://www.rabbitmq.com/dlx.html
throw new Error(
`This method ${method} is not defined on the worker`
);
}
// call the Worker with the defined method
return Worker[method].apply(Worker, rpc.params).then(data => {
logger.log('Worker result', data);
// send to the queue with the original replyTo,
// pass it back with the correlation ID given
logger.log('replyTo', msg.properties.replyTo);
logger.log('correlationId', msg.properties.correlationId);
ch.sendToQueue('rob', new Buffer('garbage'));
ch.sendToQueue(
msg.properties.replyTo,
new Buffer(JSON.stringify(data)),
{ correlationId: msg.properties.correlationId }
);
// acknowledge receipt
ch.ack(msg);
});
});
})
.then(() => {
// this queue is marked as durable, we want DB writes to persist through a rabbitmq crash
durable = false;
logger.info('asserting queue', write_queue, 'durable', durable);
return ch.assertQueue(write_queue, { durable }).then(() => {
logger.log('queue asserted', write_queue);
ch.prefetch(1);
return ch.consume(write_queue, msg => {
logger.log('DB WRITE QUEUE', msg.content.toString('utf8'));
const rpc = JSON.parse(msg.content.toString('utf8'));
// Data is returned but not used for anything
const method = `${quizName}.${rpc.method}`;
logger.log('method', method);
return Worker[method].apply(Worker, rpc.params).then(data => {
logger.log('was saved in db', data);
return ch.ack(msg);
});
});
});
});
})
).then(() => {
logger.log('All queues executued');
});
});
})
.catch(err => {
console.log(err.stack);
logger.error(err.message);
});