-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
196 lines (154 loc) · 7.96 KB
/
server.py
File metadata and controls
196 lines (154 loc) · 7.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
#!/usr/bin/env python3
"""
WebSocket client to Prometheus metrics server
Connects to a WebSocket server to receive sensor data and exposes it as Prometheus metrics
written mainly by claude.ai
"""
import argparse
import asyncio
import json
import logging
import time

from aiohttp import web, ClientSession, WSMsgType
# Configure root logging at INFO so connection events and per-reading
# updates emitted through ``logger`` are visible.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Global state
# Latest reading per sensor, keyed by "<model>_<channel>"; filled in by
# websocket_client() and read by format_prometheus_metrics().
current_metrics = {}
# True while the WebSocket connection is open; exported as sensor_status.
ws_connected = False
def get_location(model, channel, sensors_config):
    """Return the configured location name for a sensor.

    ``sensors_config`` is a two-level mapping: model -> channel -> location.
    When either the model or the channel has no entry, the string
    ``"unknown"`` is returned instead.
    """
    channels_for_model = sensors_config.get(model, {})
    return channels_for_model.get(channel, "unknown")
def _escape_label_value(value):
    """Escape *value* for use inside a double-quoted Prometheus label value."""
    # The text exposition format requires backslash, double quote and line
    # feed to be escaped inside label values. Backslash must go first so the
    # escapes added for the other two are not doubled.
    return (
        str(value)
        .replace("\\", "\\\\")
        .replace('"', '\\"')
        .replace("\n", "\\n")
    )


def format_prometheus_metrics(metrics=None, connected=None):
    """Format sensor readings in the Prometheus text exposition format.

    Args:
        metrics: Mapping of sensor key -> reading dict with the keys
            ``location``, ``temperature_C``, ``humidity``, ``battery_ok``
            and ``timestamp``. Defaults to the module-level
            ``current_metrics``.
        connected: WebSocket connection state. Defaults to the module-level
            ``ws_connected``.

    Returns:
        The exposition text, terminated by a newline. ``sensor_status`` is
        always present; the per-sensor gauges are emitted only when there is
        at least one reading. A field missing from a reading is reported
        as 0.
    """
    if metrics is None:
        metrics = current_metrics
    if connected is None:
        connected = ws_connected

    # Connection status
    lines = [
        "# HELP sensor_status WebSocket connection status (1=connected, 0=disconnected)",
        "# TYPE sensor_status gauge",
        f"sensor_status {1 if connected else 0}",
    ]

    if metrics:
        # (metric name, HELP text, key in the reading dict)
        gauges = (
            ("sensor_temperature_celsius", "Temperature in Celsius", "temperature_C"),
            ("sensor_humidity_percent", "Relative humidity percentage", "humidity"),
            ("sensor_battery_ok", "Battery status (1=ok, 0=low)", "battery_ok"),
            ("sensor_last_update_timestamp", "Unix timestamp of last update", "timestamp"),
        )

        # Escape each location once and reuse it for every gauge.
        # NOTE(review): readings that share a location value produce samples
        # with identical label sets - confirm the sensor config keeps
        # locations unique.
        labelled = [
            (_escape_label_value(reading.get('location', 'unknown')), reading)
            for reading in metrics.values()
        ]

        for metric_name, help_text, field in gauges:
            lines.append(f"# HELP {metric_name} {help_text}")
            lines.append(f"# TYPE {metric_name} gauge")
            for label, reading in labelled:
                value = reading.get(field, 0)
                lines.append(f'{metric_name}{{location="{label}"}} {value}')

    return "\n".join(lines) + "\n"
async def metrics_handler(request):
    """Serve the current metrics as the response to ``GET /metrics``.

    The incoming request is not inspected; every call returns a fresh
    rendering produced by ``format_prometheus_metrics()``.
    """
    return web.Response(
        text=format_prometheus_metrics(),
        content_type="text/plain; version=0.0.4",
    )
def _store_sensor_reading(raw_message, sensors_config):
    """Decode one text frame and record its reading in ``current_metrics``.

    Frames containing a ``protocols`` key are logged and skipped. Frames
    without a truthy ``model`` or with no ``channel`` are ignored.

    Raises:
        json.JSONDecodeError: if *raw_message* is not valid JSON.
    """
    data = json.loads(raw_message)

    # Skip protocol responses
    if 'protocols' in data:
        logger.info("Received protocols response")
        return

    # Process sensor data
    model = data.get('model')
    channel = data.get('channel')
    if not model or channel is None:
        return

    location = get_location(model, str(channel), sensors_config)

    # Store metrics
    current_metrics[f"{model}_{channel}"] = {
        'location': location,
        'temperature_C': data.get('temperature_C', 0),
        'humidity': data.get('humidity', 0),
        'battery_ok': data.get('battery_ok', 0),
        'time': data.get('time', ''),
        # Wall-clock epoch seconds. The event loop's own clock is monotonic
        # with an arbitrary origin, so it cannot serve as the Unix timestamp
        # this value is exported as.
        'timestamp': time.time(),
    }
    logger.info(f"Updated metrics for {location}: "
                f"temp={data.get('temperature_C')}°C, "
                f"humidity={data.get('humidity')}%, "
                f"battery_ok={data.get('battery_ok')}")


async def websocket_client(ws_url, sensors_config):
    """Keep a WebSocket connection to *ws_url* open and ingest sensor frames.

    Runs until cancelled. Each iteration opens a new session and connection,
    sends a ``get_protocols`` command, then records every text frame through
    ``_store_sensor_reading``. When the connection ends or fails,
    ``ws_connected`` is reset and a new attempt is made after 5 seconds.

    Args:
        ws_url: URL of the WebSocket server to connect to.
        sensors_config: model -> channel -> location mapping used to label
            readings.
    """
    global ws_connected

    while True:
        try:
            logger.info(f"Connecting to WebSocket server at {ws_url}")
            async with ClientSession() as session:
                async with session.ws_connect(ws_url) as ws:
                    ws_connected = True
                    logger.info("Connected to WebSocket server")

                    # Send get_protocols command after connection
                    await ws.send_json({"cmd": "get_protocols"})
                    logger.info("Sent get_protocols command")

                    # Listen for messages
                    async for msg in ws:
                        if msg.type == WSMsgType.TEXT:
                            # One bad frame must not tear down the
                            # connection, so failures are logged per message.
                            try:
                                _store_sensor_reading(msg.data, sensors_config)
                            except json.JSONDecodeError:
                                logger.error(f"Invalid JSON received: {msg.data}")
                            except Exception as e:
                                logger.error(f"Error processing message: {e}")
                        elif msg.type == WSMsgType.ERROR:
                            logger.error(f"WebSocket error: {ws.exception()}")
                            break
                        elif msg.type == WSMsgType.CLOSED:
                            logger.warning("WebSocket connection closed")
                            break
        except Exception as e:
            logger.error(f"WebSocket connection error: {e}")
        finally:
            # Runs on normal close, on error and on cancellation, so the
            # exported connection status never stays stuck at 1.
            ws_connected = False
            logger.info("Disconnected from WebSocket server")

        # Wait before reconnecting
        logger.info("Reconnecting in 5 seconds...")
        await asyncio.sleep(5)
async def start_background_tasks(app):
    """Launch the WebSocket client as a background task.

    Reads ``app['ws_url']`` and ``app['sensors_config']`` and stores the
    created task under ``app['websocket_task']``.
    """
    client_coro = websocket_client(app['ws_url'], app['sensors_config'])
    app['websocket_task'] = asyncio.create_task(client_coro)
async def cleanup_background_tasks(app):
    """Cancel the WebSocket background task and wait for it to finish.

    Args:
        app: Mapping holding the task under ``app['websocket_task']``.
    """
    task = app['websocket_task']
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Awaiting a task that was just cancelled re-raises CancelledError.
        # That is the expected outcome here, not a shutdown failure.
        pass
def main(host, port, rtl433_ws, sensors_config):
    """Build the aiohttp application and run it until interrupted.

    Args:
        host: Interface the HTTP server binds to.
        port: TCP port the HTTP server listens on.
        rtl433_ws: URL of the WebSocket server that supplies sensor data.
        sensors_config: Path to a JSON file holding the
            model -> channel -> location mapping.

    Raises:
        OSError: if the config file cannot be opened.
        json.JSONDecodeError: if the config file is not valid JSON.
    """
    # Decode explicitly as UTF-8: open() otherwise falls back to the locale
    # encoding, which can garble non-ASCII location names.
    with open(sensors_config, encoding="utf-8") as f:
        sensor_locations = json.load(f)

    app = web.Application()
    app['ws_url'] = rtl433_ws
    app['sensors_config'] = sensor_locations

    app.router.add_get('/metrics', metrics_handler)

    # Setup background tasks
    app.on_startup.append(start_background_tasks)
    app.on_cleanup.append(cleanup_background_tasks)

    logger.info("Starting WebSocket client to Prometheus server")
    logger.info(f"WebSocket server: {rtl433_ws}")
    logger.info(f"Metrics endpoint: http://{host}:{port}/metrics")
    logger.info(f"Configured sensors: {app['sensors_config']}")

    web.run_app(app, host=host, port=port)
if __name__ == '__main__':
    # Command-line entry point: parse options and hand them to main().
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--host", required=False, default="0.0.0.0",
        help="interface for the metrics HTTP server to bind to")
    # type=int keeps a user-supplied port the same type as the default and
    # rejects non-numeric input at parse time.
    parser.add_argument(
        "--port", required=False, type=int, default=9100,
        help="TCP port for the metrics HTTP server")
    parser.add_argument(
        "--rtl433-ws", required=False, default="ws://127.0.0.1:8433",
        help="URL of the WebSocket server providing sensor data")
    parser.add_argument(
        "--sensors-config", required=True,
        help="path to a JSON file mapping model -> channel -> location")
    args = parser.parse_args()
    main(args.host, args.port, args.rtl433_ws, args.sensors_config)