-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmove-math.js
More file actions
executable file
·167 lines (151 loc) · 4.99 KB
/
move-math.js
File metadata and controls
executable file
·167 lines (151 loc) · 4.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
#!/usr/bin/env nodejs
'use strict';
const cassandra = require('cassandra-driver');
const P = require('bluebird');
const preq = require('preq');
const yargs = require('yargs');
// Command-line interface: one positional argument (the RESTBase base URL)
// plus the Cassandra contact point and credentials as options.
const argv = yargs
    .usage('Usage: $0 <restbase-url>')
    .options('h', { alias: 'help' })
    .options('H', {
        alias: 'hostname',
        default: 'localhost',
        describe: 'Contact hostname',
        type: 'string',
    })
    .options('P', {
        alias: 'port',
        default: 9042,
        describe: 'Contact port number',
        type: 'number',
    })
    .options('u', {
        alias: 'username',
        default: 'cassandra',
        describe: 'Cassandra username',
        type: 'string',
    })
    .options('p', {
        alias: 'password',
        default: 'cassandra',
        describe: 'Cassandra password',
        type: 'string',
    })
    .argv;

// First positional argument: the RESTBase base URL.
const [restbaseUrl] = argv._;

// Print the usage text and stop when help was requested or no URL was given.
if (!restbaseUrl || argv.help) {
    yargs.showHelp();
    process.exit(0);
}

// Settings shared by the rest of the script.
const { hostname: host, port, username: user, password: pass } = argv;
const contact = `${host}:${port}`;
const rbUri = `${restbaseUrl}/wikimedia.org/v1/media/math`;
/**
 * Creates a single connection pool.
 *
 * Builds a cassandra-driver client for the configured contact point and
 * credentials and connects it.
 *
 * @return {Promise} resolves with the client once `client.connect()` has
 *   completed.
 */
function connect() {
    const { Client, auth, types } = cassandra;
    const settings = {
        contactPoints: [contact],
        authProvider: new auth.PlainTextAuthProvider(user, pass),
        sslOptions: { ca: '/dev/null' },
        // Driver promises are created through bluebird's fromCallback.
        promiseFactory: P.fromCallback,
        queryOptions: { consistency: types.consistencies.one },
    };
    const client = new Client(settings);
    return client.connect().then(() => client);
}
/**
 * Fetches one page of query results, retrying when the request fails.
 *
 * The caller's `options` object is never modified; the remaining retry
 * budget is carried in a copy that is handed to the next attempt.
 *
 * @param {object} client - object exposing `execute(query, params, queryOptions)`
 * @param {string} query - CQL query string
 * @param {array|object} params - CQL query params
 * @param {*} pageState - paging state forwarded to `execute`
 * @param {object} [options] - `fetchSize` (defaults to 5 when falsy) and
 *   `retries` (number of additional attempts after a failure; anything that
 *   is not a positive number means "do not retry")
 * @return {Promise} resolves with the result of `client.execute`; rejects with
 *   the last error once the retry budget is used up
 */
function _nextPage(client, query, params, pageState, options) {
    const settings = options || {};
    return P.try(() => client.execute(query, params, {
        prepare: true,
        fetchSize: settings.fetchSize || 5,
        pageState,
    }))
    .catch((err) => {
        // `> 0` (rather than truthiness) also stops on negative or NaN
        // budgets, which would otherwise never reach a falsy value.
        if (!(settings.retries > 0)) {
            throw err;
        }
        const remaining = Object.assign({}, settings, { retries: settings.retries - 1 });
        return _nextPage(client, query, params, pageState, remaining);
    });
}
/**
 * Async-safe Cassandra query execution
 *
 * Client#eachRow in the Cassandra driver relies upon a synchronous callback
 * to provide back-pressure during paging; this function can safely execute
 * async callback handlers: the next page is requested only after every
 * handler invoked for the current page has settled.
 *
 * The caller's `options` object is never modified.
 *
 * @param {object} client - cassandra-driver Client instance
 * @param {string} query - CQL query string
 * @param {array|object} params - CQL query params
 * @param {object} [options] - options map, forwarded to `_nextPage` for each
 *   page (e.g. `retries`, `fetchSize`)
 * @param {function} handler - function to invoke for each row result; may
 *   return a promise. At most 32 handlers run concurrently per page.
 * @return {Promise} resolves (with no value) after the last page has been
 *   handled; rejects if fetching a page or any handler fails
 */
function eachRow(client, query, params, options, handler) {
    const settings = Object.assign({}, options);
    settings.log = settings.log || (() => {});

    function processPage(pageState) {
        // Each page works on its own copy, so nothing done to the options
        // while fetching (e.g. consuming `retries`) carries over to the next
        // page or leaks back to the caller.
        return _nextPage(client, query, params, pageState, Object.assign({}, settings))
        .then((res) => {
            // A missing result or a result without rows is an empty page.
            const rows = (res && res.rows) || [];
            return P.map(rows, (row) => handler(row), { concurrency: 32 })
            .then(() => {
                if (!res || !res.pageState) {
                    return undefined;
                }
                return processPage(res.pageState);
            });
        });
    }

    return processPage(null);
}
// Number of rows whose requests have completed (successfully or not).
let count = 0;
// Connected client, kept so that it can be shut down when the run ends.
let cc;
// Start of the current 10000-row progress-reporting window.
let startTime = Date.now();

/**
 * Handles one row of the input table.
 *
 * Parses `row.value` as JSON, POSTs its `q` to `<rbUri>/check/<type>` and then
 * GETs `<rbUri>/render/svg/<x-resource-location response header>`. Failures
 * are logged and never rejected, so one bad row does not abort the run.
 * (Presumably the requests make RESTBase store the render - confirm against
 * the RESTBase math API.)
 *
 * @param {object} row - result row with `key` and `value` columns
 * @return {Promise} always resolves
 */
function processRow(row) {
    let value;
    try {
        value = JSON.parse(row.value);
    } catch (e) {
        // Unparseable rows are reported and skipped; they are not counted.
        console.error(`${row.key}: Cannot parse value - ${e.message}`);
        return P.resolve();
    }
    return P.try(() => preq.post({
        uri: `${rbUri}/check/${value.type}`,
        headers: { 'content-type': 'application/json' },
        body: { q: value.q },
        encoding: null
    }).then((res) => preq.get({
        uri: `${rbUri}/render/svg/${res.headers['x-resource-location']}`,
        encoding: null
    })))
    .catch((e) => {
        console.error(`(${count}) ${row.key}: Error while requesting: ${e.message}`);
    })
    .then(() => {
        count++;
        if (count % 10000 === 0) {
            // Progress line: rows done so far and seconds spent on the last 10000.
            console.log(`- ${count}\t${(Date.now() - startTime) / 1000.0}`);
            startTime = Date.now();
        }
        if (count % 128 === 0) {
            // Every 128 rows, defer completion to a later tick.
            return new P((resolve) => process.nextTick(() => resolve()));
        }
        return P.resolve();
    });
}

connect().then((client) => {
    cc = client;
    return eachRow(
        client,
        'SELECT key, value FROM "local_group_globaldomain_T_mathoid_input".data',
        {},
        {
            retries: 10,
            fetchSize: 512,
            log: console.log
        },
        processRow
    );
})
.then(() => console.log(`Total count: ${count}`))
// `cc` is still unset when connect() itself failed; guard the shutdown so
// that the original error is not replaced by a TypeError.
.finally(() => (cc ? cc.shutdown() : undefined))
.catch((e) => {
    // Report the failure and signal it through the exit status.
    console.error(`Fatal error: ${(e && e.message) || e}`);
    process.exitCode = 1;
});