-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathfixed_interface.py
More file actions
executable file
·55 lines (43 loc) · 1.81 KB
/
fixed_interface.py
File metadata and controls
executable file
·55 lines (43 loc) · 1.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
"""
The meshtastic devs, in their infinite wisdom, decided to put the meshtastic heartbeat on a separate thread.
This is fine and dandy until the connection drops and the heartbeat fails, which does raise an exception, but is never caught.
Here, we set up a wrapper that replaces the interface's sendHeartbeat with a version that catches that failure and recreates the interface on error.
Very useful over unstable wifi connections.
If you are still having issues, you may need to switch to a serial (USB) connection to the meshtastic node.
"""
from log_f import logger
import meshtastic
import threading, time
import meshtastic.mesh_interface
from meshtastic.protobuf import mesh_pb2
import meshtastic.tcp_interface
class Injector:
    """Own a meshtastic interface and rebuild it when its heartbeat fails.

    The interface returned by ``interface_generator`` has its
    ``sendHeartbeat`` attribute replaced with ``customsendHeartbeat``, which
    catches any send failure and creates a brand-new interface in its place.

    Attributes:
        interface_generator: Zero-argument callable returning a new interface.
        retry_delay: Seconds to wait between failed creation attempts.
        interface: The interface currently in use. It is deleted and then
            re-created while recovering from a heartbeat failure.
    """

    def __init__(self, interface_generator, retry_delay=10):
        """Store the factory and create the first interface.

        Blocks until ``interface_generator`` succeeds.

        Args:
            interface_generator: Zero-argument callable returning a new
                interface object.
            retry_delay: Seconds to sleep between failed creation attempts.
                Defaults to 10, the previously hard-coded value.
        """
        self.interface_generator = interface_generator
        self.retry_delay = retry_delay
        self.inject_interface()

    def inject_interface(self):
        """Create an interface, retrying until it works, then patch its heartbeat."""
        while True:
            try:
                self.interface: meshtastic.tcp_interface.TCPInterface = self.interface_generator()
                break
            except Exception as exc:
                # Deliberately not a bare ``except:``: KeyboardInterrupt and
                # SystemExit must still be able to stop this retry loop.
                logger.warning(f"Meshtastic interface creation failed ({exc!r}), retrying...")
                time.sleep(self.retry_delay)
        # Shadow the interface's own sendHeartbeat with the guarded version.
        self.interface.sendHeartbeat = self.customsendHeartbeat
        logger.info("Successfully injected custom heartbeat")

    def customsendHeartbeat(self):
        """Send a heartbeat packet; on any failure, replace the interface."""
        try:
            # NOTE(review): this clears the interface's socket attribute before
            # every send; the reason is not visible in this file - confirm it
            # is intentional.
            self.interface.socket = None
            p = mesh_pb2.ToRadio()
            p.heartbeat.CopyFrom(mesh_pb2.Heartbeat())
            self.interface._sendToRadio(p)
        except Exception as exc:
            # Report the failure instead of swallowing it silently.
            logger.warning(f"Meshtastic heartbeat failed ({exc!r}), recreating interface...")
            # NOTE(review): the old interface is dropped without being closed;
            # confirm whether it needs an explicit close() before replacement.
            del self.interface
            self.inject_interface()
if __name__ == "__main__":

    def connect_to_node():
        """Open a new TCP interface to the meshtastic node at a fixed address."""
        return meshtastic.tcp_interface.TCPInterface("192.168.2.169")

    # Constructing the Injector creates the interface and patches its heartbeat.
    Injector(connect_to_node)

    # Nothing else to do on the main thread: idle forever in one-second naps.
    while True:
        time.sleep(1)