-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain.py
More file actions
157 lines (119 loc) · 5.09 KB
/
main.py
File metadata and controls
157 lines (119 loc) · 5.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""pycluon microservice for a ARS-300 radar"""
import json
import logging
import warnings
from pathlib import Path
from typing import Callable, Dict
from datetime import datetime
from environs import Env
from streamz import Stream
from pycluon import Envelope, OD4Session
from pycluon.importer import import_odvd
import cantools
import can
# Read configuration from environment variables
env = Env()
# Passed to OD4Session below; defaults to 111 when the variable is unset
CLUON_CID = env.int("CLUON_CID", 111)
# Written to the sender_stamp of every outgoing envelope; defaults to 1
CLUON_SENDER_ID = env.int("CLUON_SENDER_ID", 1)
# Log level for the root logger; defaults to WARNING
LOG_LEVEL = env.log_level("LOG_LEVEL", logging.WARNING)
# No defaults are given for the CAN bus settings.
# NOTE(review): presumably environs raises at import time if these are unset - confirm
CANBUS_CHANNEL = env("CANBUS_CHANNEL")
CANBUS_TYPE = env("CANBUS_TYPE")
# Set up logging; warnings issued through the warnings module are routed to logging
logging.basicConfig(level=LOG_LEVEL)
logging.captureWarnings(True)
LOGGER = logging.getLogger("cluon-ARS300")
# Import the memo message definitions from the odvd file located next to this module
THIS_DIR = Path(__file__).parent
memo = import_odvd(THIS_DIR / "memo" / "memo.odvd")
# OD4 session used for sending envelopes
session = OD4Session(CLUON_CID)
# Load the CAN message database from the DBC file located next to this module
db = cantools.database.Database()
db.add_dbc_file(THIS_DIR / "can_database_ch0.dbc")
# Fetch the message specifications; their frame ids are used to classify received messages
CAN1_Target_Status = db.get_message_by_name("CAN1_Target_Status")
CAN1_Target_1 = db.get_message_by_name("CAN1_Target_1")
CAN1_Target_2 = db.get_message_by_name("CAN1_Target_2")
# NOTE(review): stateOutput is not referenced in the visible part of this file
stateOutput = db.get_message_by_name("RadarState")
def receive_from_canbus(bus: can.interface.Bus, injector: Callable):
    """Receive from a CAN bus indefinitely and inject completed target frames.

    Messages are classified by arbitration id:

    * A ``CAN1_Target_Status`` message closes the frame collected so far (it
      is handed to ``injector`` if it holds any targets) and supplies the
      timestamp and the near/far target counts for the next frame.
    * A ``CAN1_Target_1`` message is expected to be followed directly by its
      ``CAN1_Target_2`` counterpart; the two decoded messages are merged into
      one target dict, which is appended to the current frame if its target
      number is within the valid range.
    * Any other message is ignored.

    This function blocks and never returns. Any ``Exception`` raised while
    handling a message is logged and reception continues.

    Args:
        bus: CAN bus to read from; only ``recv()`` is called on it.
        injector: Callable invoked with one dict per completed frame, shaped
            as ``{"timestamp": <status message timestamp>, "targets": [...]}``.
    """
    # Target numbers above max_near are treated as far-range targets
    max_near = 32
    n_near = 0
    n_far = 0
    time_stamp = 0
    frame = []
    # Message that was read while looking for a Target_2 but turned out to be
    # something else. It is processed on the next iteration instead of being
    # dropped, so that e.g. a status message is never lost.
    pending = None

    while True:
        try:
            if pending is not None:
                # Already logged when it was received
                message, pending = pending, None
            else:
                message = bus.recv()
                LOGGER.debug("CANBUS received: %s", message)

            # Target status message: marks the start of a new frame
            if message.arbitration_id == CAN1_Target_Status.frame_id:
                # Send all targets in the previous frame (if any)
                if frame:
                    injector({"timestamp": time_stamp, "targets": frame})
                    # Start a fresh list rather than clearing in place, so the
                    # list already handed to the injector is left intact
                    frame = []

                target_status = db.decode_message(message.arbitration_id, message.data)

                # Timestamp for the whole frame
                time_stamp = message.timestamp

                # Number of valid targets in this frame
                n_near = target_status.get("NoOfTargetsNear")
                n_far = target_status.get("NoOfTargetsFar")
                continue

            # Anything that is not the first half of a target is ignored
            if message.arbitration_id != CAN1_Target_1.frame_id:
                continue

            target1 = db.decode_message(message.arbitration_id, message.data)

            # Try to fetch the subsequent target 2 message
            follow_up = bus.recv()
            LOGGER.debug("CANBUS received: %s", follow_up)
            if follow_up.arbitration_id != CAN1_Target_2.frame_id:
                # Not the expected counterpart: discard the half-received
                # target but handle the unexpected message on the next pass
                pending = follow_up
                continue

            target2 = db.decode_message(follow_up.arbitration_id, follow_up.data)

            # Merge the two halves into a single target
            target = {**target1, **target2}

            # A mismatch only results in a warning; the target is still kept
            if target.get("NoOfTarget_1") != target.get("NoOfTarget_2"):
                warnings.warn("Message mismatch!")

            # Only append to the frame if this is a valid target: either within
            # the reported near count, or within the reported far count above
            # the near range
            target_no = target.get("NoOfTarget_1")
            if target_no <= n_near or max_near < target_no <= (max_near + n_far):
                frame.append(target)

        except Exception:  # pylint: disable=broad-except
            LOGGER.exception("Something went wrong in the reception from the CAN bus!")
def frame_handler(frame: Dict):
    """Turn one radar frame into a send request for a memo raw message.

    Args:
        frame: Frame dict. Its "timestamp" entry is converted with
            ``datetime.fromtimestamp`` and the whole dict is JSON-encoded
            into the message payload.

    Returns:
        A one-element list holding a ``(message id, sample time, message)``
        tuple.
    """
    LOGGER.debug("Frame handler received frame to handle: %s", frame)

    sampled_at = datetime.fromtimestamp(frame.get("timestamp"))

    # Wrap the JSON-encoded frame in a memo raw message
    raw_message = memo.memo_raw_Raw()
    raw_message.data = json.dumps(frame)

    # 10000 is the message id according to the odvd file
    request = (10000, sampled_at, raw_message)
    return [request]
def cluon_send(message_requests):
    """Wrap each requested message in an envelope and send it on the session.

    Args:
        message_requests: Iterable of ``(message id, sample time, message)``
            tuples. Each message must provide ``SerializeToString()``.
    """
    for data_type, sampled_at, payload in message_requests:
        outgoing = Envelope()
        outgoing.sampled_at = sampled_at
        # The send time is taken at the moment the envelope is built
        outgoing.sent_at = datetime.now()
        outgoing.serialized_data = payload.SerializeToString()
        outgoing.data_type = data_type
        outgoing.sender_stamp = CLUON_SENDER_ID
        session.send(outgoing)
if __name__ == "__main__":
    # Processing pipeline: emitted frame -> frame_handler -> cluon_send
    source = Stream()
    handled = source.map(frame_handler)
    handled.sink(cluon_send)

    # Open the CAN bus. Make sure it is the right channel!
    # The channel is converted to an int for the kvaser bus type; for every
    # other bus type the configured string is passed through unchanged.
    if CANBUS_TYPE == "kvaser":
        channel = int(CANBUS_CHANNEL)
    else:
        channel = CANBUS_CHANNEL
    can_bus = can.interface.Bus(bustype=CANBUS_TYPE, channel=channel, bitrate=500000)

    # Blocks forever, feeding completed frames into the pipeline
    receive_from_canbus(can_bus, source.emit)