Skip to content

Commit b4128ac

Browse files
authored
Merge pull request #54 from bmwcarit/add_dlt_file_spinner
Add DltFileSpinner to fetch message for DltBroker
2 parents cfd6ccd + d233bdc commit b4128ac

9 files changed

Lines changed: 920 additions & 69 deletions

dlt/dlt.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import struct
1010
import time
1111
import threading
12+
import multiprocessing
1213

1314
from dlt.core import (
1415
cDLTFilter,
@@ -613,7 +614,10 @@ def __init__(self, **kwords):
613614
self.indexed = False
614615
self.end = False
615616
self.live_run = kwords.pop("is_live", False)
617+
# Stop event for threading usage in caller
616618
self.stop_reading = threading.Event()
619+
# Stop event for process usage in caller
620+
self.stop_reading_proc = multiprocessing.Event()
617621

618622
def __repr__(self):
619623
# pylint: disable=bad-continuation
@@ -764,7 +768,7 @@ def __getitem__(self, index):
764768
def _open_file(self):
765769
"""Open the configured file for processing"""
766770
file_opened = False
767-
while not self.stop_reading.is_set():
771+
while not self.stop_reading.is_set() and not self.stop_reading_proc.is_set():
768772
if dltlib.dlt_file_open(ctypes.byref(self), self.filename, self.verbose) >= DLT_RETURN_OK:
769773
file_opened = True
770774
break
@@ -801,7 +805,9 @@ def __iter__(self): # pylint: disable=too-many-branches
801805
self._open_file()
802806

803807
found_data = False
804-
while not self.stop_reading.is_set() or corruption_check_try: # pylint: disable=too-many-nested-blocks
808+
while (
809+
not self.stop_reading.is_set() and not self.stop_reading_proc.is_set()
810+
) or corruption_check_try: # pylint: disable=too-many-nested-blocks
805811
os_stat = os.stat(self.filename)
806812
mtime = os_stat.st_mtime
807813

@@ -1039,6 +1045,34 @@ def client_loop(self):
10391045
dltlib.dlt_client_main_loop(ctypes.byref(self), None, self.verbose)
10401046

10411047

1048+
def py_dlt_file_main_loop(dlt_reader, limit=None, callback=None):
1049+
"""Main loop to read dlt messages from dlt file."""
1050+
try:
1051+
for msg in dlt_reader:
1052+
logger.debug(
1053+
"Message from position %d and counter %d: %s", dlt_reader.file_position, dlt_reader.counter, msg
1054+
)
1055+
1056+
# send the message to the callback and check whether we
1057+
# need to continue
1058+
if callback and not callback(msg):
1059+
logger.debug("callback returned 'False'. Stopping main loop")
1060+
return False
1061+
1062+
if limit:
1063+
limit -= 1
1064+
if limit == 0:
1065+
break
1066+
except IOError as err:
1067+
# If the dlt file is empty, main_loop should not break, so it returns True
1068+
if str(err) == DLT_EMPTY_FILE_ERROR:
1069+
logger.debug("Dlt file is empty now. Wait until content is written")
1070+
return True
1071+
raise err
1072+
1073+
return True
1074+
1075+
10421076
# pylint: disable=too-many-arguments,too-many-return-statements,too-many-branches
10431077
def py_dlt_client_main_loop(client, limit=None, verbose=0, dumpfile=None, callback=None):
10441078
"""Reimplementation of dlt_client.c:dlt_client_main_loop() in order to handle callback

dlt/dlt_broker.py

Lines changed: 68 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111
DLT_DAEMON_TCP_PORT,
1212
DLTContextHandler,
1313
DLTFilterAckMessageHandler,
14+
DLTMessageDispatcherBase,
1415
DLTMessageHandler,
16+
DLTFileSpinner,
1517
DLTTimeValue,
1618
)
1719

@@ -39,7 +41,7 @@ class DLTBroker(object):
3941

4042
def __init__(
4143
self,
42-
ip_address,
44+
ip_address=None,
4345
port=DLT_DAEMON_TCP_PORT,
4446
use_proxy=False,
4547
enable_dlt_time=False,
@@ -50,8 +52,10 @@ def __init__(
5052
):
5153
"""Initialize the DLT Broker
5254
53-
:param str ip_address: IP address of the DLT Daemon. Defaults to TCP connection, unless a multicast address is
54-
used. In that case an UDP multicast connection will be used
55+
:param str | None ip_address: IP address of the DLT Daemon.
56+
If None, then dlt does not come with any ip listening, in other words, it comes from dlt log directly;
57+
Else, dlt comes from listening to some ip address. Defaults to TCP connection,
58+
unless a multicast address is used. In that case an UDP multicast connection will be used
5559
:param str post: Port of the DLT Daemon
5660
:param bool use_proxy: Ignored - compatibility option
5761
:param bool enable_dlt_time: Record the latest dlt message timestamp if enabled.
@@ -82,33 +86,59 @@ def __init__(
8286
self.filter_ack_queue = None
8387
self.filter_ack_msg_handler = None
8488

85-
kwargs["ip_address"] = ip_address
86-
kwargs["port"] = port
87-
kwargs["timeout"] = kwargs.get("timeout", DLT_CLIENT_TIMEOUT)
88-
self.msg_handler = DLTMessageHandler(
89+
self.msg_handler = self.create_dlt_message_dispather(ip_address, port, kwargs)
90+
91+
self.context_handler = DLTContextHandler(self.filter_queue, self.message_queue)
92+
93+
self._ip_address = ip_address
94+
self._port = port
95+
self._filename = kwargs.get("filename")
96+
97+
def create_dlt_message_dispather(self, ip_address, port, client_cfg) -> DLTMessageDispatcherBase:
98+
if ip_address:
99+
# If ip_address is given, then messages are retrieved from dlt client at run-time
100+
return self._create_dlt_message_handler(ip_address, port, client_cfg)
101+
else:
102+
# If not ip_address is given, then messages are retrieved from the given filename
103+
# The logs are written to the given filename from another process
104+
return self._create_dlt_file_spinner(client_cfg.get("filename"))
105+
106+
def _create_dlt_message_handler(self, ip_address, port, client_cfg):
107+
client_cfg["ip_address"] = ip_address
108+
client_cfg["port"] = port
109+
client_cfg["timeout"] = client_cfg.get("timeout", DLT_CLIENT_TIMEOUT)
110+
return DLTMessageHandler(
89111
self.filter_queue,
90112
self.message_queue,
91113
self.mp_stop_flag,
92-
kwargs,
114+
client_cfg,
93115
dlt_time_value=self._dlt_time_value,
94116
filter_ack_queue=self.filter_ack_queue,
95117
)
96-
self.context_handler = DLTContextHandler(self.filter_queue, self.message_queue)
97118

98-
self._ip_address = ip_address
99-
self._port = port
100-
self._filename = kwargs.get("filename")
119+
def _create_dlt_file_spinner(self, file_name):
120+
return DLTFileSpinner(
121+
self.filter_queue,
122+
self.message_queue,
123+
self.mp_stop_flag,
124+
file_name,
125+
dlt_time_value=self._dlt_time_value,
126+
filter_ack_queue=self.filter_ack_queue,
127+
)
101128

102129
def start(self):
103130
"""DLTBroker main worker method"""
104-
logger.debug(
105-
"Starting DLTBroker with parameters: use_proxy=%s, ip_address=%s, port=%s, filename=%s, multicast=%s",
106-
False,
107-
self._ip_address,
108-
self._port,
109-
self._filename,
110-
ip.ip_address(self._ip_address).is_multicast,
111-
)
131+
if isinstance(self.msg_handler, DLTMessageHandler):
132+
logger.debug(
133+
"Starting DLTBroker with parameters: use_proxy=%s, ip_address=%s, port=%s, filename=%s, multicast=%s",
134+
False,
135+
self._ip_address,
136+
self._port,
137+
self._filename,
138+
ip.ip_address(self._ip_address).is_multicast,
139+
)
140+
else:
141+
logger.debug("Starting DLTBroker by reading %s", self._filename)
112142

113143
if self._dlt_time_value:
114144
logger.debug("Enable dlt time for DLTBroker.")
@@ -166,12 +196,23 @@ def add_context(self, context_queue, filters=None):
166196
)
167197

168198
if not self._recv_filter_set_ack(context_filter_ack_queue, True):
199+
failure_reason = ""
200+
if isinstance(self.msg_handler, DLTMessageHandler):
201+
failure_reason = (
202+
"It's possible that DLTClient client does not start."
203+
" If it's a test case, it might be an error"
204+
)
205+
elif isinstance(self.msg_handler, DLTFileSpinner):
206+
failure_reason = (
207+
f"It's possible that dlt file {self._filename} is empty now. No big issue, "
208+
f"filters would be added once after new message is available in dlt file"
209+
)
169210
logger.warning(
170211
(
171-
"Could not receive filter-setting messge ack. It's possible that DLTClient client does "
172-
"not start. If it's a test case. It might be an error. For now, Run it anyway. "
212+
"Could not receive filter-setting message ack. %s. For now, Run it anyway. "
173213
"filters: %s, queue_id: %s"
174214
),
215+
failure_reason,
175216
filters,
176217
id(context_queue),
177218
)
@@ -187,9 +228,11 @@ def remove_context(self, context_queue):
187228

188229
def stop(self):
189230
"""Stop the broker"""
190-
logger.info("Stopping DLTContextHandler and DLTMessageHandler")
231+
logger.info("Stopping DLTContextHandler and %s", type(self.msg_handler).__name__)
232+
233+
self.msg_handler.break_blocking_main_loop()
191234

192-
logger.debug("Stop DLTMessageHandler")
235+
logger.debug("Stop %s", type(self.msg_handler).__name__)
193236
self.mp_stop_flag.set()
194237

195238
logger.debug("Stop DLTContextHandler")
@@ -205,7 +248,7 @@ def stop(self):
205248
logger.debug("Waiting on DLTFilterAckMessageHandler ending")
206249
self.filter_ack_msg_handler.join()
207250

208-
logger.debug("Waiting on DLTMessageHandler ending")
251+
logger.debug("Waiting on %s ending", type(self.msg_handler).__name__)
209252
if self.msg_handler.is_alive():
210253
try:
211254
self.msg_handler.terminate()

0 commit comments

Comments
 (0)