Skip to content

Commit 145971d

Browse files
fix common connection issues for MQTT to support pub/sub
1 parent 6c910c9 commit 145971d

File tree

5 files changed

+64
-23
lines changed

5 files changed

+64
-23
lines changed

src/oshconnect/csapi4py/default_api_helpers.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,29 @@ def retrieve_resource(self, res_type: APIResourceTypes, res_id: str = None, pare
137137
headers=req_headers)
138138
return api_request.make_request()
139139

140+
def get_resource(self, resource_type: APIResourceTypes, resource_id: str = None,
141+
subresource_type: APIResourceTypes = None,
142+
req_headers: dict = None):
143+
144+
"""
145+
Helper to get resources by type, specifically by id, and optionally a sub-resource collection of a specified resource.
146+
:param resource_type:
147+
:param resource_id:
148+
:param subresource_type:
149+
:param req_headers:
150+
:return:
151+
"""
152+
if req_headers is None:
153+
req_headers = {}
154+
base_api_url = self.get_api_root_url()
155+
resource_type_str = resource_type_to_endpoint(resource_type)
156+
res_id_str = f'/{resource_id}' if resource_id else ""
157+
sub_res_type_str = f'/{resource_type_to_endpoint(subresource_type)}' if subresource_type else ""
158+
complete_url = f'{base_api_url}/{resource_type_str}{res_id_str}{sub_res_type_str}'
159+
api_request = ConnectedSystemAPIRequest(url=complete_url, request_method='GET', auth=self.get_helper_auth(),
160+
headers=req_headers)
161+
return api_request.make_request()
162+
140163
def update_resource(self, res_type: APIResourceTypes, res_id: str, json_data: any, parent_res_id: str = None,
141164
from_collection: bool = False, url_endpoint: str = None, req_headers: dict = None):
142165
"""

src/oshconnect/csapi4py/mqtt.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33

44
class MQTTCommClient:
5-
def __init__(self, url, port=1883, username=None, password=None, path='mqtt', client_id="", transport='tcp'):
5+
def __init__(self, url, port=1883, username=None, password=None, path='mqtt', client_id_suffix="", transport='tcp'):
66
"""
77
Wraps a paho mqtt client to provide a simple interface for interacting with the mqtt server that is customized
88
for this library.
@@ -17,10 +17,10 @@ def __init__(self, url, port=1883, username=None, password=None, path='mqtt', cl
1717
self.__url = url
1818
self.__port = port
1919
self.__path = path
20-
self.__client_id = client_id
20+
self.__client_id = f'oscapy_mqtt-{client_id_suffix}'
2121
self.__transport = transport
2222

23-
self.__client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id=client_id)
23+
self.__client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id=self.__client_id)
2424

2525
if self.__transport == 'websockets':
2626
self.__client.ws_set_options(path=self.__path)

src/oshconnect/eventbus.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,4 @@ class EventBus(ABC):
1212
"""
1313
A base class for an event bus system.
1414
"""
15-
_deque: collections.deque
15+
_deque: collections.deque

src/oshconnect/oshconnectapi.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,9 @@ def discover_datastreams(self):
121121
datastreams = list(
122122
map(lambda ds: Datastream(parent_node=system.get_parent_node(), id=ds.ds_id, datastream_resource=ds),
123123
res_datastreams))
124-
datastreams = [ds.set_parent_resource_id(system.get_underlying_resource().system_id) for ds in datastreams]
124+
for ds in datastreams:
125+
ds.set_parent_resource_id(system.get_underlying_resource().system_id)
126+
# datastreams = [ds.set_parent_resource_id(system.get_underlying_resource().system_id) for ds in datastreams]
125127
self._datastreams.extend(datastreams)
126128

127129
def discover_systems(self, nodes: list[str] = None):

src/oshconnect/streamableresource.py

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,9 @@ def __init__(self, protocol: str, address: str, port: int,
149149
if kwargs.get('enable_mqtt'):
150150
if kwargs.get('mqtt_port') is not None:
151151
self._mqtt_port = kwargs.get('mqtt_port')
152-
self._mqtt_client = MQTTCommClient(url=self.address, port=self._mqtt_port)
152+
self._mqtt_client = MQTTCommClient(url=self.address, port=self._mqtt_port, client_id_suffix=uuid.uuid4().hex,)
153+
self._mqtt_client.connect()
154+
self._mqtt_client.start()
153155
# self._mqtt_client = MQTTCommClient(url=self.address + self.server_root, port=self._mqtt_port,
154156
# username=username, password=password, )
155157

@@ -249,6 +251,7 @@ class Status(Enum):
249251
STOPPING = "stopping"
250252
STOPPED = "stopped"
251253

254+
252255
class StreamableModes(Enum):
253256
PUSH = "push"
254257
PULL = "pull"
@@ -340,7 +343,12 @@ def init_mqtt(self):
340343
logging.warning(f"No MQTT client configured for streamable resource {self._id}.")
341344
return
342345

343-
self.get_mqtt_topic()
346+
self._mqtt_client.set_on_subscribe(self._default_on_subscribe)
347+
348+
# self.get_mqtt_topic()
349+
350+
def _default_on_subscribe(self, client, userdata, mid, granted_qos, properties):
351+
print("OSH Subscribed: "+str(mid)+" "+str(granted_qos))
344352

345353
def get_mqtt_topic(self, subresource: APIResourceTypes | None = None):
346354
"""
@@ -468,7 +476,7 @@ def subscribe_mqtt(self, topic: str, qos: int = 0):
468476
if self._mqtt_client is None:
469477
logging.warning(f"No MQTT client configured for streamable resource {self._id}.")
470478
return
471-
self._mqtt_client.subscribe(topic, qos=qos, msg_callback=self._message_handler)
479+
self._mqtt_client.subscribe(topic, qos=qos, msg_callback=self._mqtt_sub_callback)
472480

473481
def _publish_mqtt(self, topic, payload):
474482
if self._mqtt_client is None:
@@ -502,6 +510,11 @@ def publish(self, payload):
502510
pass
503511

504512

513+
def _mqtt_sub_callback(self, client, userdata, msg):
514+
print(f"Received MQTT message on topic {msg.topic}: {msg.payload}")
515+
self._msg_reader_queue.put_nowait(msg.payload)
516+
517+
505518
class System(StreamableResource[SystemResource]):
506519
name: str
507520
label: str
@@ -534,8 +547,10 @@ def __init__(self, name: str, label: str, urn: str, parent_node: Node, **kwargs)
534547
# self.underlying_resource = self._sys_resource
535548

536549
def discover_datastreams(self) -> list[DatastreamResource]:
537-
res = self._parent_node.get_api_helper().retrieve_resource(
538-
APIResourceTypes.DATASTREAM, req_headers={})
550+
# res = self._parent_node.get_api_helper().retrieve_resource(
551+
# APIResourceTypes.DATASTREAM, req_headers={})
552+
res = self._parent_node.get_api_helper().get_resource(APIResourceTypes.SYSTEM, self._resource_id,
553+
APIResourceTypes.DATASTREAM)
539554
datastream_json = res.json()['items']
540555
ds_resources = []
541556

@@ -702,19 +717,20 @@ def insert_observation_dict(self, obs_data: dict):
702717
def start(self):
703718
super().start()
704719
if self._mqtt_client is not None:
705-
self._mqtt_client.connect()
706-
self._mqtt_client.start()
707-
self.subscribe_mqtt(self._topic)
720+
# self._mqtt_client.connect()
721+
708722
if self._connection_mode is StreamableModes.PULL or self._connection_mode is StreamableModes.BIDIRECTIONAL:
709723
self._mqtt_client.subscribe(self._topic, msg_callback=self._mqtt_sub_callback)
724+
else:
725+
try:
726+
loop = asyncio.get_event_loop()
727+
loop.create_task(self._write_to_mqtt())
728+
except Exception as e:
729+
# TODO: Use logging instead of print
730+
print(traceback.format_exc())
731+
print(f"Error starting MQTT write task: {e}")
710732

711-
try:
712-
loop = asyncio.get_event_loop()
713-
loop.create_task(self._write_to_mqtt())
714-
except Exception as e:
715-
# TODO: Use logging instead of print
716-
print(traceback.format_exc())
717-
print(f"Error starting MQTT write task: {e}")
733+
# self._mqtt_client.start()
718734

719735
def init_mqtt(self):
720736
super().init_mqtt()
@@ -731,9 +747,9 @@ def _queue_push(self, msg):
731747
def _queue_pop(self):
732748
return self._msg_reader_queue.get_nowait()
733749

734-
def _mqtt_sub_callback(self, client, userdata, msg):
735-
print(f"MQTT Message received on topic {msg.topic}: {msg.payload}")
736-
self._queue_push(msg.payload)
750+
# def _mqtt_sub_callback(self, client, userdata, msg):
751+
# print(f"MQTT Message received on topic {msg.topic}: {msg.payload}")
752+
# self._queue_push(msg.payload)
737753

738754
def insert(self, data: dict):
739755
# self._queue_push(data)

0 commit comments

Comments
 (0)