-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy pathSdk.php
More file actions
119 lines (104 loc) · 3.55 KB
/
Sdk.php
File metadata and controls
119 lines (104 loc) · 3.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
<?php
/**
 * Leo Innovation Platform API connection.
 * Requires the AWS SDK for PHP version 3, which is installed into
 * a separate directory called aws3.
 */
namespace Leo;
use Aws\Signature\SignatureV4;
use GuzzleHttp\Client as GuzzleClient;
use GuzzleHttp\Psr7\Request;
use Aws\Credentials\CredentialProvider;
use Aws\Credentials\Credentials;
use Aws\Firehose\FirehoseClient;
use Aws\Kinesis\KinesisClient;
use Aws\S3\S3Client;
use Aws\CloudWatchLogs\CloudWatchLogsClient;
use Aws\Firehose\Exception\FirehoseException;
use Leo\lib\Config;
/**
 * Entry point of the Leo PHP SDK for a single bot.
 *
 * Holds the bot id and the effective configuration, and builds the objects
 * used to write events (createLoader), read events (createOffloader), or do
 * both in one pass (createEnrichment).
 */
class Sdk {
    /** @var array Built-in defaults overlaid with the values returned by Config::get(). */
    private $config;

    /** @var mixed Bot id passed to the constructor; must be truthy for loaders and logging. */
    private $id;

    /**
     * Builds the effective configuration and verifies the required sections exist.
     *
     * Defaults ('uploader' => 'kinesis', 'server' => gethostname()) are
     * overridden by whatever Config::get() returns.
     *
     * @param mixed $id Bot id used by every object this SDK creates.
     * @throws \Exception When the 'leosdk', 'leoaws' or 'leoauth' config section is empty,
     *                    or when logging is enabled without a bot id.
     */
    public function __construct($id) {
        $this->id = $id;
        $this->config = array_merge([
            'uploader' => 'kinesis',
            'server' => gethostname(),
        ], Config::get());

        // Checked in this order so the first missing section is the one reported.
        // Thrown rather than die()'d: a library must not terminate the host
        // process, and the rest of this class already reports errors this way.
        foreach (['leosdk', 'leoaws', 'leoauth'] as $section) {
            if (empty($this->config[$section])) {
                throw new \Exception(
                    $section . ' config is not defined in the leo config. '
                    . 'See LEO PHP SDK documentation for how the config should be setup.'
                );
            }
        }

        if (!empty($this->config['enableLogging'])) {
            $this->enableLogging();
        }
    }

    /**
     * Creates the writer used to send events, backed by the configured uploader.
     *
     * $opts['config'] is merged over the SDK configuration, so individual
     * settings (including 'uploader') can be overridden per loader.
     *
     * @param mixed $checkpointer Passed through unchanged to lib\Combiner.
     * @param array $opts         Loader options; 'config' holds configuration overrides.
     * @return lib\Combiner
     * @throws \Exception When no bot id was given, when the 'mass' uploader is requested,
     *                    or when the uploader name is not recognised.
     */
    public function createLoader($checkpointer, $opts = []) {
        if (empty($opts['config'])) {
            $opts['config'] = [];
        }
        $opts['config'] = array_merge($this->config, $opts['config']);

        if (!$this->id) {
            throw new \Exception("You must specify a bot id");
        }

        // No mass uploader is wired in yet; lib\Combiner still receives the argument.
        $massuploader = null;
        switch ($opts['config']['uploader']) {
            case "firehose":
                $uploader = new lib\Firehose($this->id, $opts['config']);
                break;
            case "kinesis":
                $uploader = new lib\Kinesis($this->id, $opts['config']);
                break;
            case "mass":
                throw new \Exception('Mass loading is not implemented yet');
            default:
                // Fail here instead of handing a null uploader to lib\Combiner.
                throw new \Exception(
                    'Unknown uploader ' . var_export($opts['config']['uploader'], true)
                    . '; expected "firehose" or "kinesis"'
                );
        }

        return new lib\Combiner($this->id, $opts, $uploader, $massuploader, $checkpointer);
    }

    /**
     * Creates an event reader for a queue.
     *
     * Defaults for 'buffer', 'loops', 'debug' and 'run_time' are applied first.
     * A non-empty 'run_time' is converted into an absolute 'end_time':
     * a \DateInterval is added to the current time, anything else is parsed
     * as a relative date string prefixed with "+ ".
     *
     * @param mixed $queue Queue to read from; passed to lib\Events.
     * @param array $opts  Reader options overriding the defaults above.
     * @return mixed Whatever lib\Events::getEventReader() returns.
     */
    public function createOffloader($queue, $opts = []) {
        $opts = array_merge([
            "buffer" => 1000,
            "loops" => 100,
            "debug" => false,
            // NOTE(review): 'P4M' is an ISO 8601 period of four MONTHS; four minutes
            // would be 'PT4M'. Left unchanged to preserve behaviour - confirm intent.
            "run_time" => new \DateInterval('P4M'),
        ], $opts);

        if ($opts['run_time'] instanceof \DateInterval) {
            $opts['end_time'] = (new \DateTime())->add($opts['run_time']);
        } else if (!empty($opts['run_time'])) {
            $opts['end_time'] = new \DateTime("+ " . $opts['run_time']);
        }

        $events = new lib\Events($this->config);
        $range = $events->getEventRange($this->id, $queue, $opts);

        return $events->getEventReader($this->id, $queue, $range, $opts);
    }

    /**
     * Reads events from one queue, transforms them and writes the results to another.
     *
     * Each event is passed to $transform as ($event['payload'], $event). A truthy
     * return value is written to $toQueue, tagged with the source queue and the
     * event's eid; falsy results are skipped. The stream is ended once the
     * reader's events are exhausted.
     *
     * @param mixed    $queue     Queue to read from.
     * @param callable $transform Maps (payload, event) to the new event, or a falsy value to skip it.
     * @param mixed    $toQueue   Queue the transformed events are written to.
     * @param array    $opts      Options for the reader only (see createOffloader); the loader uses defaults.
     * @return void
     * @throws \Exception When no bot id was given (raised by createLoader).
     */
    public function createEnrichment($queue, $transform, $toQueue, $opts = []) {
        $reader = $this->createOffloader($queue, $opts);

        // Checkpoints reported by the writer are logged and forwarded to the reader.
        $stream = $this->createLoader(function ($checkpoint) use ($reader) {
            lib\Utils::info($checkpoint);
            $reader->checkpoint($checkpoint);
        });

        foreach ($reader->events as $event) {
            $newEvent = $transform($event['payload'], $event);
            if ($newEvent) {
                $stream->write($toQueue, $newEvent, ["source" => $queue, "start" => $event['eid']]);
            }
        }

        $stream->end();
    }

    /**
     * Creates a logger for this bot.
     *
     * @return lib\Logger
     * @throws \Exception When no bot id was given.
     */
    public function enableLogging() {
        if (!$this->id) {
            throw new \Exception("You must specify a bot id");
        }

        return new lib\Logger($this->id, $this->config);
    }
}