Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CommonLib/SocketHelper.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def sendData(self, simState, simTime, sock):
byte_index = byte_index + self.msg_header_size
vehicle_byte_index = 0
vehicle_data_buffer = bytearray(65536)
# TODO: add traffic light and detector data
# Not yet implemented - see GitHub issue #130
traffic_light_data_buffer = bytearray(8096)
detector_data_buffer = bytearray(8096)
data_buffer = bytearray(81728)
Expand Down
4 changes: 4 additions & 0 deletions CommonLib/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .ConfigHelper import ConfigHelper
from .MsgHelper import MsgHelper
from .SocketHelper import SocketHelper
from .VehDataMsgDefs import VehData
4 changes: 3 additions & 1 deletion environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ dependencies:
- plotly
- pygame
- protobuf
- pytest
- pyyaml
- vs2015_runtime
- vc14_runtime
- pip
- pip:
- ./Carla/carla-0.9.15-cp310-cp310-win_amd64.whl
- ./Carla/carla-0.9.15-cp310-cp310-win_amd64.whl
7 changes: 1 addition & 6 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
# External software (not installed via pip)
# vissim==11.0
# sumo==1.13.0
# carmaker==10.0.1
# matlab==9.7

# Python packages
easydict>=1.9
matplotlib>=3.5.0
numpy>=1.23.0
pandas>=1.5.0
pytest>=7.0.0
pyyaml>=6.0
ruamel.yaml>=0.17.0
sphinx>=5.0.0
Expand Down
5 changes: 3 additions & 2 deletions tests/Python/SimpleEchoClient/simple_echo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
import socket
import sys
import os
import pathlib

# Add parent directory to path to import CommonLib
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..')))
# Add repo root to path to import CommonLib
sys.path.insert(0, str(pathlib.Path(__file__).parents[3]))

from CommonLib.SocketHelper import SocketHelper
from CommonLib.MsgHelper import MsgHelper
Expand Down
5 changes: 3 additions & 2 deletions tests/Python/SimpleTrafficLight/echo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
import socket
import sys
import os
import pathlib

# Add CommonLib to path
repo_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..'))
# Add repo root to path to import CommonLib
repo_root = str(pathlib.Path(__file__).parents[3])
if repo_root not in sys.path:
sys.path.insert(0, repo_root)

Expand Down
4 changes: 4 additions & 0 deletions tests/Python/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import sys
import pathlib
# Add repo root to path for all tests
sys.path.insert(0, str(pathlib.Path(__file__).parents[2]))
5 changes: 5 additions & 0 deletions tests/Python/pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[pytest]
testpaths = .
python_files = test_*.py
python_classes = Test*
python_functions = test_*
Empty file added tests/Python/unit/__init__.py
Empty file.
85 changes: 85 additions & 0 deletions tests/Python/unit/test_config_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import pytest
import os
from CommonLib.ConfigHelper import ConfigHelper

SIMPLE_ECHO_CONFIG = os.path.join(
os.path.dirname(__file__), '..', 'SimpleEchoClient', 'config.yaml'
)


def test_load_simple_echo_config():
"""ConfigHelper can load the SimpleEchoClient config without errors."""
if not os.path.exists(SIMPLE_ECHO_CONFIG):
pytest.skip("config file not found")
ch = ConfigHelper()
ch.getConfig(SIMPLE_ECHO_CONFIG)
assert ch.simulation_setup is not None


def test_config_simulation_setup_keys():
"""Required SimulationSetup fields are populated after loading config."""
if not os.path.exists(SIMPLE_ECHO_CONFIG):
pytest.skip("config file not found")
ch = ConfigHelper()
ch.getConfig(SIMPLE_ECHO_CONFIG)

assert ch.simulation_setup['SelectedTrafficSimulator'] == 'SUMO'
assert ch.simulation_setup['TrafficSimulatorIP'] == '127.0.0.1'
assert isinstance(ch.simulation_setup['TrafficSimulatorPort'], int)
assert ch.simulation_setup['EnableRealSim'] is True
assert ch.simulation_setup['EnableExternalDynamics'] is True


def test_config_vehicle_message_field():
"""VehicleMessageField is a non-empty list of strings."""
if not os.path.exists(SIMPLE_ECHO_CONFIG):
pytest.skip("config file not found")
ch = ConfigHelper()
ch.getConfig(SIMPLE_ECHO_CONFIG)

fields = ch.simulation_setup['VehicleMessageField']
assert isinstance(fields, list)
assert len(fields) > 0
for f in fields:
assert isinstance(f, str)


def test_config_application_setup():
"""ApplicationSetup is populated with VehicleSubscription list."""
if not os.path.exists(SIMPLE_ECHO_CONFIG):
pytest.skip("config file not found")
ch = ConfigHelper()
ch.getConfig(SIMPLE_ECHO_CONFIG)

assert ch.application_setup['EnableApplicationLayer'] is True
subs = ch.application_setup['VehicleSubscription']
assert isinstance(subs, list)
assert len(subs) > 0
first = subs[0]
assert 'type' in first
assert 'ip' in first
assert 'port' in first


def test_config_defaults_for_missing_fields():
"""ConfigHelper returns sensible defaults for fields not present in yaml."""
if not os.path.exists(SIMPLE_ECHO_CONFIG):
pytest.skip("config file not found")
ch = ConfigHelper()
ch.getConfig(SIMPLE_ECHO_CONFIG)

# SimulationMode not in config — should default to 0
assert ch.simulation_setup['SimulationMode'] == 0
# SimulationEndTime not in config — should default to 90000
assert ch.simulation_setup['SimulationEndTime'] == 90000.0


def test_reset_config():
"""resetConfig clears all setup dicts."""
if not os.path.exists(SIMPLE_ECHO_CONFIG):
pytest.skip("config file not found")
ch = ConfigHelper()
ch.getConfig(SIMPLE_ECHO_CONFIG)
ch.resetConfig()
assert len(ch.simulation_setup) == 0
assert len(ch.application_setup) == 0
172 changes: 172 additions & 0 deletions tests/Python/unit/test_msg_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
import struct
import pytest
from CommonLib.MsgHelper import MsgHelper, MessageType
from CommonLib.VehDataMsgDefs import VehData


def make_msg_helper_with_fields(*fields):
"""Return a MsgHelper with the given vehicle fields enabled."""
mh = MsgHelper()
mh.set_vehicle_message_field(list(fields))
return mh


class TestMsgHelperInit:
def test_default_fields_all_false(self):
mh = MsgHelper()
for key, val in mh.vehicle_msg_field_valid.items():
assert val is False, f"Expected {key} to be False by default"

def test_header_sizes(self):
mh = MsgHelper()
assert mh.msg_header_size == 9
assert mh.msg_each_header_size == 3


class TestSetVehicleMessageField:
def test_set_fields_marks_valid(self):
mh = MsgHelper()
mh.set_vehicle_message_field(['id', 'speed', 'positionX'])
assert mh.vehicle_msg_field_valid['id'] is True
assert mh.vehicle_msg_field_valid['speed'] is True
assert mh.vehicle_msg_field_valid['positionX'] is True

def test_unset_fields_remain_false(self):
mh = MsgHelper()
mh.set_vehicle_message_field(['id', 'speed'])
assert mh.vehicle_msg_field_valid['positionY'] is False
assert mh.vehicle_msg_field_valid['heading'] is False


class TestPackUnpackString:
def test_roundtrip_short_string(self):
buf = bytearray(64)
packed, idx = MsgHelper.pack_string(buf, 0, "hello")
assert idx == 6 # 1 byte length + 5 bytes string
result, length, new_idx, arr = MsgHelper.depack_string(packed, 0)
assert result == "hello"
assert length == 5
assert new_idx == 6

def test_roundtrip_empty_string(self):
buf = bytearray(16)
packed, idx = MsgHelper.pack_string(buf, 0, "")
assert idx == 1
result, length, new_idx, arr = MsgHelper.depack_string(packed, 0)
assert result == ""
assert length == 0

def test_roundtrip_with_offset(self):
buf = bytearray(64)
# Put some bytes at start then pack string at offset 4
MsgHelper.pack_string(buf, 4, "abc")
result, length, _, _ = MsgHelper.depack_string(buf, 4)
assert result == "abc"
assert length == 3


class TestUnpackHelpers:
def test_unpack_float(self):
raw = struct.pack('f', 3.14)
val, idx = MsgHelper.unpack_float(raw, 0)
assert abs(val - 3.14) < 0.001
assert idx == 4

def test_unpack_int32(self):
raw = struct.pack('i', -42)
val, idx = MsgHelper.unpack_int32(raw, 0)
assert val == -42
assert idx == 4

def test_unpack_uint32(self):
raw = struct.pack('I', 999999)
val, idx = MsgHelper.unpack_uint32(raw, 0)
assert val == 999999

def test_unpack_uint16(self):
raw = struct.pack('H', 1234)
val, idx = MsgHelper.unpack_uint16(raw, 0)
assert val == 1234
assert idx == 2

def test_unpack_uint8(self):
raw = bytes([255])
val, idx = MsgHelper.unpack_uint8(raw, 0)
assert val == 255
assert idx == 1

def test_unpack_int8(self):
raw = struct.pack('b', -1)
val, idx = MsgHelper.unpack_int8(raw, 0)
assert val == -1
assert idx == 1


class TestPackDepackVehData:
def test_roundtrip_id_and_speed(self):
mh = make_msg_helper_with_fields('id', 'speed')
veh = VehData()
veh.id = 'car1'
veh.speed = 15.5

buf = bytearray(256)
packed, msg_size, end_idx = mh.pack_veh_data(buf, 0, veh)

# depack_veh_data skips the 3-byte sub-header
unpacked = mh.depack_veh_data(bytes(packed[mh.msg_each_header_size:end_idx]))
assert unpacked.id == 'car1'
assert abs(unpacked.speed - 15.5) < 0.001

def test_roundtrip_position_fields(self):
mh = make_msg_helper_with_fields('positionX', 'positionY', 'positionZ', 'heading')
veh = VehData()
veh.positionX = 100.0
veh.positionY = 200.0
veh.positionZ = 5.0
veh.heading = 1.57

buf = bytearray(256)
packed, msg_size, end_idx = mh.pack_veh_data(buf, 0, veh)
unpacked = mh.depack_veh_data(bytes(packed[mh.msg_each_header_size:end_idx]))

assert abs(unpacked.positionX - 100.0) < 0.01
assert abs(unpacked.positionY - 200.0) < 0.01
assert abs(unpacked.positionZ - 5.0) < 0.01
assert abs(unpacked.heading - 1.57) < 0.01

def test_roundtrip_multiple_string_fields(self):
mh = make_msg_helper_with_fields('id', 'type', 'vehicleClass')
veh = VehData()
veh.id = 'ego'
veh.type = 'car'
veh.vehicleClass = 'passenger'

buf = bytearray(256)
packed, msg_size, end_idx = mh.pack_veh_data(buf, 0, veh)
unpacked = mh.depack_veh_data(bytes(packed[mh.msg_each_header_size:end_idx]))

assert unpacked.id == 'ego'
assert unpacked.type == 'car'
assert unpacked.vehicleClass == 'passenger'

def test_message_type_byte_is_vehicle_data(self):
mh = make_msg_helper_with_fields('id')
veh = VehData()
veh.id = 'v1'

buf = bytearray(256)
packed, _, _ = mh.pack_veh_data(buf, 0, veh)

# byte 2 of sub-header is the message type
assert packed[2] == MessageType.vehicle_data

def test_pack_header(self):
mh = MsgHelper()
buf = bytearray(256)
packed, idx = mh.pack_msg_header(buf, 1, 5.5, 100)
assert idx == mh.msg_header_size

sim_state, sim_time, total = mh.depack_msg_header(bytes(buf[:mh.msg_header_size]))
assert sim_state == 1
assert abs(sim_time - 5.5) < 0.001
assert total == 100
Loading