-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathzabbix_processor.go
More file actions
376 lines (313 loc) · 11 KB
/
zabbix_processor.go
File metadata and controls
376 lines (313 loc) · 11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
package main
import (
"bufio"
"bytes"
"flag"
"fmt"
"github.com/cbuehlmann/zabbixtools/zabbix"
log "github.com/inconshreveable/log15"
"io"
"io/ioutil"
"math"
"os"
"os/exec"
"strconv"
"strings"
"time"
"sort"
)
/**
* Concept:
* 1 Find Host ID's
* 1a filter by Template
* 1b filter by Host-Filter
* 2 Find Items (filter by Host ID's)
* 3 Process items -> store processing value
* [4] push data back to server
*/
var Log = log.New()
// zabbix_sender format
var zabbixSenderBytes bytes.Buffer
// Template ID to Template Name
var templates map[string]string = make(map[string]string, 0)
// Host ID to Host Name
var hosts map[string]string = make(map[string]string, 0)
func main() {
var err error
// configuration
configfile := flag.String("config", "~/.zabbix_processor.yml", "configuration file")
apiUrl := flag.String("url", "", "ZABBIX frontend/API URL")
username := flag.String("username", "", "ZABBIX username")
password := flag.String("password", "", "ZABBIX password")
// operation
allhosts := flag.Bool("all", false, "if no hosts are found, work on all hosts. this might block your server for a long time")
verbose := flag.Bool("verbose", false, "be verbose (log level debug)")
quiet := flag.Bool("quiet", false, "just print result. overrides -verbose")
output := flag.String("output", "-", "destination for processed values")
nop := flag.Bool("nop", false, "do not publish values, even when zabbix_sender is configured")
flag.Parse()
var configuration zabbix.Configuration
if *configfile != "" {
var err error
configuration, err = zabbix.ReadConfigurationFromFile(*configfile)
if err != nil {
_, _ = fmt.Fprintln(os.Stderr, "unable to parse configuration file", *configfile, err)
os.Exit(2)
}
} else {
configuration = zabbix.Configuration{}
}
handler := log.StdoutHandler
if *verbose == false {
handler = log.LvlFilterHandler(log.LvlInfo, log.StdoutHandler)
}
if *quiet {
handler = log.DiscardHandler()
}
// without filter
Log.SetHandler(handler)
zabbix.Log.SetHandler(handler)
if *username != "" {
if configuration.Zabbix.Api.Username != "" {
Log.Debug("username from command line overrides configuration value")
}
configuration.Zabbix.Api.Username = *username
}
if *password != "" {
if configuration.Zabbix.Api.Password != "" {
Log.Debug("password from command line overrides configuration value")
}
configuration.Zabbix.Api.Username = *password
}
if *apiUrl != "" {
if configuration.Zabbix.Api.URL != "" {
Log.Debug("api uri from command line overrides configuration value")
}
configuration.Zabbix.Api.URL = *apiUrl
}
session := zabbix.Session{URL: configuration.Zabbix.Api.URL}
Log.Info("authenticating", "server", session.URL)
err = zabbix.Login(&session, configuration.Zabbix.Api.Username, configuration.Zabbix.Api.Password)
if err != nil {
_, _ = fmt.Fprintln(os.Stderr, "login failed", err)
os.Exit(3)
}
Log.Info("login successful", "token", session.Token)
collectHostsByTemplate(session, configuration)
collectHosts(session, configuration)
if len(hosts) == 0 && *allhosts == false {
Log.Warn("no hosts found by filter. to process all hosts, use the --all command line option")
return
}
findItems(session, configuration)
if *output != "-" {
err := ioutil.WriteFile(*output, zabbixSenderBytes.Bytes(), 0644)
if err != nil {
_, _ = fmt.Fprintln(os.Stderr, "cannot write file ", *output, err)
}
} else {
// write to stdout
io.Copy(os.Stdout, bytes.NewReader(zabbixSenderBytes.Bytes()))
}
if *nop == false && len(configuration.Zabbix.Sender.Host) > 0 {
exitCode := sendItemData(configuration, *output, *verbose)
os.Exit(exitCode)
}
}
func sendItemData(configuration zabbix.Configuration, filename string, verbose bool) int {
Log.Info("publishing data to ZABBIX server", "host", configuration.Zabbix.Sender.Host)
senderPath := configuration.Zabbix.Sender.Binary
if len(senderPath) < 1 {
senderPath = "zabbix_sender"
}
commandline := []string{"--zabbix-server", configuration.Zabbix.Sender.Host, "--with-timestamps"}
if configuration.Zabbix.Sender.Port != 0 {
commandline = append(commandline, "--port", strconv.Itoa(configuration.Zabbix.Sender.Port))
}
commandline = append(commandline, "--input-file", filename)
if verbose {
commandline = append(commandline, "-vv")
}
Log.Info("starting transmission with", "binary", senderPath, "arguments", commandline)
command := exec.Command(senderPath, commandline...)
var b bytes.Buffer
writer := bufio.NewWriter(&b)
command.Stdout = writer
command.Stderr = os.Stderr
if filename == "-" {
// in memory transfer
command.Stdin = bytes.NewReader(zabbixSenderBytes.Bytes())
}
err := command.Run()
result := b.String()
if err != nil {
_, _ = fmt.Fprintln(os.Stderr, "zabbix_sender failed ", err, result)
return 5
}
Log.Info("transmitted", "result", result)
return 0
}
func fetch(session zabbix.Session, item zabbix.ItemResponseElement, date time.Time, window time.Duration) []zabbix.HistoryValue {
query := session.NewHistoryQuery()
query.ValueType = item.ValueType
query.Items = []string{item.ItemID}
query.From = date.Add(-window).Unix()
query.To = date.Add(+window).Unix()
Log.Debug("loading history for item", "item", item,
"from", time.Unix(query.From, 0).Format("Mon 01-02 15:04:05"),
"to", time.Unix(query.To, 0).Format("Mon 01-02 15:04:05"))
values := query.Query()
return values
}
func getClosestValue(timepoint time.Time, values []zabbix.HistoryValue) zabbix.HistoryValue {
closest := 3600.0 * 24 * 356 // 1Y
index := -1
for i, value := range values {
diff := math.Abs(float64(timepoint.Unix() - value.Clock))
if closest > diff {
index = i
closest = diff
}
}
if index >= 0 {
return values[index]
} else {
return zabbix.HistoryValue{}
}
}
/**
* Fetch n weeks back.
*/
func compareWeeks(session zabbix.Session, item zabbix.ItemResponseElement, weeks int, window time.Duration) (float64, time.Time) {
now := time.Now()
// now fetch latest value
values := fetch(session, item, now.Add(-window), window)
if len(values) == 0 {
Log.Info("no current value found in window",
"from", now.Add(-window).Format("01-02 15:04:05"),
"to", now.Add(window).Format("01-02 15:04:05"))
return math.NaN(), now
}
current, _ := strconv.ParseFloat(values[0].Value, 64)
// Sample timepoint
timestamp := time.Unix(values[0].Clock, values[0].Nano)
Log.Info("current value", "value", current, "exact timestamp", timestamp.Format("Mon 01-02 15:04:05"))
historicValues := make([]float64, 0)
// search with the exact timestamp of most recent sample
tp := timestamp
oneWeek := time.Hour * 24 * 7
//oneWeek := time.Hour * 24
for i := 0; i < weeks; i++ {
tp = tp.Add(-oneWeek) // step one week back
closest := getClosestValue(tp, fetch(session, item, tp, window))
if closest.Clock != 0 {
value, _ := strconv.ParseFloat(closest.Value, 64)
historicValues = append(historicValues, value)
when := time.Unix(closest.Clock, closest.Nano)
Log.Info("historic value", "value", value, "date", when.Format("Mon 01-02 15:04:05"))
} else {
Log.Warn("missing historic value", "around", tp.Format("Mon 01-02 15:04:05"))
}
}
historic := average(historicValues)
Log.Info("calculation done", log.Ctx{"average": historic, "current": current, "difference": current - historic})
return current - historic, timestamp
}
func average(values []float64) float64 {
if len(values) > 2 {
sort.Float64s(values)
values = values[1 : len(values)-1]
}
sum := float64(0)
for _, value := range values{
sum = sum + value
}
return sum / float64(len(values))
}
/**
* Find matching Hosts by template filter
*/
func collectHostsByTemplate(session zabbix.Session, configuration zabbix.Configuration) {
for index, templateConfiguration := range configuration.Templates {
Log.Debug("filtering templateHits with", "filter", templateConfiguration, "index", index)
req := session.NewTemplateQuery(templateConfiguration.Filter, templateConfiguration.Search)
templateHits := req.Query()
Log.Debug("processing matching templateHits", "templateHits", templateHits)
for _, template := range templateHits {
Log.Debug("adding template", "id", template.TemplateId, "name", template.Name)
templates[template.TemplateId] = template.Name
}
Log.Info("collected templates", "templates", templates)
}
}
/**
* Collect host details
*/
func collectHosts(session zabbix.Session, configuration zabbix.Configuration) {
// collect hosts linked with templates
if len(templates) > 0 {
keys := keysFromMap(templates)
hostQuery := session.NewHostQuery(keys, nil, nil)
hostElements := hostQuery.Query()
for _, hostElement := range hostElements {
hosts[hostElement.HostID] = hostElement.Name
}
Log.Debug("collected hosts via template lookup", "hosts", hosts)
}
// collect hosts by filters
if len(configuration.Hosts) > 0 {
for index, hostConfiguration := range configuration.Hosts {
hostQuery := session.NewHostQuery([]string{}, hostConfiguration.Filter, hostConfiguration.Search)
hostElements := hostQuery.Query()
for _, hostElement := range hostElements {
hosts[hostElement.HostID] = hostElement.Name
}
Log.Debug("collected hosts via host filter", "hosts", hosts, "index", index)
}
}
Log.Info("working with the following hosts", "hosts", hosts)
}
func keysFromMap(input map[string]string) []string {
keys := make([]string, 0)
for key := range input {
keys = append(keys, key)
}
return keys
}
func findItems(session zabbix.Session, configuration zabbix.Configuration) {
for index, itemFilter := range configuration.Items {
Log.Debug("processing items of filter", "index", index)
query := session.NewItemQuery(keysFromMap(hosts), itemFilter.Filter, itemFilter.Search)
query.SearchWildcardsEnabled = true
items := query.Query()
if len(items) > 0 {
// find all active hosts
processItems(session, items, itemFilter)
} else {
Log.Warn("no items found", "hosts", keysFromMap(hosts))
}
}
}
func processItems(session zabbix.Session, items []zabbix.ItemResponseElement, itemConfiguration zabbix.ItemConfiguration) {
for index, item := range items {
Log.Info(fmt.Sprintf("processing item %d/%d", index, len(items)), "itemid", item.ItemID, "key", item.Key, "data", item)
if itemConfiguration.PastWeeks.Weeks > 0 {
halfWindow := time.Duration(itemConfiguration.PastWeeks.Window / 2)
value, timestamp := compareWeeks(session, item, itemConfiguration.PastWeeks.Weeks, halfWindow*time.Second)
if math.IsNaN(value) == false {
addSenderLine(hosts[item.HostID], item.Key, itemConfiguration.Postfix, timestamp, value)
} else {
Log.Warn("skipping item due to missing data", "item", item)
}
}
}
}
func addSenderLine(hostname string, key string, postfix string, timestamp time.Time, value float64) {
newKey := strings.Replace(key, "[", postfix + "[" ,1)
line := fmt.Sprintf("\"%s\" %s %d %f\n", hostname, newKey, timestamp.Unix(), value)
_, err := zabbixSenderBytes.WriteString(line)
if err != nil {
Log.Warn("error writing item data", "error", err)
}
Log.Info("appending zabbix_sender line", "line", line)
}