-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsynology_activebackuplogs-example-2.py
More file actions
146 lines (125 loc) · 5.56 KB
/
synology_activebackuplogs-example-2.py
File metadata and controls
146 lines (125 loc) · 5.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# This example will return all events with backup_results in the JSON.
import datetime
import argparse
import logging
import traceback
# TRMM snippet for production
# {{synology_activebackuplogs_snippet.py}}
# Dev
import synology_activebackuplogs_snippet
def main(logger=logging.getLogger(), ago_unit='days', ago_value=1, log_path='', log_glob='log.txt*'):
    """Scan Synology Active Backup for Business logs for backup-result events.

    Loads the log files, finds entries emitted by 'server-requester.cpp',
    prints every event carrying a backup_result, and exits with status 1 if
    any *completed* backup's last success is 3 or more days old.

    Args:
        logger: Logger used for all diagnostic output.
        ago_unit: datetime.timedelta keyword naming the look-back unit. Must
            be one of the plural forms 'weeks', 'days', 'hours', 'minutes',
            'seconds'. ('day' and 'years' are not valid timedelta keywords;
            use days=365 for one year.)
            timedelta docs: https://docs.python.org/3/library/datetime.html#timedelta-objects
        ago_value: Number of ago_unit units to look back.
        log_path: Alternate directory containing the Synology log files
            ('' uses the snippet's default location).
        log_glob: Filename glob matching the log files.

    Raises:
        SystemExit: status 1 when a stale completed backup was found, status 0
            otherwise. Returns normally (no exit) only when no log entries
            matched the search window at all.
    """
    after = datetime.timedelta(**{ago_unit: ago_value})

    logger.debug('Instantiating the synology_activebackuplogs_snippet class')
    synology = synology_activebackuplogs_snippet.SynologyActiveBackupLogs(
        # Search logs within the period specified.
        # timedelta() will be off by 1 minute because 1 minute is added to
        # detect if the log entry is last year vs. this year. This should be
        # negligible.
        after=after,
        # Use different log location
        log_path=log_path,
        # Use different filename globbing
        filename_glob=log_glob,
        # Pass the logger
        logger=logger
    )

    # Load the log entries
    logger.debug(f'Loading log files in "{log_path}"')
    synology.load()

    # Search for entries that match the criteria.
    find = {
        'method_name': 'server-requester.cpp',
    }
    logger.debug('Searching the log files')
    found = synology.search(find=find)

    # Human-readable start of the search window.
    ts = (datetime.datetime.now() - after).strftime('%Y-%m-%d %X')
    if not found:
        logger.info(f"No log entries found since {ts}")
        return

    # Set to True when a completed backup is >= 3 days stale.
    errors_found = False

    # Print the log events
    logger.debug('Printing the results')
    print(f'Log entries were found since {ts}:')
    for event in found:
        try:
            # Guard every key before use: accessing a missing key would raise.
            if 'json' not in event or event['json'] is None:
                continue
            if 'backup_result' not in event['json']:
                continue
            if 'last_success_time' not in event['json']['backup_result']:
                continue
            if 'last_backup_status' not in event['json']['backup_result']:
                continue

            # Nicely formatted timestamp of the log event itself.
            ts = event['datetime'].strftime('%Y-%m-%d %X')
            ts_backup = datetime.datetime.fromtimestamp(event['json']['backup_result']['last_success_time'])
            delta_backup = datetime.datetime.now() - ts_backup

            # delta_backup.days is an integer and does not take into account hours.
            if event['json']['backup_result']['last_backup_status'] == 'complete' and delta_backup.days >= 3:
                errors_found = True

            # Always print the output, so it's visible to the users.
            task_name = ''
            transferred = 0
            if 'running_task_result' in event['json']:
                if 'task_name' in event['json']['running_task_result']:
                    task_name = event['json']['running_task_result']['task_name']
                # 'transfered_bytes' (sic) is the key spelling used by the
                # Synology log JSON — do not "correct" it.
                if 'transfered_bytes' in event['json']['running_task_result']:
                    transferred = event['json']['running_task_result']['transfered_bytes']

            print(f"{ts}: {event['json']['backup_result']} Task name: '{task_name}' Transferred: '{transferred}' Days/Hours ago: {delta_backup}")
        except TypeError as err:
            # FIX: use the injected logger here; the original fell back to the
            # root logger (logging.warning) only in this handler.
            logger.warning(f'Failed to check for key before using. Skipping this event. ERR: {err}')
            logger.warning(traceback.format_exc())
            logger.warning(f'Event: {event}')
            # raise
            continue

    if errors_found:
        # Errors found. Exit with failure
        exit(1)
    else:
        # No errors found. Exit successful
        exit(0)
# Main entrance here...
if __name__ == '__main__':
    # Parse command line arguments
    parser = argparse.ArgumentParser(description='Parse the Synology Active Backup for Business logs.')
    parser.add_argument('--log-level', default='info', dest='log_level',
                        choices=['debug', 'info', 'warning', 'error', 'critical'],
                        help='set log level for the Synology Active Backup for Business module')
    parser.add_argument('--log-path', default='', type=str,
                        help='path to the Synology log files')
    parser.add_argument('--log-glob', default='log.txt*', type=str,
                        help='filename glob for the log files')
    # FIX: the default used to be 'day', which is not a valid
    # datetime.timedelta keyword, so running with no arguments raised
    # TypeError inside main(). Only the plural units are valid; enforce them
    # with choices= so argparse rejects bad input with a clear message.
    parser.add_argument('--ago-unit', default='days', type=str,
                        choices=['seconds', 'minutes', 'hours', 'days', 'weeks'],
                        help='time span unit, one of [seconds, minutes, hours, days, weeks]')
    parser.add_argument('--ago-value', default=1, type=int,
                        help='time span value')
    args = parser.parse_args()

    # Default to INFO unless a level was given on the command line.
    default_log_level = 'INFO'
    if args.log_level:
        default_log_level = args.log_level.upper()

    log_format = '%(asctime)s %(funcName)s(%(lineno)d): %(message)s'
    logging.basicConfig(format=log_format, level=default_log_level)
    top_logger = logging.getLogger()
    top_logger.setLevel(default_log_level)

    main(**{
        'logger': top_logger,
        'log_path': args.log_path,
        'log_glob': args.log_glob,
        'ago_unit': args.ago_unit,
        'ago_value': args.ago_value
    })