-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathauto_current.py
More file actions
executable file
·599 lines (512 loc) · 34.1 KB
/
auto_current.py
File metadata and controls
executable file
·599 lines (512 loc) · 34.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
#!/usr/bin/env python3
import dbus
import dbus.exceptions
import logging
import os
import sys
import configparser
from dbus.mainloop.glib import DBusGMainLoop
from gi.repository import GLib
sys.path.insert(1, "/opt/victronenergy/dbus-systemcalc-py/ext/velib_python")
from ve_utils import wrap_dbus_value
# Logging setup
logger = logging.getLogger()
for handler in logger.handlers[:]:
logger.removeHandler(handler)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
logger.setLevel(logging.INFO) # Default to DEBUG for better visibility
#logging.basicConfig(level=logging.INFO)
logging.info("Starting Generator Derating Monitor with file logging.")
# D-Bus service names and paths
VEBUS_SERVICE_BASE = "com.victronenergy.vebus"
GENERATOR_SERVICE_BASE = "com.victronenergy.generator"
TEMPERATURE_SERVICE_BASE = "com.victronenergy.temperature"
SETTINGS_SERVICE_NAME = "com.victronenergy.settings"
GPS_SERVICE_BASE = "com.victronenergy.gps"
DIGITAL_INPUT_SERVICE_BASE = "com.victronenergy.digitalinput"
SYSTEM_SERVICE = "com.victronenergy.system"
ALTITUDE_PATH = "/Altitude"
AC_ACTIVE_INPUT_CURRENT_LIMIT_PATH = "/Ac/ActiveIn/CurrentLimit"
TEMPERATURE_PATH = "/Temperature"
CUSTOM_NAME_PATH = "/CustomName"
STATE_PATH = "/State"
PRODUCT_NAME_PATH = "/ProductName"
BUS_ITEM_INTERFACE = "com.victronenergy.BusItem"
GENERATOR_CURRENT_LIMIT_PATH = "/Settings/TransferSwitch/GeneratorCurrentLimit"
# Transfer switch state values
GENERATOR_ON_VALUE = (12, 3)
SHORE_POWER_ON_VALUE = (13, 2)
# Gen Auto Current State Values
GEN_AUTO_CURRENT_OFF = 2
GEN_AUTO_CURRENT_ON = 3
# CORRECTED: Configuration file path
CONFIG_FILE_PATH = '/data/apps/auto_current/config.ini'
class GeneratorDeratingMonitor:
def __init__(self):
self.bus = dbus.SystemBus()
# Load settings from the config file
self._load_and_set_config()
self.vebus_service = None
self.outdoor_temp_service_name = None
self.generator_temp_service_name = None
self.gps_service_name = None
self.transfer_switch_service = None
self.settings_service_name = SETTINGS_SERVICE_NAME
self.gen_auto_current_service = None
self.gen_auto_current_state = None
self.previous_gen_auto_current_state = None
self.initial_derated_output_logged = False
self.initial_altitude = None
self.initial_outdoor_temp = None
self.initial_generator_temp = None
self.previous_ac_current_limit = None
self.previous_generator_current_limit_setting = None
self.outdoor_temp_fahrenheit = self.DEFAULT_OUTDOOR_TEMP_F
self.altitude_feet = self.DEFAULT_ALTITUDE_FEET
self.generator_temp_fahrenheit = self.DEFAULT_GENERATOR_TEMP_F
self.service_discovery_retries = 1
self.service_discovery_delay = 5
# Flags for Altitude conversion/array errors (existing logic)
self.altitude_warning_logged = False
self.altitude_value_logged_after_warning = False
# NEW: Flags for Generator Temperature Service D-Bus errors
self.generator_temp_warning_logged = False
self.generator_temp_value_logged_after_warning = False
# NEW: Flags for Outdoor Temperature Service D-Bus errors
self.outdoor_temp_warning_logged = False
self.outdoor_temp_value_logged_after_warning = False
# NEW: Flags for GPS/Altitude Service D-Bus errors
self.altitude_dbus_error_logged = False
self.altitude_dbus_value_logged_after_error = False
GLib.timeout_add_seconds(5, self._delayed_initialization)
def _load_and_set_config(self):
"""Loads settings from config file, with hardcoded defaults as fallback."""
config = configparser.ConfigParser()
# Set default values first
self.BASE_TEMPERATURE_THRESHOLD_F = 77.0
self.TEMP_COEFFICIENT = 0.00055
self.ALTITUDE_COEFFICIENT = 0.00003
self.BASE_GENERATOR_OUTPUT_AMPS = 60.5
self.OUTPUT_BUFFER = 0.9
self.HIGH_GENTEMP_THRESHOLD_F = 220.0
self.MEDIUM_GENTEMP_THRESHOLD_F = 212.0
self.HIGH_GENTEMP_REDUCTION = 0.85
self.MEDIUM_GENTEMP_REDUCTION = 0.90
self.DEFAULT_ALTITUDE_FEET = 1000.0
self.DEFAULT_GENERATOR_TEMP_F = 180.0
self.DEFAULT_OUTDOOR_TEMP_F = 77.0
if not os.path.exists(CONFIG_FILE_PATH):
logging.warning(f"Config file not found at {CONFIG_FILE_PATH}. Using default settings.")
return
try:
config.read(CONFIG_FILE_PATH)
logging.info(f"Successfully loaded settings from {CONFIG_FILE_PATH}")
# Read DeratingConstants
self.BASE_TEMPERATURE_THRESHOLD_F = config.getfloat('DeratingConstants', 'BaseTemperatureThresholdF', fallback=self.BASE_TEMPERATURE_THRESHOLD_F)
self.TEMP_COEFFICIENT = config.getfloat('DeratingConstants', 'TempCoefficient', fallback=self.TEMP_COEFFICIENT)
self.ALTITUDE_COEFFICIENT = config.getfloat('DeratingConstants', 'AltitudeCoefficient', fallback=self.ALTITUDE_COEFFICIENT)
self.BASE_GENERATOR_OUTPUT_AMPS = config.getfloat('DeratingConstants', 'BaseGeneratorOutputAmps', fallback=self.BASE_GENERATOR_OUTPUT_AMPS)
self.OUTPUT_BUFFER = config.getfloat('DeratingConstants', 'OutputBuffer', fallback=self.OUTPUT_BUFFER)
self.HIGH_GENTEMP_THRESHOLD_F = config.getfloat('DeratingConstants', 'HighGenTempThresholdF', fallback=self.HIGH_GENTEMP_THRESHOLD_F)
self.MEDIUM_GENTEMP_THRESHOLD_F = config.getfloat('DeratingConstants', 'MediumGenTempThresholdF', fallback=self.MEDIUM_GENTEMP_THRESHOLD_F)
self.HIGH_GENTEMP_REDUCTION = config.getfloat('DeratingConstants', 'HighGenTempReduction', fallback=self.HIGH_GENTEMP_REDUCTION)
self.MEDIUM_GENTEMP_REDUCTION = config.getfloat('DeratingConstants', 'MediumGenTempReduction', fallback=self.MEDIUM_GENTEMP_REDUCTION)
# Read DefaultSensorValues
self.DEFAULT_ALTITUDE_FEET = config.getfloat('DefaultSensorValues', 'DefaultAltitudeFeet', fallback=self.DEFAULT_ALTITUDE_FEET)
self.DEFAULT_GENERATOR_TEMP_F = config.getfloat('DefaultSensorValues', 'DefaultGeneratorTempF', fallback=self.DEFAULT_GENERATOR_TEMP_F)
self.DEFAULT_OUTDOOR_TEMP_F = config.getfloat('DefaultSensorValues', 'DefaultOutdoorTempF', fallback=self.DEFAULT_OUTDOOR_TEMP_F)
except (configparser.Error, ValueError) as e:
logging.error(f"Error reading config file {CONFIG_FILE_PATH}: {e}. Using default settings.")
def _find_service_once(self, find_function, service_name_attribute, service_description):
"""Attempts to find a service once and logs the result."""
find_function()
if getattr(self, service_name_attribute):
logging.info(f"Found {service_description}: {getattr(self, service_name_attribute)}")
return True
else:
logging.warning(f"Could not find {service_description}. Will retry in periodic monitoring.")
return False
def _delayed_initialization(self):
# Initial attempts to find services (without extensive retries here)
self._find_service_once(self._find_vebus_service, 'vebus_service', 'VE.Bus service')
self._find_service_once(self._find_outdoor_temperature_service, 'outdoor_temp_service_name', 'outdoor temperature service')
self._find_service_once(self._find_generator_temperature_service, 'generator_temp_service_name', 'generator temperature service')
self._find_service_once(self._find_gps_service_internal, 'gps_service_name', 'GPS service')
self._find_service_once(self._find_transfer_switch_input_internal, 'transfer_switch_service', 'transfer switch input service')
self._find_service_once(self._find_gen_auto_current_input_internal, 'gen_auto_current_service', "'Gen Auto Current' input service")
self._read_initial_values()
GLib.timeout_add(5000, self._periodic_monitoring)
return GLib.SOURCE_REMOVE
def _read_initial_values(self):
self._update_outdoor_temperature(log_update=False, log_initial=True)
self._update_altitude(log_update=False, log_initial=True)
self._update_generator_temperature(log_update=False, log_initial=True)
self._update_gen_auto_current_state(initial_read=True)
# Initial read of the generator current limit setting
current_limit, _ = self._get_dbus_value(self.settings_service_name, GENERATOR_CURRENT_LIMIT_PATH) # Unpack the tuple
if current_limit is not None:
self.previous_generator_current_limit_setting = round(float(current_limit), 1)
logging.info(f"Initial Generator Current Limit setting: {self.previous_generator_current_limit_setting:.1f} Amps")
# Initial read of the AC active input current limit
ac_limit, _ = self._get_dbus_value(self.vebus_service, AC_ACTIVE_INPUT_CURRENT_LIMIT_PATH) # Unpack the tuple
if ac_limit is not None:
self.previous_ac_current_limit = round(float(ac_limit), 1)
logging.info(f"Initial VE.Bus AC Active Input Current Limit: {self.previous_ac_current_limit:.1f} Amps")
def _find_service(self, service_base):
services = [name for name in self.bus.list_names() if name.startswith(service_base)]
return services[0] if services else None
def _find_vebus_service(self):
self.vebus_service = self._find_service(VEBUS_SERVICE_BASE)
def _get_dbus_value(self, service_name, path):
"""Returns (value, is_service_unknown_error)"""
if not service_name:
return None, False
try:
obj = self.bus.get_object(service_name, path)
interface = dbus.Interface(obj, BUS_ITEM_INTERFACE)
return interface.GetValue(), False
except dbus.exceptions.DBusException as e:
error_message = str(e)
is_service_unknown = "DBus.Error.ServiceUnknown" in error_message
if not is_service_unknown:
# Log other D-Bus errors normally (not ServiceUnknown)
logging.error(f"D-Bus error getting value from {service_name}{path}: {e}")
# Return None and the error flag
return None, is_service_unknown
except Exception as e:
# Catch other unexpected errors
logging.error(f"Unexpected error getting value from {service_name}{path}: {e}")
return None, False
def _set_dbus_value(self, service_name, path, value):
if not service_name: # Added check
logging.warning(f"Attempted to set D-Bus value for {path} but service_name is None.")
return
try:
obj = self.bus.get_object(service_name, path)
interface = dbus.Interface(obj, BUS_ITEM_INTERFACE)
interface.SetValue(wrap_dbus_value(value))
except dbus.exceptions.DBusException as e: # More specific exception
logging.error(f"D-Bus error setting value for {service_name}{path} to {value}: {e}")
except Exception as e: # Catch other unexpected errors
logging.error(f"Unexpected error setting value for {service_name}{path} to {value}: {e}")
def _find_outdoor_temperature_service(self):
self.outdoor_temp_service_name = None # Reset before search
temperature_services = [name for name in self.bus.list_names() if name.startswith(TEMPERATURE_SERVICE_BASE)]
for service_name in temperature_services:
try:
obj = self.bus.get_object(service_name, CUSTOM_NAME_PATH)
interface = dbus.Interface(obj, BUS_ITEM_INTERFACE)
custom_name = interface.GetValue()
logging.debug(f"Checking service: {service_name}, CustomName: '{custom_name}' for outdoor temperature.")
if custom_name and "Outdoor" in custom_name:
self.outdoor_temp_service_name = service_name
return
except dbus.exceptions.DBusException as e:
logging.debug(f"D-Bus error checking CustomName for {service_name}: {e}")
except Exception as e:
logging.debug(f"Unexpected error checking CustomName for {service_name}: {e}")
def _find_generator_temperature_service(self):
self.generator_temp_service_name = None # Reset before search
temperature_services = [name for name in self.bus.list_names() if name.startswith(TEMPERATURE_SERVICE_BASE)]
for service_name in temperature_services:
try:
obj = self.bus.get_object(service_name, CUSTOM_NAME_PATH)
interface = dbus.Interface(obj, BUS_ITEM_INTERFACE)
custom_name = interface.GetValue()
logging.debug(f"Checking service: {service_name}, CustomName: '{custom_name}' for generator temperature.")
if custom_name and any(keyword in custom_name for keyword in ["gen", "Gen", "generator", "Generator"]):
self.generator_temp_service_name = service_name
return
except dbus.exceptions.DBusException as e:
logging.debug(f"D-Bus error checking CustomName for {service_name}: {e}")
except Exception as e:
logging.debug(f"Unexpected error checking CustomName for {service_name}: {e}")
try:
obj = self.bus.get_object(service_name, PRODUCT_NAME_PATH)
interface = dbus.Interface(obj, BUS_ITEM_INTERFACE)
product_name = interface.GetValue()
logging.debug(f"Checking service: {service_name}, ProductName: '{product_name}' for generator temperature.")
if product_name and any(keyword in product_name for keyword in ["gen", "Gen", "generator", "Generator"]):
self.generator_temp_service_name = service_name
return
except dbus.exceptions.DBusException as e:
logging.debug(f"D-Bus error checking ProductName for {service_name}: {e}")
except Exception as e:
logging.debug(f"Unexpected error checking ProductName for {service_name}: {e}")
def _find_gps_service_internal(self): # Renamed to internal
self.gps_service_name = self._find_service(GPS_SERVICE_BASE)
def _find_transfer_switch_input_internal(self): # Renamed to internal
self.transfer_switch_service = None # Reset before search
service_names = [name for name in self.bus.list_names() if name.startswith(DIGITAL_INPUT_SERVICE_BASE)]
for service_name in service_names:
try:
obj = self.bus.get_object(service_name, PRODUCT_NAME_PATH)
interface = dbus.Interface(obj, BUS_ITEM_INTERFACE)
product_name = interface.GetValue()
logging.debug(f"Checking service: {service_name}, ProductName: '{product_name}' for transfer switch.")
if product_name and ("Transfer Switch" in product_name or "transfer switch" in product_name):
self.transfer_switch_service = service_name
return
except dbus.exceptions.DBusException as e:
logging.debug(f"D-Bus error checking product name for {service_name}: {e}")
except Exception as e:
logging.debug(f"Unexpected error checking product name for {service_name}: {e}")
def _find_gen_auto_current_input_internal(self): # Renamed to internal
self.gen_auto_current_service = None # Reset before search
service_names = [name for name in self.bus.list_names() if name.startswith(DIGITAL_INPUT_SERVICE_BASE)]
for service_name in service_names:
try:
obj = self.bus.get_object(service_name, PRODUCT_NAME_PATH)
interface = dbus.Interface(obj, BUS_ITEM_INTERFACE)
product_name = interface.GetValue()
logging.debug(f"Checking service: {service_name}, ProductName: '{product_name}' for Gen Auto Current.")
if product_name and ("Gen Auto Current" in product_name or "gen auto current" in product_name):
self.gen_auto_current_service = service_name
return
except dbus.exceptions.DBusException as e:
logging.debug(f"D-Bus error checking product name for {service_name}: {e}")
except Exception as e:
logging.debug(f"Unexpected error checking product name for {service_name}: {e}")
def _update_outdoor_temperature(self, log_update=True, log_initial=False):
if self.outdoor_temp_service_name:
temp_celsius, is_service_unknown = self._get_dbus_value(self.outdoor_temp_service_name, TEMPERATURE_PATH)
if temp_celsius is not None:
self.outdoor_temp_fahrenheit = (temp_celsius * 9/5) + 32
if log_initial and self.initial_outdoor_temp is None:
self.initial_outdoor_temp = self.outdoor_temp_fahrenheit
logging.info(f"Initial Outdoor Temperature: {self.initial_outdoor_temp:.2f} F")
elif log_update:
if self.outdoor_temp_warning_logged or not self.outdoor_temp_value_logged_after_warning:
logging.info(f"Outdoor Temperature: {self.outdoor_temp_fahrenheit:.2f} F. Valid value received.")
self.outdoor_temp_warning_logged = False # Reset warning flag
self.outdoor_temp_value_logged_after_warning = True # Set flag to prevent continuous info logs
else:
logging.debug(f"Updated outdoor temperature: {self.outdoor_temp_fahrenheit:.2f} F")
elif is_service_unknown:
if not self.outdoor_temp_warning_logged:
logging.warning(f"Outdoor Temperature Service '{self.outdoor_temp_service_name}' not available. Using previous or default value.")
self.outdoor_temp_warning_logged = True
self.outdoor_temp_value_logged_after_warning = False # Reset flag for next valid value
else:
logging.debug("Could not retrieve outdoor temperature from D-Bus. Service might be gone or path invalid.")
self.outdoor_temp_value_logged_after_warning = False
else:
logging.debug("Outdoor temperature service not found. Using default value.")
self.outdoor_temp_value_logged_after_warning = False
def _update_altitude(self, log_update=True, log_initial=False):
if self.gps_service_name:
altitude_raw, is_service_unknown = self._get_dbus_value(self.gps_service_name, ALTITUDE_PATH)
altitude_meters = None # Initialize to None
if altitude_raw is not None:
try:
# Handle dbus.Array case (sometimes returned by GPS)
if isinstance(altitude_raw, dbus.Array):
if altitude_raw:
altitude_meters = float(altitude_raw[0])
else:
if not self.altitude_warning_logged:
logging.warning("Received empty dbus.Array for altitude. Using previous or default altitude.")
self.altitude_warning_logged = True
self.altitude_value_logged_after_warning = False
else:
altitude_meters = float(altitude_raw)
if altitude_meters is not None:
self.altitude_feet = altitude_meters * 3.28084
# Log altitude update status
if log_initial and self.initial_altitude is None:
self.initial_altitude = self.altitude_feet
logging.info(f"Initial Altitude: {self.initial_altitude:.2f} feet")
elif log_update:
if self.altitude_dbus_error_logged or not self.altitude_dbus_value_logged_after_error:
logging.info(f"Updated altitude: {self.altitude_feet:.2f} feet. Valid value received.")
self.altitude_dbus_error_logged = False # Reset D-Bus warning flag
self.altitude_dbus_value_logged_after_error = True # Set flag to prevent continuous info logs
else:
logging.debug(f"Updated altitude: {self.altitude_feet:.2f} feet")
# Reset conversion/array warning flags on successful read
self.altitude_warning_logged = False
self.altitude_value_logged_after_warning = True
except (ValueError, TypeError) as e:
if not self.altitude_warning_logged:
logging.warning(f"Error converting altitude_raw '{altitude_raw}' to float: {e}. Using previous or default altitude.")
self.altitude_warning_logged = True
self.altitude_value_logged_after_warning = False
self.altitude_dbus_value_logged_after_error = False # Failed due to value, not D-Bus service
elif is_service_unknown:
if not self.altitude_dbus_error_logged:
logging.warning(f"GPS Service '{self.gps_service_name}' not available. Using previous or default altitude.")
self.altitude_dbus_error_logged = True
self.altitude_dbus_value_logged_after_error = False
else:
if not self.altitude_dbus_error_logged:
logging.warning("Could not retrieve altitude from D-Bus (Non-ServiceUnknown error). Using previous or default altitude.")
self.altitude_dbus_error_logged = True
self.altitude_dbus_value_logged_after_error = False
self.altitude_warning_logged = False # Clear this since it's a D-Bus path/read issue
else:
logging.debug("GPS service not found for altitude. Using default value.")
self.altitude_dbus_value_logged_after_error = False
self.altitude_warning_logged = False
def _update_generator_temperature(self, log_update=True, log_initial=False):
if self.generator_temp_service_name:
temp_celsius, is_service_unknown = self._get_dbus_value(self.generator_temp_service_name, TEMPERATURE_PATH)
if temp_celsius is not None:
self.generator_temp_fahrenheit = (temp_celsius * 9/5) + 32
if log_initial and self.initial_generator_temp is None:
self.initial_generator_temp = self.generator_temp_fahrenheit
logging.info(f"Initial Generator Temperature: {self.initial_generator_temp:.2f} F")
elif log_update:
if self.generator_temp_warning_logged or not self.generator_temp_value_logged_after_warning:
logging.info(f"Generator Temperature: {self.generator_temp_fahrenheit:.2f} F. Valid value received.")
self.generator_temp_warning_logged = False # Reset warning flag
self.generator_temp_value_logged_after_warning = True # Set flag to prevent continuous info logs
elif self.generator_temp_fahrenheit > 212.0:
logging.debug(f"Generator temperature above threshold: {self.generator_temp_fahrenheit:.2f} F")
else:
logging.debug(f"Generator temperature: {self.generator_temp_fahrenheit:.2f} F (below threshold)")
elif is_service_unknown:
if not self.generator_temp_warning_logged:
logging.warning(f"Generator Temperature Service '{self.generator_temp_service_name}' not available. Using previous or default value.")
self.generator_temp_warning_logged = True
self.generator_temp_value_logged_after_warning = False # Reset flag for next valid value
else:
logging.debug("Could not retrieve generator temperature from D-Bus. Service might be gone or path invalid.")
self.generator_temp_value_logged_after_warning = False
else:
logging.debug("Generator temperature service not found. Using default value.")
self.generator_temp_value_logged_after_warning = False
def _update_gen_auto_current_state(self, initial_read=False):
if self.gen_auto_current_service:
state, _ = self._get_dbus_value(self.gen_auto_current_service, STATE_PATH)
if state is not None:
state = int(state)
if initial_read:
self.gen_auto_current_state = state
self.previous_gen_auto_current_state = state
logging.info(f"Initial 'Gen Auto Current' state: {self.gen_auto_current_state} (ON: {GEN_AUTO_CURRENT_ON}, OFF: {GEN_AUTO_CURRENT_OFF})")
elif state != self.previous_gen_auto_current_state:
self.previous_gen_auto_current_state = self.gen_auto_current_state
self.gen_auto_current_state = state
logging.info(f"'Gen Auto Current' state changed to: {self.gen_auto_current_state} (ON: {GEN_AUTO_CURRENT_ON}, OFF: {GEN_AUTO_CURRENT_OFF})")
else:
self.gen_auto_current_state = state
logging.debug(f"'Gen Auto Current' state remains: {self.gen_auto_current_state} (ON: {GEN_AUTO_CURRENT_ON}, OFF: {GEN_AUTO_CURRENT_OFF})")
else:
logging.debug("Could not retrieve 'Gen Auto Current' state from D-Bus.")
else:
logging.debug("'Gen Auto Current' input service not found. Cannot read state.")
def _is_generator_running(self):
if self.transfer_switch_service:
state, _ = self._get_dbus_value(self.transfer_switch_service, STATE_PATH)
return state in GENERATOR_ON_VALUE
return False
def calculate_derating_factor(self, temperature_fahrenheit, altitude_feet, generator_temperature_fahrenheit):
temperature_multiplier = 1.0
altitude_multiplier = 1.0
generator_temp_multiplier = 1.0
if temperature_fahrenheit is not None:
if temperature_fahrenheit > self.BASE_TEMPERATURE_THRESHOLD_F:
temperature_multiplier = 1.0 - ((temperature_fahrenheit - self.BASE_TEMPERATURE_THRESHOLD_F) * self.TEMP_COEFFICIENT)
temperature_multiplier = max(0.0, temperature_multiplier)
if altitude_feet is not None:
altitude_multiplier = 1.0 - (altitude_feet * self.ALTITUDE_COEFFICIENT)
altitude_multiplier = max(0.0, altitude_multiplier)
if generator_temperature_fahrenheit is not None:
if generator_temperature_fahrenheit >= self.HIGH_GENTEMP_THRESHOLD_F:
generator_temp_multiplier = self.HIGH_GENTEMP_REDUCTION
elif generator_temperature_fahrenheit >= self.MEDIUM_GENTEMP_THRESHOLD_F:
generator_temp_multiplier = self.MEDIUM_GENTEMP_REDUCTION
return temperature_multiplier * altitude_multiplier * generator_temp_multiplier * self.OUTPUT_BUFFER
def _perform_derating(self):
if self.outdoor_temp_fahrenheit is not None and self.altitude_feet is not None and self.generator_temp_fahrenheit is not None:
derating_factor = self.calculate_derating_factor(
self.outdoor_temp_fahrenheit, self.altitude_feet, self.generator_temp_fahrenheit
)
derated_output_amps = self.BASE_GENERATOR_OUTPUT_AMPS * derating_factor
rounded_output = round(derated_output_amps, 1)
current_generator_limit_setting, _ = self._get_dbus_value(self.settings_service_name, GENERATOR_CURRENT_LIMIT_PATH)
if not self.initial_derated_output_logged:
self._set_dbus_value(self.settings_service_name, GENERATOR_CURRENT_LIMIT_PATH, rounded_output)
logging.info(f"Initial Transfer Switch Generator Current Limit set to: {rounded_output:.1f} Amps (due to auto derating)")
self.initial_derated_output_logged = True
elif current_generator_limit_setting is None or abs(float(current_generator_limit_setting) - rounded_output) > 0.01:
self._set_dbus_value(self.settings_service_name, GENERATOR_CURRENT_LIMIT_PATH, rounded_output)
logging.debug(f"Transfer Switch Generator Current Limit updated to: {rounded_output:.1f} Amps (due to auto derating)")
else:
logging.debug(f"Transfer Switch Generator Current Limit remains: {rounded_output:.1f} Amps")
else:
logging.warning("Not all temperature or altitude data available for derating. Skipping calculation.")
def _sync_generator_limit_to_ac_input(self):
if self.vebus_service and self._is_generator_running():
current_generator_limit_setting, _ = self._get_dbus_value(self.settings_service_name, GENERATOR_CURRENT_LIMIT_PATH)
if current_generator_limit_setting is not None:
rounded_gen_limit = round(float(current_generator_limit_setting), 1)
if self.previous_generator_current_limit_setting is None or abs(self.previous_generator_current_limit_setting - rounded_gen_limit) > 0.01:
self._set_dbus_value(self.vebus_service, AC_ACTIVE_INPUT_CURRENT_LIMIT_PATH, rounded_gen_limit)
logging.debug(f"Generator running: Synced VE.Bus AC Active Input Current Limit to Generator Current Limit ({rounded_gen_limit:.1f} Amps).")
self.previous_ac_current_limit = rounded_gen_limit
self.previous_generator_current_limit_setting = rounded_gen_limit
else:
logging.debug(f"Generator running: VE.Bus AC Active Input Current Limit already matches Generator Current Limit ({rounded_gen_limit:.1f} Amps).")
else:
logging.warning("Could not retrieve Generator Current Limit setting. Cannot sync to AC input.")
elif self.vebus_service:
logging.debug("Generator not running, AC Active Input Current Limit not synced from generator current limit setting.")
def _sync_generator_limit_from_ac_input(self):
if self.vebus_service and self._is_generator_running():
current_ac_limit, _ = self._get_dbus_value(self.vebus_service, AC_ACTIVE_INPUT_CURRENT_LIMIT_PATH)
if current_ac_limit is not None:
rounded_ac_limit = round(float(current_ac_limit), 1)
if self.previous_ac_current_limit is None or abs(rounded_ac_limit - self.previous_ac_current_limit) > 0.01:
current_gen_limit, _ = self._get_dbus_value(self.settings_service_name, GENERATOR_CURRENT_LIMIT_PATH)
if current_gen_limit is None or abs(float(current_gen_limit) - rounded_ac_limit) > 0.01:
self._set_dbus_value(self.settings_service_name, GENERATOR_CURRENT_LIMIT_PATH, rounded_ac_limit)
logging.info(f"Generator running and Active AC Current Limit has been manually changed: Synced Generator Current Limit to VE.Bus AC Active Input Current Limit ({rounded_ac_limit:.1f} Amps).")
self.previous_generator_current_limit_setting = rounded_ac_limit
self.previous_ac_current_limit = rounded_ac_limit
else:
logging.debug(f"Generator running: VE.Bus AC Active Input Current Limit ({rounded_ac_limit:.1f} Amps) has not changed.")
else:
logging.warning("Could not retrieve VE.Bus AC Active Input Current Limit. Cannot sync to generator current limit.")
elif self.vebus_service:
if not self._is_generator_running():
logging.debug("Generator not running, AC Active Input Current Limit not synced to generator current limit.")
elif self.gen_auto_current_state == GEN_AUTO_CURRENT_ON:
logging.debug(f"'Gen Auto Current' is ON ({GEN_AUTO_CURRENT_ON}), AC Active Input Current Limit not synced to generator current limit.")
def _periodic_monitoring(self):
if not self.vebus_service:
self._find_service_once(self._find_vebus_service, 'vebus_service', 'VE.Bus service')
if not self.outdoor_temp_service_name:
self._find_service_once(self._find_outdoor_temperature_service, 'outdoor_temp_service_name', 'outdoor temperature service')
if not self.generator_temp_service_name:
self._find_service_once(self._find_generator_temperature_service, 'generator_temp_service_name', 'generator temperature service')
if not self.gps_service_name:
self._find_service_once(self._find_gps_service_internal, 'gps_service_name', 'GPS service')
if not self.transfer_switch_service:
self._find_service_once(self._find_transfer_switch_input_internal, 'transfer_switch_service', 'transfer switch input service')
if not self.gen_auto_current_service:
self._find_service_once(self._find_gen_auto_current_input_internal, 'gen_auto_current_service', "'Gen Auto Current' input service")
self._update_outdoor_temperature()
self._update_altitude()
self._update_generator_temperature()
self._update_gen_auto_current_state()
self._sync_generator_limit_to_ac_input()
if self._is_generator_running() and self.gen_auto_current_state == GEN_AUTO_CURRENT_OFF:
self._sync_generator_limit_from_ac_input()
else:
logging.debug(f"Generator not running or 'Gen Auto Current' is ON ({self.gen_auto_current_state}). Skipping sync from AC input.")
if self.gen_auto_current_state == GEN_AUTO_CURRENT_ON:
self._perform_derating()
else:
logging.debug(f"Gen Auto Current state is not ON ({GEN_AUTO_CURRENT_ON}). Current state: {self.gen_auto_current_state}")
return True
def main():
DBusGMainLoop(set_as_default=True)
GeneratorDeratingMonitor()
mainloop = GLib.MainLoop()
mainloop.run()
if __name__ == "__main__":
main()