forked from emoncms/postprocess
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpostprocess_run.php
More file actions
79 lines (62 loc) · 2.28 KB
/
postprocess_run.php
File metadata and controls
79 lines (62 loc) · 2.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
<?php
// Get script location
list($scriptPath) = get_included_files();
$basedir = str_replace("/postprocess_run.php","",$scriptPath);
define('EMONCMS_EXEC', 1);
chdir("/var/www/emoncms");
require "process_settings.php";
$dir = $settings["feed"]["phpfina"]["datadir"];
chdir($basedir);
$fp = fopen("/tmp/postprocess-runlock", "w");
if (! flock($fp, LOCK_EX | LOCK_NB)) { echo "Already running\n"; die; }
require "common.php";
require "request.php";
// Auto load processes
$processes = array();
$files = scandir($basedir."/processes");
for ($i=2; $i<count($files); $i++) {
// compile process list
$process = str_replace(".php","",$files[$i]);
$processes[] = $process;
// full file location and name
$process_file = $basedir."/processes/".$process.".php";
// Include the process file and check that process function exists
require $process_file;
if (!function_exists($process)) {
echo "Error: missing process function: $process\n"; die;
}
}
if (!$settings['redis']['enabled']) { echo "ERROR: Redis is not enabled"; die; }
$redis = new Redis();
$connected = $redis->connect($settings['redis']['host'], $settings['redis']['port']);
if (!$connected) { echo "Can't connect to redis at ".$settings['redis']['host'].":".$settings['redis']['port']; die; }
if (!empty($settings['redis']['prefix'])) $redis->setOption(Redis::OPT_PREFIX, $settings['redis']['prefix']);
if (!empty($settings['redis']['auth'])) {
if (!$redis->auth($settings['redis']['auth'])) {
echo "Can't connect to redis at ".$settings['redis']['host'].", autentication failed"; die;
}
}
while(true){
$len = $redis->llen("postprocessqueue");
if ($len>0) {
$processitem = $redis->lpop("postprocessqueue");
print $processitem."\n";
process($processitem);
} else {
break;
}
sleep(1);
}
function process($processitem) {
$processitem = json_decode($processitem);
if ($processitem==null) return false;
global $dir,$processes;
$process = $processitem->process;
if (array_search($process,$processes)!==false) {
$result = $process($dir,$processitem);
}
}
function updatetimevalue($id,$time,$value){
global $redis;
$redis->hMset("feed:$id", array('value' => $value, 'time' => $time));
}