-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase.py
More file actions
73 lines (52 loc) · 2.5 KB
/
database.py
File metadata and controls
73 lines (52 loc) · 2.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import pandas as pd
import numpy as np
from influxdb import InfluxDBClient, DataFrameClient
from datetime import datetime, timedelta
import logging
logger = logging.getLogger('influxDB')
# Constants
INFLUXDB_ADDRESS = 'influxdb' # for Docker Compose
INFLUXDB_ADDRESS = 'localhost' # otherwise
INFLUXDB_USER = 'mimose'
INFLUXDB_PASSWORD = 'demo'
INFLUXDB_DATABASE = 'sensornode'
class influxDB():
def __init__(self):
''' Initialize the influxDB database. Create new or connecte to exisiting.
'''
self.client = InfluxDBClient(INFLUXDB_ADDRESS, 8086, INFLUXDB_USER, INFLUXDB_PASSWORD, None)
self.df_client = DataFrameClient(INFLUXDB_ADDRESS, 8086, INFLUXDB_USER, INFLUXDB_PASSWORD, INFLUXDB_DATABASE)
databases = self.client.get_list_database()
if len(list(filter(lambda x: x['name'] == INFLUXDB_DATABASE, databases))) == 0:
self.client.create_database(INFLUXDB_DATABASE) # Create new DB
logger.info("Successfully created new database: " + INFLUXDB_DATABASE)
# Switch to database
self.client.switch_database(INFLUXDB_DATABASE)
logger.info("Successfully connected to existing database: " + INFLUXDB_DATABASE)
def read(self, measurement:str, field_key:str, start_time:datetime, end_time:datetime) -> pd.DataFrame:
# Convert to UTC time
start_time = ( start_time - timedelta(hours=2) ).strftime('%Y-%m-%dT%H:%M:%SZ')
end_time = ( end_time - timedelta(hours=2) ).strftime('%Y-%m-%dT%H:%M:%SZ')
query = "SELECT %s FROM %s WHERE time >= '%s' AND time <= '%s'" %(field_key, measurement, start_time, end_time)
result = self.df_client.query(query)
points = pd.DataFrame(result[measurement])
points.index.name = 'datetime'
return points
def save_dict(self, measurement:str, fields:dict):
''' Save data dict fields to influxDB measurement. Time stamps are in UTC time. '''
# Return if field is empty
if not fields:
logger.error('Field dict of measurement %s is empty. Nothing written to DB.' %measurement)
return
try:
json_body = [
{
"measurement": measurement,
"tags": {},
"fields": fields
}
]
self.client.write_points(json_body)
logger.debug('Sensordata: ' + str(json_body))
except Exception as err:
logger.error('Could not write data to InfluxDB: ' + str(err))