From c2526c71c5b64475257e83468fea11bee8d138c9 Mon Sep 17 00:00:00 2001 From: William Yang Date: Sun, 1 Feb 2026 21:25:40 +0100 Subject: [PATCH 01/12] feat(ffi): python aio --- .github/workflows/ci.yml | 18 ++++ .github/workflows/release_pypi.yml | 57 ++++++++++++ .gitignore | 6 ++ python/MANIFEST.in | 3 + python/README.md | 18 ++++ python/flowsdk/__init__.py | 4 + python/pyproject.toml | 23 +++++ python/setup.py | 9 ++ python_bindings/async_example.py | 143 +++++++++++++++++++++++++++++ python_bindings/test_binding.py | 130 ++++++++++++++++++++++++++ scripts/build_python_bindings.sh | 45 +++++++++ 11 files changed, 456 insertions(+) create mode 100644 .github/workflows/release_pypi.yml create mode 100644 python/MANIFEST.in create mode 100644 python/README.md create mode 100644 python/flowsdk/__init__.py create mode 100644 python/pyproject.toml create mode 100644 python/setup.py create mode 100644 python_bindings/async_example.py create mode 100644 python_bindings/test_binding.py create mode 100755 scripts/build_python_bindings.sh diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8145d8b0..13635f7a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -134,6 +134,24 @@ jobs: - name: Run security audit run: cargo audit + python-bindings: + name: Python Bindings + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.10' + + - name: Build and Test Bindings + run: ./scripts/build_python_bindings.sh --test + coverage: name: Code Coverage runs-on: ubuntu-latest diff --git a/.github/workflows/release_pypi.yml b/.github/workflows/release_pypi.yml new file mode 100644 index 00000000..2973548f --- /dev/null +++ b/.github/workflows/release_pypi.yml @@ -0,0 +1,57 @@ +name: Publish to PyPI + +on: + release: + types: [published] + +jobs: + build_wheels: + name: Build wheels on ${{ matrix.os }} + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [ubuntu-latest, windows-latest, macos-latest] + + steps: + - uses: actions/checkout@v4 + + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + + - name: Build and Prepare Bindings + shell: bash + run: | + ./scripts/build_python_bindings.sh --release + + - name: Build wheels + uses: pypa/cibuildwheel@v2.16.5 + with: + package-dir: python + env: + CIBW_BEFORE_BUILD: "pip install setuptools wheel" + # Linux specific: ensure library is copied correctly if needed + # But our script handles the dylib placement. + + - uses: actions/upload-artifact@v4 + with: + name: cibw-wheels-${{ matrix.os }}-${{ strategy.job-index }} + path: ./wheelhouse/*.whl + + upload_pypi: + needs: [build_wheels] + runs-on: ubuntu-latest + environment: + name: pypi + url: https://pypi.org/p/flowsdk + permissions: + id-token: write + steps: + - uses: actions/download-artifact@v4 + with: + pattern: cibw-wheels-* + path: dist + merge-multiple: true + + - uses: pypa/gh-action-pypi-publish@release/v1 + with: + password: ${{ secrets.PYPI_API_TOKEN }} diff --git a/.gitignore b/.gitignore index a03e0511..0badb2a7 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,9 @@ Cargo.lock *.profraw *.profdata + +# Python Bindings +python_bindings/flowsdk_ffi.py +python_bindings/*.dylib +python_bindings/*.so +python_bindings/*.dll diff --git a/python/MANIFEST.in b/python/MANIFEST.in new file mode 100644 index 00000000..8d9b384a --- /dev/null +++ b/python/MANIFEST.in @@ -0,0 +1,3 @@ +include flowsdk/*.dylib +include flowsdk/*.so +include flowsdk/*.dll diff --git a/python/README.md b/python/README.md new file mode 100644 index 00000000..203a2b99 --- /dev/null +++ b/python/README.md @@ -0,0 +1,18 @@ +# FlowSDK Python Bindings + +Python bindings for [FlowSDK](https://github.com/emqx/flowsdk). + +## Installation + +```bash +pip install flowsdk +``` + +## Usage + +```python +import flowsdk + +engine = flowsdk.MqttEngineFfi("client_id", 5) +engine.connect() +``` diff --git a/python/flowsdk/__init__.py b/python/flowsdk/__init__.py new file mode 100644 index 00000000..3509d4e9 --- /dev/null +++ b/python/flowsdk/__init__.py @@ -0,0 +1,4 @@ +try: + from .flowsdk_ffi import * +except ImportError: + pass diff --git a/python/pyproject.toml b/python/pyproject.toml new file mode 100644 index 00000000..dfa1a1ca --- /dev/null +++ b/python/pyproject.toml @@ -0,0 +1,23 @@ +[build-system] +requires = ["setuptools>=42", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "flowsdk" +version = "0.3.1" +description = "Python bindings for FlowSDK utilizing Uniffi" +readme = "README.md" +authors = [ + { name = "EMQ Technologies Co., Ltd.", email = "contact@emqx.io" } +] +license = { text = "MPL-2.0" } +classifiers = [ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: Mozilla Public License 2.0 (MPL 2.0)", + "Operating System :: OS Independent", +] +requires-python = ">=3.7" + +[tool.setuptools.packages.find] +where = ["."] +include = ["flowsdk*"] diff --git a/python/setup.py b/python/setup.py new file mode 100644 index 00000000..2ba7d88d --- /dev/null +++ b/python/setup.py @@ -0,0 +1,9 @@ +from setuptools import setup, find_packages + +setup( + packages=find_packages(), + include_package_data=True, + package_data={ + "flowsdk": ["*.dylib", "*.so", "*.dll"], + }, +) diff --git a/python_bindings/async_example.py b/python_bindings/async_example.py new file mode 100644 index 00000000..f9bdc567 --- /dev/null +++ b/python_bindings/async_example.py @@ -0,0 +1,143 @@ +import asyncio +import flowsdk_ffi +import time +import sys + +async def main(): + print("šŸš€ FlowSDK No-IO Async Example") + print("=" * 60) + + # 1. Create the engine + import random + client_id = f"python_async_no_io_{random.randint(1000, 9999)}" + opts = flowsdk_ffi.MqttOptionsFfi( + client_id=client_id, + mqtt_version=5, + clean_start=True, + keep_alive=60, + username=None, + password=None, + reconnect_base_delay_ms=1000, + reconnect_max_delay_ms=30000, + max_reconnect_attempts=0 + ) + engine = flowsdk_ffi.MqttEngineFfi.new_with_opts(opts) + print(f"āœ… Created MqttEngineFfi (Client ID: {client_id})") + + # 2. Establish TCP connection using asyncio + broker_host = "broker.emqx.io" + broker_port = 1883 + print(f"šŸ“” Connecting to {broker_host}:{broker_port}...") + try: + reader, writer = await asyncio.open_connection(broker_host, broker_port) + except Exception as e: + print(f"āŒ Connection failed: {e}") + return + + print("āœ… TCP Connected") + + # 3. Initiate MQTT connection + engine.connect() + + # 4. Main orchestration loop + try: + # Run for 15 seconds + start_time = time.monotonic() + end_time = start_time + 15 + + # Subscribe/Publish state + subscribed = False + published = False + + while time.monotonic() < end_time: + # now_ms should be the time elapsed since the engine was created. + # Since we create the engine just before this, monotonic() - start_time is a good proxy. + now_ms = int((time.monotonic() - start_time) * 1000) + + # 1. Handle protocol timers + engine.handle_tick(now_ms) + + # 2. Handle network input (non-blocking-ish) + try: + # Use a small timeout to keep the loop pumping + data = await asyncio.wait_for(reader.read(4096), timeout=0.05) + if data: + print(f"šŸ“„ Received {len(data)} bytes from network") + engine.handle_incoming(data) + elif reader.at_eof(): + print("šŸ“„ Reader got EOF") + engine.handle_connection_lost() + break + except asyncio.TimeoutError: + # No data available in this tick, that's fine + pass + except Exception as e: + print(f"āŒ Read error: {e}") + engine.handle_connection_lost() + break + + # 3. Handle network output + outgoing = engine.take_outgoing() + if outgoing: + print(f"šŸ“¤ Sending {len(outgoing)} bytes to network...") + writer.write(outgoing) + await writer.drain() + + # 4. Process all accumulated events + events = engine.take_events() + for event in events: + if event.is_connected(): + res = event[0] + print(f"āœ… MQTT Connected! (Reason: {res.reason_code}, Session Present: {res.session_present})") + elif event.is_disconnected(): + print(f"šŸ’” MQTT Disconnected! (Reason: {event.reason_code})") + elif event.is_message_received(): + msg = event[0] + print(f"šŸ“Ø Message on '{msg.topic}': {msg.payload.decode()} (QoS: {msg.qos})") + elif event.is_subscribed(): + res = event[0] + print(f"āœ… Subscribed (PID: {res.packet_id}, Reasons: {res.reason_codes})") + elif event.is_published(): + res = event[0] + print(f"āœ… Published (PID: {res.packet_id}, Reason: {res.reason_code})") + elif event.is_error(): + print(f"āŒ Engine Error: {event[0].message}") + elif event.is_reconnect_needed(): + print("šŸ”„ Engine signaled ReconnectNeeded") + elif event.is_reconnect_scheduled(): + print(f"ā° Engine scheduled reconnect (Attempt {event.attempt}, Delay {event.delay_ms}ms)") + elif event.is_ping_response(): + print(f"šŸ“ Ping Response (Success: {event.success})") + + # 5. Application logic + if engine.is_connected(): + if not subscribed: + pid = engine.subscribe("test/python/no_io", 1) + print(f"šŸ“‘ API -> subscribe('test/python/no_io', qos=1) -> PID: {pid}") + subscribed = True + elif not published: + pid = engine.publish("test/python/no_io", b"Hello from Python No-IO!", 1, None) + print(f"šŸ“¤ API -> publish('test/python/no_io', payload='...', qos=1) -> PID: {pid}") + published = True + + except Exception as e: + print(f"āŒ Loop error: {e}") + finally: + # 6. Disconnect gracefully + print("\nšŸ‘‹ Disconnecting...") + if not writer.is_closing(): + engine.disconnect() + outgoing = engine.take_outgoing() + if outgoing: + writer.write(outgoing) + await writer.drain() + + writer.close() + await writer.wait_closed() + print("āœ… Done") + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + pass diff --git a/python_bindings/test_binding.py b/python_bindings/test_binding.py new file mode 100644 index 00000000..9df7888e --- /dev/null +++ b/python_bindings/test_binding.py @@ -0,0 +1,130 @@ +import flowsdk_ffi +import time +import json + +def test_mqtt_engine(): + print("--- Testing Standard MqttEngineFfi ---") + opts = flowsdk_ffi.MqttOptionsFfi( + client_id="python_client", + mqtt_version=5, + clean_start=True, + keep_alive=60, + username=None, + password=None, + reconnect_base_delay_ms=1000, + reconnect_max_delay_ms=30000, + max_reconnect_attempts=0 + ) + engine = flowsdk_ffi.MqttEngineFfi.new_with_opts(opts) + + print(f"MQTT Version: {engine.get_version()}") + + # Simulate connection + engine.connect() + + # Subscribe + pid = engine.subscribe("test/topic", 1) + print(f"Subscribe packet_id: {pid}") + + # Publish + pid = engine.publish("test/topic", b"hello world", 1, None) + print(f"Publish packet_id: {pid}") + + # Simulate receiving a message (loopback) + print("Simulating message reception...") + msg = flowsdk_ffi.MqttMessageFfi(topic="test/topic", payload=b"hello loopback", qos=1, retain=False) + event = flowsdk_ffi.MqttEventFfi.MESSAGE_RECEIVED(msg) + engine.push_event_ffi(event) + + # Check events + events = engine.take_events() + for ev in events: + if ev.is_message_received(): + m = ev[0] # The MqttMessageFfi is the first element in the variant tuple + print(f"RECEIVED PUBLISH: topic={m.topic}, payload={m.payload.decode()}, qos={m.qos}") + else: + print(f"Other Event: {ev}") + + # Disconnect + engine.disconnect() + print("Disconnected.") + +def test_tls_engine(): + print("\n--- Testing TlsMqttEngineFfi ---") + opts = flowsdk_ffi.MqttOptionsFfi( + client_id="tls_client", + mqtt_version=5, + clean_start=True, + keep_alive=60, + username=None, + password=None, + reconnect_base_delay_ms=1000, + reconnect_max_delay_ms=30000, + max_reconnect_attempts=0 + ) + tls_opts = flowsdk_ffi.MqttTlsOptionsFfi( + ca_cert_file=None, + client_cert_file=None, + client_key_file=None, + insecure_skip_verify=True, + alpn_protocols=["mqtt"] + ) + + engine = flowsdk_ffi.TlsMqttEngineFfi(opts, tls_opts, "localhost") + engine.connect() + + # Check for outgoing socket data + data = engine.take_socket_data() + print(f"Outgoing TLS data: {len(data)} bytes") + + engine.disconnect() + print("Disconnected.") + +def test_quic_engine(): + print("\n--- Testing QuicMqttEngineFfi ---") + opts = flowsdk_ffi.MqttOptionsFfi( + client_id="quic_client", + mqtt_version=5, + clean_start=True, + keep_alive=60, + username=None, + password=None, + reconnect_base_delay_ms=1000, + reconnect_max_delay_ms=30000, + max_reconnect_attempts=0 + ) + + engine = flowsdk_ffi.QuicMqttEngineFfi(opts) + + tls_opts = flowsdk_ffi.MqttTlsOptionsFfi( + ca_cert_file=None, + client_cert_file=None, + client_key_file=None, + insecure_skip_verify=True, + alpn_protocols=["mqtt"] + ) + + engine.connect("127.0.0.1:1883", "localhost", tls_opts, 0) + + # Check for outgoing datagrams + datagrams = engine.take_outgoing_datagrams() + print(f"Outgoing QUIC datagrams: {len(datagrams)}") + + # Simulate a tick + events = engine.handle_tick(100) + print(f"Tick events: {len(events)}") + + engine.disconnect() + print("Disconnected.") + +if __name__ == "__main__": + try: + test_mqtt_engine() + test_tls_engine() + test_quic_engine() + print("\nAll verification tests passed!") + except Exception as e: + print(f"\nVerification Failed: {e}") + import traceback + traceback.print_exc() + exit(1) diff --git a/scripts/build_python_bindings.sh b/scripts/build_python_bindings.sh new file mode 100755 index 00000000..2744b27f --- /dev/null +++ b/scripts/build_python_bindings.sh @@ -0,0 +1,45 @@ +#!/bin/bash +set -e + +# Default to debug build +PROFILE="debug" +CARGO_PROFILE="dev" +TARGET_DIR="target/debug" + +if [[ "$1" == "--release" ]]; then + PROFILE="release" + CARGO_PROFILE="release" + TARGET_DIR="target/release" + shift +fi + +# Platforms +OS="$(uname -s)" +case "${OS}" in + Linux*) EXT="so";; + Darwin*) EXT="dylib";; + CYGWIN*|MINGW*|MSYS*) EXT="dll";; # Windows-ish + *) EXT="so";; +esac + +echo "Building flowsdk_ffi ($PROFILE)..." +cargo build -p flowsdk_ffi --profile $CARGO_PROFILE + +echo "Generating Python bindings..." +# Output direct to python/flowsdk +cargo run -p flowsdk_ffi --features=uniffi/cli --bin uniffi-bindgen generate \ + --library "$TARGET_DIR/libflowsdk_ffi.$EXT" \ + --language python \ + --out-dir python/flowsdk + +echo "Copying library for Python package..." +cp "$TARGET_DIR/libflowsdk_ffi.$EXT" python/flowsdk/ + +if [[ "$1" == "--test" ]]; then + echo "Running Python verification..." + export PYTHONPATH=$PWD/python + # We use a temporary test script that imports from flowsdk + python3 -c "import flowsdk; print('Import successful'); engine = flowsdk.MqttEngineFfi('test', 5); print('Engine created')" +fi + +echo "Done!" From 35fef6ff0edb937f385e18193aebf88b57525600 Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 2 Feb 2026 10:01:37 +0100 Subject: [PATCH 02/12] feat: Add Python FFI bindings for flowsdk and include asyncio MQTT client examples. --- python_bindings/async_interop_example.py | 161 ++++++++++++++++ python_bindings/proper_async_client.py | 232 +++++++++++++++++++++++ python_bindings/select_example.py | 192 +++++++++++++++++++ 3 files changed, 585 insertions(+) create mode 100644 python_bindings/async_interop_example.py create mode 100644 python_bindings/proper_async_client.py create mode 100644 python_bindings/select_example.py diff --git a/python_bindings/async_interop_example.py b/python_bindings/async_interop_example.py new file mode 100644 index 00000000..94140c99 --- /dev/null +++ b/python_bindings/async_interop_example.py @@ -0,0 +1,161 @@ +import asyncio + +import flowsdk_ffi +import time +import socket + +""" +Asynchronous MQTT Client using flowsdk-ffi with asyncio integration. +This module demonstrates how to integrate the flowsdk-ffi MQTT engine with Python's +asyncio event loop using low-level socket operations and asyncio's add_reader/add_writer +callbacks. +Example: + Run the async interop client:: + $ python async_interop_example.py +Classes: + AsyncInteropClient: An asyncio-integrated MQTT client that uses non-blocking sockets + and the asyncio event loop for I/O operations. +The implementation showcases: + - Non-blocking TCP socket connection using asyncio.loop.sock_connect() + - Event-driven I/O using loop.add_reader() and loop.add_writer() + - Integration of flowsdk-ffi's tick-based engine with asyncio's event loop + - Handling MQTT events (connection, subscription, message reception, publishing) + - Proper resource cleanup and socket lifecycle management +Note: + This is an example demonstrating the interoperability pattern between + FlowSDK FFI MQTT engine and Python's asyncio framework. +""" + +class AsyncInteropClient: + def __init__(self, client_id): + self.opts = flowsdk_ffi.MqttOptionsFfi( + client_id=client_id, + mqtt_version=5, + clean_start=True, + keep_alive=60, + username=None, + password=None, + reconnect_base_delay_ms=1000, + reconnect_max_delay_ms=30000, + max_reconnect_attempts=0 + ) + self.engine = flowsdk_ffi.MqttEngineFfi.new_with_opts(self.opts) + self.start_time = time.monotonic() + self.outgoing_buffer = b"" + self.sock = None + self.loop = asyncio.get_running_loop() + self.connected_event = asyncio.Event() + + async def connect(self, host, port): + print(f"šŸ“” Connecting to {host}:{port}...") + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.setblocking(False) + + try: + self.sock.connect((host, port)) + except BlockingIOError: + pass + + # Use asyncio to wait for the socket to be writable (connection complete) + await self.loop.sock_connect(self.sock, (host, port)) + print("āœ… TCP Connected") + + # Start the engine logic + self.engine.connect() + + # Register reader + self.loop.add_reader(self.sock, self._on_read) + self._pump_logic() + + def _on_read(self): + try: + data = self.sock.recv(4096) + if data: + print(f"šŸ“„ Received {len(data)} bytes") + self.engine.handle_incoming(data) + self._pump_logic() + else: + print("šŸ“„ EOF") + self.stop() + except Exception as e: + print(f"āŒ Read error: {e}") + self.stop() + + def _on_write(self): + if not self.outgoing_buffer: + self.loop.remove_writer(self.sock) + return + + try: + sent = self.sock.send(self.outgoing_buffer) + print(f"šŸ“¤ Sent {sent} bytes") + self.outgoing_buffer = self.outgoing_buffer[sent:] + if not self.outgoing_buffer: + self.loop.remove_writer(self.sock) + except (BlockingIOError, InterruptedError): + pass + except Exception as e: + print(f"āŒ Write error: {e}") + self.stop() + + def _pump_logic(self): + # 1. Ticks + now_ms = int((time.monotonic() - self.start_time) * 1000) + self.engine.handle_tick(now_ms) + + # 2. Outgoing + new_data = self.engine.take_outgoing() + if new_data: + self.outgoing_buffer += new_data + self.loop.add_writer(self.sock, self._on_write) + + # 3. Events + events = self.engine.take_events() + for ev in events: + if ev.is_connected(): + print("āœ… MQTT Connected!") + self.connected_event.set() + elif ev.is_message_received(): + m = ev[0] + print(f"šŸ“Ø Message: {m.topic} -> {m.payload.decode()}") + elif ev.is_subscribed(): + print(f"āœ… Subscribed (PID: {ev[0].packet_id})") + elif ev.is_published(): + print(f"āœ… Published (PID: {ev[0].packet_id})") + + # 4. Schedule next tick + next_ms = self.engine.next_tick_ms() + if next_ms > 0: + delay = max(0, (next_ms - now_ms) / 1000.0) + self.loop.call_later(delay, self._pump_logic) + + def stop(self): + if self.sock: + self.loop.remove_reader(self.sock) + self.loop.remove_writer(self.sock) + self.sock.close() + self.sock = None + print("šŸ›‘ Client stopped") + +async def main(): + print("šŸš€ asyncio Interop Example (Using add_reader/add_writer)") + print("=" * 70) + + client = AsyncInteropClient(f"python_async_interop_{int(time.time() % 10000)}") + await client.connect("broker.emqx.io", 1883) + + await client.connected_event.wait() + + # Example operations + client.engine.subscribe("test/python/interop", 1) + client._pump_logic() # Trigger immediate pump to send subscribe + + client.engine.publish("test/python/interop", b"Hello from Interop!", 1, None) + client._pump_logic() # Trigger immediate pump to send publish + + # Run for a bit to receive messages + await asyncio.sleep(5) + client.stop() + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python_bindings/proper_async_client.py b/python_bindings/proper_async_client.py new file mode 100644 index 00000000..f1b100c4 --- /dev/null +++ b/python_bindings/proper_async_client.py @@ -0,0 +1,232 @@ +import asyncio +""" +A proper async MQTT client implementation using flowsdk-ffi with asyncio. +This module provides an asyncio-based MQTT client that wraps the flowsdk_ffi +library, providing async/await support for MQTT operations. It implements a +custom Protocol for handling the network layer and manages the timing and +pumping of the MQTT engine. +Classes: + FlowMqttProtocol: asyncio.Protocol implementation that manages the network + layer and handles engine ticks and data pumping. + FlowMqttClient: High-level async MQTT client providing connect, subscribe, + publish, and disconnect operations. +Example: + >>> async def example(): + ... client = FlowMqttClient("my_client_id") + ... await client.connect("broker.emqx.io", 1883) + ... await client.subscribe("test/topic", 1) + ... await client.publish("test/topic", b"Hello", 1) + ... await client.disconnect() +""" +import flowsdk_ffi +import time +import socket +from typing import Optional, Dict, Any + + + + +class FlowMqttProtocol(asyncio.Protocol): + def __init__(self, engine: flowsdk_ffi.MqttEngineFfi, loop: asyncio.AbstractEventLoop, on_event_cb): + self.engine = engine + self.loop = loop + self.transport: Optional[asyncio.Transport] = None + self.on_event_cb = on_event_cb + self.start_time = time.monotonic() + self._tick_handle: Optional[asyncio.TimerHandle] = None + self.closed = False + + def connection_made(self, transport: asyncio.Transport): + self.transport = transport + print("Protocol: Connection made") + + # Start the engine + self.engine.connect() + + # Start the tick loop immediately + self._schedule_tick(0) + + def data_received(self, data: bytes): + # Feed network data to engine + self.engine.handle_incoming(data) + # Trigger an immediate pump to process potential responses + self._pump() + + def connection_lost(self, exc: Optional[Exception]): + print(f"Protocol: Connection lost: {exc}") + self.closed = True + self.engine.handle_connection_lost() + if self._tick_handle: + self._tick_handle.cancel() + + def _schedule_tick(self, delay_sec: float): + if self.closed: + return + # Cancel previous handle if it exists to avoid overlaps (though logic shouldn't allow it) + if self._tick_handle: + self._tick_handle.cancel() + + self._tick_handle = self.loop.call_later(delay_sec, self._on_timer) + + def _on_timer(self): + if self.closed: + return + + # Run protocol tick + now_ms = int((time.monotonic() - self.start_time) * 1000) + self.engine.handle_tick(now_ms) + + self._pump() + + # Schedule next tick + next_tick_ms = self.engine.next_tick_ms() + if next_tick_ms < 0: + # Should not happen typically unless engine is dead + delay = 0.1 + else: + delay = max(0, (next_tick_ms - now_ms) / 1000.0) + + self._schedule_tick(delay) + + def _pump(self): + # 1. Send outgoing data + outgoing = self.engine.take_outgoing() + if outgoing and self.transport and not self.transport.is_closing(): + self.transport.write(outgoing) + + # 2. Process events + events = self.engine.take_events() + for ev in events: + self.on_event_cb(ev) + +class FlowMqttClient: + def __init__(self, client_id: str): + self.opts = flowsdk_ffi.MqttOptionsFfi( + client_id=client_id, + mqtt_version=5, + clean_start=True, + keep_alive=30, + username=None, + password=None, + reconnect_base_delay_ms=1000, + reconnect_max_delay_ms=30000, + max_reconnect_attempts=0 + ) + self.engine = flowsdk_ffi.MqttEngineFfi.new_with_opts(self.opts) + self.loop = asyncio.get_running_loop() + self.protocol: Optional[FlowMqttProtocol] = None + + # Futures for pending operations + # Connect is special, tracked by a single future + self._connect_future: asyncio.Future = self.loop.create_future() + + # Map PacketID -> Future + self._pending_publish: Dict[int, asyncio.Future] = {} + self._pending_subscribe: Dict[int, asyncio.Future] = {} + + async def connect(self, host: str, port: int): + print(f"Client: Connecting to {host}:{port}...") + + transport, protocol = await self.loop.create_connection( + lambda: FlowMqttProtocol(self.engine, self.loop, self._on_event), + host, port + ) + self.protocol = protocol + + # Wait for actual MQTT connection + await self._connect_future + print("Client: MQTT Connected!") + + async def subscribe(self, topic: str, qos: int) -> int: + pid = self.engine.subscribe(topic, qos) + fut = self.loop.create_future() + self._pending_subscribe[pid] = fut + + # Force a pump to send the packet immediately + if self.protocol: + self.protocol._pump() + + print(f"Client: Awaiting Ack for Subscribe (PID: {pid})...") + await fut + return pid + + async def publish(self, topic: str, payload: bytes, qos: int) -> int: + pid = self.engine.publish(topic, payload, qos, None) + fut = self.loop.create_future() + self._pending_publish[pid] = fut + + # Force a pump to send the packet immediately + if self.protocol: + self.protocol._pump() + + print(f"Client: Awaiting Ack for Publish (PID: {pid})...") + await fut + return pid + + async def disconnect(self): + print("Client: Disconnecting...") + self.engine.disconnect() + if self.protocol: + self.protocol._pump() # Flush DISCONNECT packet + if self.protocol.transport: + self.protocol.transport.close() + self.protocol.closed = True + + def _on_event(self, ev): + if ev.is_connected(): + if not self._connect_future.done(): + self._connect_future.set_result(True) + + elif ev.is_published(): + # Packet Acked + res = ev[0] + pid = res.packet_id + if pid in self._pending_publish: + fut = self._pending_publish.pop(pid) + if not fut.done(): + fut.set_result(res) + print(f"Client: Future resolved for Publish PID {pid}") + + elif ev.is_subscribed(): + # Sub Acked + res = ev[0] + pid = res.packet_id + if pid in self._pending_subscribe: + fut = self._pending_subscribe.pop(pid) + if not fut.done(): + fut.set_result(res) + print(f"Client: Future resolved for Subscribe PID {pid}") + + elif ev.is_message_received(): + msg = ev[0] + print(f"******** MSG RECEIVED ********") + print(f"Topic: {msg.topic}") + print(f"Payload: {msg.payload.decode(errors='replace')}") + print(f"******************************") + + elif ev.is_disconnected(): + pass + +async def main(): + client_id = f"python_proper_{int(time.time() % 10000)}" + client = FlowMqttClient(client_id) + + try: + await client.connect("broker.emqx.io", 1883) + + # Subscribe and wait for ack + await client.subscribe("test/python/proper", 1) + print("āœ… Subscribe completed!") + + # Publish and wait for ack + await client.publish("test/python/proper", b"Hello from Proper Client!", 1) + print("āœ… Publish completed!") + + # Wait a bit to receive the message back + await asyncio.sleep(2) + + finally: + await client.disconnect() + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python_bindings/select_example.py b/python_bindings/select_example.py new file mode 100644 index 00000000..d6c1ea1d --- /dev/null +++ b/python_bindings/select_example.py @@ -0,0 +1,192 @@ +import socket +import selectors +import time +import flowsdk_ffi +import sys + +def main(): + """ + Main function demonstrating a select-based MQTT client using FlowSDK FFI. + This example shows how to use Python's selectors module to handle non-blocking + I/O with an MQTT engine. It connects to a public MQTT broker, subscribes to a + topic, publishes a message, and handles all I/O events asynchronously. + The function performs the following steps: + 1. Creates an MQTT engine with configuration options + 2. Establishes a non-blocking TCP connection to the broker + 3. Uses a selector to monitor socket events (read/write availability) + 4. Manages the MQTT connection handshake + 5. Subscribes to a test topic + 6. Publishes a test message + 7. Processes incoming MQTT messages and protocol events + 8. Runs for 30 seconds or until disconnected + The selector's EVENT_WRITE flag indicates when the socket is ready for writing, + which occurs when: + - The initial non-blocking connect() completes successfully + - The socket's send buffer has space available for outgoing data + Returns: + None + Raises: + KeyboardInterrupt: Handled gracefully to allow clean disconnection + """ + print("šŸš€ Select-based FlowSDK Example (Port of Rust no_io_mqtt_client_example.rs)") + print("=" * 70) + + # 1. Create the engine + client_id = f"python_select_no_io_{int(time.time() % 10000)}" + opts = flowsdk_ffi.MqttOptionsFfi( + client_id=client_id, + mqtt_version=5, + clean_start=True, + keep_alive=60, + username=None, + password=None, + reconnect_base_delay_ms=1000, + reconnect_max_delay_ms=30000, + max_reconnect_attempts=0 + ) + engine = flowsdk_ffi.MqttEngineFfi.new_with_opts(opts) + print(f"āœ… Created MqttEngineFfi (Client ID: {client_id})") + + # 2. Establish TCP connection + broker_host = "broker.emqx.io" + broker_port = 1883 + print(f"šŸ“” Connecting to {broker_host}:{broker_port}...") + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setblocking(False) + + try: + sock.connect((broker_host, broker_port)) + except BlockingIOError: + pass # Expected for non-blocking connect + + # 3. Use Selector for I/O + sel = selectors.DefaultSelector() + sel.register(sock, selectors.EVENT_WRITE) # Wait for connect to complete + + # Initiate MQTT connection + engine.connect() + + start_time = time.monotonic() + end_time = start_time + 30 + + connected = False + mqtt_connected = False + subscribed = False + published = False + + print("šŸ”„ Entering main loop...") + + outgoing_data = b"" + + try: + while time.monotonic() < end_time: + now_ms = int((time.monotonic() - start_time) * 1000) + + # 1. Handle engine ticks + engine.handle_tick(now_ms) + + # 2. Accumulate outgoing data from engine + new_outgoing = engine.take_outgoing() + if new_outgoing: + outgoing_data += new_outgoing + + # 3. Handle network events + timeout = 0.1 + next_tick = engine.next_tick_ms() + if next_tick > 0: + timeout = max(0, (next_tick - now_ms) / 1000.0) + timeout = min(timeout, 0.1) + + events = sel.select(timeout=timeout) + + for key, mask in events: + if mask & selectors.EVENT_WRITE: + if not connected: + # Check if connection was successful + err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) + if err == 0: + print("āœ… TCP Socket connected") + connected = True + else: + print(f"āŒ TCP Connection failed: {err}") + return + + if outgoing_data: + try: + sent = sock.send(outgoing_data) + print(f"šŸ“¤ Sent {sent} bytes") + outgoing_data = outgoing_data[sent:] + except (BlockingIOError, InterruptedError): + pass + + if mask & selectors.EVENT_READ: + try: + data = sock.recv(4096) + if data: + print(f"šŸ“„ Received {len(data)} bytes") + engine.handle_incoming(data) + else: + print("šŸ“„ EOF from server") + return + except (BlockingIOError, InterruptedError): + pass + + # 4. Update selector registration based on pending data + if not connected: + # Still waiting for connection + sel.modify(sock, selectors.EVENT_WRITE) + elif outgoing_data: + # Have data to write + sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE) + else: + # Only waiting for read + sel.modify(sock, selectors.EVENT_READ) + + # 5. Process protocol events + ffi_events = engine.take_events() + for ev in ffi_events: + if ev.is_connected(): + res = ev[0] + print(f"āœ… MQTT Connected! (Reason: {res.reason_code})") + mqtt_connected = True + elif ev.is_message_received(): + msg = ev[0] + content = msg.payload.decode(errors='replace') + print(f"šŸ“Ø Message on '{msg.topic}': {content} (QoS: {msg.qos})") + elif ev.is_subscribed(): + print(f"āœ… Subscribed (PID: {ev[0].packet_id})") + elif ev.is_published(): + print(f"āœ… Published (PID: {ev[0].packet_id})") + elif ev.is_error(): + print(f"āŒ Error: {ev[0].message}") + elif ev.is_disconnected(): + print(f"šŸ’” Disconnected (Reason: {ev.reason_code})") + return + + # 6. App Logic + if mqtt_connected: + if not subscribed: + pid = engine.subscribe("test/python/select", 1) + print(f"šŸ“‘ Subscribing (PID: {pid})...") + subscribed = True + elif not published: + pid = engine.publish("test/python/select", b"Hello from Select!", 1, None) + print(f"šŸ“¤ Publishing (PID: {pid})...") + published = True + + except KeyboardInterrupt: + pass + finally: + print("\nšŸ‘‹ Disconnecting...") + engine.disconnect() + final_outgoing = engine.take_outgoing() + if final_outgoing: + sock.setblocking(True) + sock.sendall(final_outgoing) + sock.close() + sel.close() + print("āœ… Done") + +if __name__ == "__main__": + main() From 054830b275f69ddd5d54aceb192c769c3e16fd83 Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 2 Feb 2026 10:26:19 +0100 Subject: [PATCH 03/12] refactor: mv Python bindings into python/package and python/examples --- .github/workflows/release_pypi.yml | 2 +- .gitignore | 10 +++++----- {python_bindings => python/examples}/async_example.py | 0 .../examples}/async_interop_example.py | 0 .../examples}/proper_async_client.py | 0 {python_bindings => python/examples}/select_example.py | 0 {python_bindings => python/examples}/test_binding.py | 0 python/{ => package}/MANIFEST.in | 0 python/{ => package}/README.md | 0 python/{ => package}/flowsdk/__init__.py | 0 python/{ => package}/pyproject.toml | 0 python/{ => package}/setup.py | 0 scripts/build_python_bindings.sh | 8 ++++---- 13 files changed, 10 insertions(+), 10 deletions(-) rename {python_bindings => python/examples}/async_example.py (100%) rename {python_bindings => python/examples}/async_interop_example.py (100%) rename {python_bindings => python/examples}/proper_async_client.py (100%) rename {python_bindings => python/examples}/select_example.py (100%) rename {python_bindings => python/examples}/test_binding.py (100%) rename python/{ => package}/MANIFEST.in (100%) rename python/{ => package}/README.md (100%) rename python/{ => package}/flowsdk/__init__.py (100%) rename python/{ => package}/pyproject.toml (100%) rename python/{ => package}/setup.py (100%) diff --git a/.github/workflows/release_pypi.yml b/.github/workflows/release_pypi.yml index 2973548f..1e06e390 100644 --- a/.github/workflows/release_pypi.yml +++ b/.github/workflows/release_pypi.yml @@ -26,7 +26,7 @@ jobs: - name: Build wheels uses: pypa/cibuildwheel@v2.16.5 with: - package-dir: python + package-dir: python/package env: CIBW_BEFORE_BUILD: "pip install setuptools wheel" # Linux specific: ensure library is copied correctly if needed diff --git a/.gitignore b/.gitignore index 0badb2a7..f2ee3761 100644 --- a/.gitignore +++ b/.gitignore @@ -8,8 +8,8 @@ Cargo.lock *.profraw *.profdata -# Python Bindings -python_bindings/flowsdk_ffi.py -python_bindings/*.dylib -python_bindings/*.so -python_bindings/*.dll +# Python Examples (generated files) +python/examples/flowsdk_ffi.py +python/examples/*.dylib +python/examples/*.so +python/examples/*.dll diff --git a/python_bindings/async_example.py b/python/examples/async_example.py similarity index 100% rename from python_bindings/async_example.py rename to python/examples/async_example.py diff --git a/python_bindings/async_interop_example.py b/python/examples/async_interop_example.py similarity index 100% rename from python_bindings/async_interop_example.py rename to python/examples/async_interop_example.py diff --git a/python_bindings/proper_async_client.py b/python/examples/proper_async_client.py similarity index 100% rename from python_bindings/proper_async_client.py rename to python/examples/proper_async_client.py diff --git a/python_bindings/select_example.py b/python/examples/select_example.py similarity index 100% rename from python_bindings/select_example.py rename to python/examples/select_example.py diff --git a/python_bindings/test_binding.py b/python/examples/test_binding.py similarity index 100% rename from python_bindings/test_binding.py rename to python/examples/test_binding.py diff --git a/python/MANIFEST.in b/python/package/MANIFEST.in similarity index 100% rename from python/MANIFEST.in rename to python/package/MANIFEST.in diff --git a/python/README.md b/python/package/README.md similarity index 100% rename from python/README.md rename to python/package/README.md diff --git a/python/flowsdk/__init__.py b/python/package/flowsdk/__init__.py similarity index 100% rename from python/flowsdk/__init__.py rename to python/package/flowsdk/__init__.py diff --git a/python/pyproject.toml b/python/package/pyproject.toml similarity index 100% rename from python/pyproject.toml rename to python/package/pyproject.toml diff --git a/python/setup.py b/python/package/setup.py similarity index 100% rename from python/setup.py rename to python/package/setup.py diff --git a/scripts/build_python_bindings.sh b/scripts/build_python_bindings.sh index 2744b27f..7e8ba45f 100755 --- a/scripts/build_python_bindings.sh +++ b/scripts/build_python_bindings.sh @@ -26,18 +26,18 @@ echo "Building flowsdk_ffi ($PROFILE)..." cargo build -p flowsdk_ffi --profile $CARGO_PROFILE echo "Generating Python bindings..." -# Output direct to python/flowsdk +# Output direct to python/package/flowsdk cargo run -p flowsdk_ffi --features=uniffi/cli --bin uniffi-bindgen generate \ --library "$TARGET_DIR/libflowsdk_ffi.$EXT" \ --language python \ - --out-dir python/flowsdk + --out-dir python/package/flowsdk echo "Copying library for Python package..." -cp "$TARGET_DIR/libflowsdk_ffi.$EXT" python/flowsdk/ +cp "$TARGET_DIR/libflowsdk_ffi.$EXT" python/package/flowsdk/ if [[ "$1" == "--test" ]]; then echo "Running Python verification..." - export PYTHONPATH=$PWD/python + export PYTHONPATH=$PWD/python/package # We use a temporary test script that imports from flowsdk python3 -c "import flowsdk; print('Import successful'); engine = flowsdk.MqttEngineFfi('test', 5); print('Engine created')" fi From 4f8ad640a3519522f0f48604dd1df61e6ad71da7 Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 2 Feb 2026 10:41:57 +0100 Subject: [PATCH 04/12] Add reusable FlowMqttClient async module --- python/examples/proper_async_client.py | 233 +++------------- python/examples/simple_async_usage.py | 28 ++ python/package/README.md | 90 ++++++- python/package/flowsdk/__init__.py | 7 + python/package/flowsdk/async_client.py | 358 +++++++++++++++++++++++++ 5 files changed, 510 insertions(+), 206 deletions(-) create mode 100644 python/examples/simple_async_usage.py create mode 100644 python/package/flowsdk/async_client.py diff --git a/python/examples/proper_async_client.py b/python/examples/proper_async_client.py index f1b100c4..e80e4cf4 100644 --- a/python/examples/proper_async_client.py +++ b/python/examples/proper_async_client.py @@ -1,232 +1,59 @@ -import asyncio -""" -A proper async MQTT client implementation using flowsdk-ffi with asyncio. -This module provides an asyncio-based MQTT client that wraps the flowsdk_ffi -library, providing async/await support for MQTT operations. It implements a -custom Protocol for handling the network layer and manages the timing and -pumping of the MQTT engine. -Classes: - FlowMqttProtocol: asyncio.Protocol implementation that manages the network - layer and handles engine ticks and data pumping. - FlowMqttClient: High-level async MQTT client providing connect, subscribe, - publish, and disconnect operations. -Example: - >>> async def example(): - ... client = FlowMqttClient("my_client_id") - ... await client.connect("broker.emqx.io", 1883) - ... await client.subscribe("test/topic", 1) - ... await client.publish("test/topic", b"Hello", 1) - ... await client.disconnect() """ -import flowsdk_ffi -import time -import socket -from typing import Optional, Dict, Any - - - - -class FlowMqttProtocol(asyncio.Protocol): - def __init__(self, engine: flowsdk_ffi.MqttEngineFfi, loop: asyncio.AbstractEventLoop, on_event_cb): - self.engine = engine - self.loop = loop - self.transport: Optional[asyncio.Transport] = None - self.on_event_cb = on_event_cb - self.start_time = time.monotonic() - self._tick_handle: Optional[asyncio.TimerHandle] = None - self.closed = False - - def connection_made(self, transport: asyncio.Transport): - self.transport = transport - print("Protocol: Connection made") - - # Start the engine - self.engine.connect() - - # Start the tick loop immediately - self._schedule_tick(0) - - def data_received(self, data: bytes): - # Feed network data to engine - self.engine.handle_incoming(data) - # Trigger an immediate pump to process potential responses - self._pump() - - def connection_lost(self, exc: Optional[Exception]): - print(f"Protocol: Connection lost: {exc}") - self.closed = True - self.engine.handle_connection_lost() - if self._tick_handle: - self._tick_handle.cancel() - - def _schedule_tick(self, delay_sec: float): - if self.closed: - return - # Cancel previous handle if it exists to avoid overlaps (though logic shouldn't allow it) - if self._tick_handle: - self._tick_handle.cancel() - - self._tick_handle = self.loop.call_later(delay_sec, self._on_timer) - - def _on_timer(self): - if self.closed: - return - - # Run protocol tick - now_ms = int((time.monotonic() - self.start_time) * 1000) - self.engine.handle_tick(now_ms) - - self._pump() - - # Schedule next tick - next_tick_ms = self.engine.next_tick_ms() - if next_tick_ms < 0: - # Should not happen typically unless engine is dead - delay = 0.1 - else: - delay = max(0, (next_tick_ms - now_ms) / 1000.0) - - self._schedule_tick(delay) - - def _pump(self): - # 1. Send outgoing data - outgoing = self.engine.take_outgoing() - if outgoing and self.transport and not self.transport.is_closing(): - self.transport.write(outgoing) - - # 2. Process events - events = self.engine.take_events() - for ev in events: - self.on_event_cb(ev) +Example demonstrating the high-level FlowMqttClient for asyncio applications. -class FlowMqttClient: - def __init__(self, client_id: str): - self.opts = flowsdk_ffi.MqttOptionsFfi( - client_id=client_id, - mqtt_version=5, - clean_start=True, - keep_alive=30, - username=None, - password=None, - reconnect_base_delay_ms=1000, - reconnect_max_delay_ms=30000, - max_reconnect_attempts=0 - ) - self.engine = flowsdk_ffi.MqttEngineFfi.new_with_opts(self.opts) - self.loop = asyncio.get_running_loop() - self.protocol: Optional[FlowMqttProtocol] = None - - # Futures for pending operations - # Connect is special, tracked by a single future - self._connect_future: asyncio.Future = self.loop.create_future() - - # Map PacketID -> Future - self._pending_publish: Dict[int, asyncio.Future] = {} - self._pending_subscribe: Dict[int, asyncio.Future] = {} - - async def connect(self, host: str, port: int): - print(f"Client: Connecting to {host}:{port}...") - - transport, protocol = await self.loop.create_connection( - lambda: FlowMqttProtocol(self.engine, self.loop, self._on_event), - host, port - ) - self.protocol = protocol - - # Wait for actual MQTT connection - await self._connect_future - print("Client: MQTT Connected!") - - async def subscribe(self, topic: str, qos: int) -> int: - pid = self.engine.subscribe(topic, qos) - fut = self.loop.create_future() - self._pending_subscribe[pid] = fut - - # Force a pump to send the packet immediately - if self.protocol: - self.protocol._pump() - - print(f"Client: Awaiting Ack for Subscribe (PID: {pid})...") - await fut - return pid +This example shows how to use the FlowMqttClient class from the flowsdk package +to easily connect, subscribe, publish, and disconnect from an MQTT broker using +async/await syntax. +""" - async def publish(self, topic: str, payload: bytes, qos: int) -> int: - pid = self.engine.publish(topic, payload, qos, None) - fut = self.loop.create_future() - self._pending_publish[pid] = fut - - # Force a pump to send the packet immediately - if self.protocol: - self.protocol._pump() - - print(f"Client: Awaiting Ack for Publish (PID: {pid})...") - await fut - return pid - - async def disconnect(self): - print("Client: Disconnecting...") - self.engine.disconnect() - if self.protocol: - self.protocol._pump() # Flush DISCONNECT packet - if self.protocol.transport: - self.protocol.transport.close() - self.protocol.closed = True +import asyncio +import time +from flowsdk import FlowMqttClient - def _on_event(self, ev): - if ev.is_connected(): - if not self._connect_future.done(): - self._connect_future.set_result(True) - - elif ev.is_published(): - # Packet Acked - res = ev[0] - pid = res.packet_id - if pid in self._pending_publish: - fut = self._pending_publish.pop(pid) - if not fut.done(): - fut.set_result(res) - print(f"Client: Future resolved for Publish PID {pid}") - elif ev.is_subscribed(): - # Sub Acked - res = ev[0] - pid = res.packet_id - if pid in self._pending_subscribe: - fut = self._pending_subscribe.pop(pid) - if not fut.done(): - fut.set_result(res) - print(f"Client: Future resolved for Subscribe PID {pid}") +def on_message(topic: str, payload: bytes, qos: int): + """Callback for received messages.""" + print(f"******** MSG RECEIVED ********") + print(f"Topic: {topic}") + print(f"Payload: {payload.decode(errors='replace')}") + print(f"QoS: {qos}") + print(f"******************************") - elif ev.is_message_received(): - msg = ev[0] - print(f"******** MSG RECEIVED ********") - print(f"Topic: {msg.topic}") - print(f"Payload: {msg.payload.decode(errors='replace')}") - print(f"******************************") - - elif ev.is_disconnected(): - pass async def main(): + # Create client with message callback client_id = f"python_proper_{int(time.time() % 10000)}" - client = FlowMqttClient(client_id) + client = FlowMqttClient(client_id, on_message=on_message) try: + # Connect to broker + print(f"Connecting to broker.emqx.io:1883...") await client.connect("broker.emqx.io", 1883) + print("āœ… Connected!") # Subscribe and wait for ack + print("Subscribing to test/python/proper...") await client.subscribe("test/python/proper", 1) print("āœ… Subscribe completed!") # Publish and wait for ack + print("Publishing message...") await client.publish("test/python/proper", b"Hello from Proper Client!", 1) print("āœ… Publish completed!") # Wait a bit to receive the message back + print("Waiting for messages...") await asyncio.sleep(2) finally: + print("Disconnecting...") await client.disconnect() + print("āœ… Disconnected!") + + +if __name__ == "__main__": + asyncio.run(main()) + if __name__ == "__main__": asyncio.run(main()) diff --git a/python/examples/simple_async_usage.py b/python/examples/simple_async_usage.py new file mode 100644 index 00000000..f21cb7f3 --- /dev/null +++ b/python/examples/simple_async_usage.py @@ -0,0 +1,28 @@ +""" +Simple example showing minimal FlowMqttClient usage. + +This demonstrates the most basic async MQTT client usage pattern. +""" + +import asyncio +from flowsdk import FlowMqttClient + + +async def main(): + # Create and connect + client = FlowMqttClient( + "simple_client", + on_message=lambda topic, payload, qos: print(f"šŸ“Ø {topic}: {payload}") + ) + + await client.connect("broker.emqx.io", 1883) + await client.subscribe("test/#", 1) + await client.publish("test/hello", b"Hello World!", 1) + + # Keep running to receive messages + await asyncio.sleep(5) + await client.disconnect() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/package/README.md b/python/package/README.md index 203a2b99..74b3a9a2 100644 --- a/python/package/README.md +++ b/python/package/README.md @@ -1,6 +1,6 @@ # FlowSDK Python Bindings -Python bindings for [FlowSDK](https://github.com/emqx/flowsdk). +Python bindings for [FlowSDK](https://github.com/emqx/flowsdk), providing both low-level FFI access and high-level async MQTT client. ## Installation @@ -8,11 +8,95 @@ Python bindings for [FlowSDK](https://github.com/emqx/flowsdk). pip install flowsdk ``` -## Usage +## Quick Start + +### Async Client (Recommended) + +The high-level async client provides a simple asyncio-based API: + +```python +import asyncio +from flowsdk import FlowMqttClient + +async def main(): + # Create client with message callback + client = FlowMqttClient( + "my_client_id", + on_message=lambda topic, payload, qos: print(f"{topic}: {payload}") + ) + + # Connect and use + await client.connect("broker.emqx.io", 1883) + await client.subscribe("test/topic", qos=1) + await client.publish("test/topic", b"Hello World!", qos=1) + + await asyncio.sleep(2) # Wait for messages + await client.disconnect() + +asyncio.run(main()) +``` + +### Low-Level FFI + +For advanced use cases requiring manual control: ```python import flowsdk -engine = flowsdk.MqttEngineFfi("client_id", 5) +# Create engine +engine = flowsdk.MqttEngineFfi("client_id", mqtt_version=5) engine.connect() + +# Manual network I/O required (see examples/) +``` + +## Features + +- āœ… MQTT 3.1.1 and 5.0 support +- āœ… Async/await API with asyncio +- āœ… TLS/SSL support +- āœ… QUIC transport support +- āœ… QoS 0, 1, 2 support +- āœ… Clean session and persistent sessions +- āœ… Last Will and Testament +- āœ… Automatic reconnection + +## Examples + +See the [examples directory](../../examples/) for more usage patterns: + +- `simple_async_usage.py` - Minimal async client example +- `proper_async_client.py` - Full-featured async client +- `select_example.py` - Select-based non-blocking I/O +- `async_example.py` - Manual asyncio with low-level API +- `test_binding.py` - FFI binding tests + +## API Reference + +### FlowMqttClient + +High-level async MQTT client. + +**Constructor:** +```python +FlowMqttClient( + client_id: str, + mqtt_version: int = 5, # 3 or 5 + clean_start: bool = True, + keep_alive: int = 30, # seconds + username: Optional[str] = None, + password: Optional[str] = None, + on_message: Optional[Callable[[str, bytes, int], None]] = None +) ``` + +**Methods:** +- `async connect(host: str, port: int)` - Connect to broker +- `async subscribe(topic: str, qos: int = 0) -> int` - Subscribe to topic +- `async unsubscribe(topic: str) -> int` - Unsubscribe from topic +- `async publish(topic: str, payload: bytes, qos: int = 0, retain: bool = None) -> int` - Publish message +- `async disconnect()` - Disconnect gracefully + +## License + +Mozilla Public License 2.0 diff --git a/python/package/flowsdk/__init__.py b/python/package/flowsdk/__init__.py index 3509d4e9..303b9de3 100644 --- a/python/package/flowsdk/__init__.py +++ b/python/package/flowsdk/__init__.py @@ -2,3 +2,10 @@ from .flowsdk_ffi import * except ImportError: pass + +try: + from .async_client import FlowMqttClient, FlowMqttProtocol +except ImportError: + pass + +__all__ = ['FlowMqttClient', 'FlowMqttProtocol', 'MqttEngineFfi', 'TlsMqttEngineFfi', 'QuicMqttEngineFfi', 'MqttOptionsFfi'] diff --git a/python/package/flowsdk/async_client.py b/python/package/flowsdk/async_client.py new file mode 100644 index 00000000..35017628 --- /dev/null +++ b/python/package/flowsdk/async_client.py @@ -0,0 +1,358 @@ +""" +A production-ready async MQTT client implementation using flowsdk-ffi with asyncio. + +This module provides an asyncio-based MQTT client that wraps the flowsdk_ffi +library, providing async/await support for MQTT operations. It implements a +custom Protocol for handling the network layer and manages the timing and +pumping of the MQTT engine. + +Classes: + FlowMqttProtocol: asyncio.Protocol implementation that manages the network + layer and handles engine ticks and data pumping. + FlowMqttClient: High-level async MQTT client providing connect, subscribe, + publish, and disconnect operations. + +Example: + >>> import asyncio + >>> from flowsdk import FlowMqttClient + >>> + >>> async def example(): + ... client = FlowMqttClient("my_client_id") + ... await client.connect("broker.emqx.io", 1883) + ... await client.subscribe("test/topic", 1) + ... await client.publish("test/topic", b"Hello", 1) + ... await client.disconnect() + >>> + >>> asyncio.run(example()) +""" + +import asyncio +import time +from typing import Optional, Dict, Callable, Any + +try: + from . import flowsdk_ffi +except ImportError: + import flowsdk_ffi + + +class FlowMqttProtocol(asyncio.Protocol): + """ + asyncio Protocol implementation for MQTT engine. + + This class handles the network layer, manages engine ticks, and pumps + data between the network transport and the MQTT engine. + + Args: + engine: The MQTT engine instance from flowsdk_ffi + loop: The asyncio event loop + on_event_cb: Callback function to handle MQTT events + """ + + def __init__(self, engine, loop: asyncio.AbstractEventLoop, on_event_cb: Callable): + self.engine = engine + self.loop = loop + self.transport: Optional[asyncio.Transport] = None + self.on_event_cb = on_event_cb + self.start_time = time.monotonic() + self._tick_handle: Optional[asyncio.TimerHandle] = None + self.closed = False + + def connection_made(self, transport: asyncio.Transport): + """Called when TCP connection is established.""" + self.transport = transport + + # Start the MQTT connection + self.engine.connect() + + # Start the tick loop immediately + self._schedule_tick(0) + + def data_received(self, data: bytes): + """Called when data is received from the network.""" + # Feed network data to engine + self.engine.handle_incoming(data) + # Trigger an immediate pump to process potential responses + self._pump() + + def connection_lost(self, exc: Optional[Exception]): + """Called when the connection is closed.""" + self.closed = True + self.engine.handle_connection_lost() + if self._tick_handle: + self._tick_handle.cancel() + + def _schedule_tick(self, delay_sec: float): + """Schedule the next engine tick.""" + if self.closed: + return + + if self._tick_handle: + self._tick_handle.cancel() + + self._tick_handle = self.loop.call_later(delay_sec, self._on_timer) + + def _on_timer(self): + """Handle scheduled engine tick.""" + if self.closed: + return + + # Run protocol tick + now_ms = int((time.monotonic() - self.start_time) * 1000) + self.engine.handle_tick(now_ms) + + self._pump() + + # Schedule next tick + next_tick_ms = self.engine.next_tick_ms() + if next_tick_ms < 0: + delay = 0.1 + else: + delay = max(0, (next_tick_ms - now_ms) / 1000.0) + + self._schedule_tick(delay) + + def _pump(self): + """Pump data between engine and network.""" + # 1. Send outgoing data + outgoing = self.engine.take_outgoing() + if outgoing and self.transport and not self.transport.is_closing(): + self.transport.write(outgoing) + + # 2. Process events + events = self.engine.take_events() + for ev in events: + self.on_event_cb(ev) + + +class FlowMqttClient: + """ + High-level async MQTT client. + + This client provides a simple async/await API for MQTT operations, + handling connection management, subscriptions, and publishing. + + Args: + client_id: MQTT client identifier + mqtt_version: MQTT protocol version (3 or 5, default: 5) + clean_start: Whether to start a clean session (default: True) + keep_alive: Keep-alive interval in seconds (default: 30) + username: Optional username for authentication + password: Optional password for authentication + reconnect_base_delay_ms: Base delay for reconnection attempts (default: 1000) + reconnect_max_delay_ms: Maximum delay for reconnection (default: 30000) + max_reconnect_attempts: Maximum reconnection attempts, 0 for infinite (default: 0) + on_message: Optional callback for received messages: fn(topic: str, payload: bytes, qos: int) + + Example: + >>> async def example(): + ... client = FlowMqttClient("my_client", on_message=lambda t, p, q: print(f"{t}: {p}")) + ... await client.connect("broker.emqx.io", 1883) + ... await client.subscribe("test/topic", 1) + ... await client.publish("test/topic", b"Hello", 1) + ... await asyncio.sleep(1) # Wait for message + ... await client.disconnect() + """ + + def __init__( + self, + client_id: str, + mqtt_version: int = 5, + clean_start: bool = True, + keep_alive: int = 30, + username: Optional[str] = None, + password: Optional[str] = None, + reconnect_base_delay_ms: int = 1000, + reconnect_max_delay_ms: int = 30000, + max_reconnect_attempts: int = 0, + on_message: Optional[Callable[[str, bytes, int], None]] = None + ): + self.opts = flowsdk_ffi.MqttOptionsFfi( + client_id=client_id, + mqtt_version=mqtt_version, + clean_start=clean_start, + keep_alive=keep_alive, + username=username, + password=password, + reconnect_base_delay_ms=reconnect_base_delay_ms, + reconnect_max_delay_ms=reconnect_max_delay_ms, + max_reconnect_attempts=max_reconnect_attempts + ) + self.engine = flowsdk_ffi.MqttEngineFfi.new_with_opts(self.opts) + self.protocol: Optional[FlowMqttProtocol] = None + self.on_message = on_message + + # Futures for pending operations + self._connect_future: Optional[asyncio.Future] = None + self._pending_publish: Dict[int, asyncio.Future] = {} + self._pending_subscribe: Dict[int, asyncio.Future] = {} + self._pending_unsubscribe: Dict[int, asyncio.Future] = {} + + async def connect(self, host: str, port: int): + """ + Connect to MQTT broker. + + Args: + host: Broker hostname or IP address + port: Broker port number + + Raises: + ConnectionError: If connection fails + """ + loop = asyncio.get_running_loop() + self._connect_future = loop.create_future() + + transport, protocol = await loop.create_connection( + lambda: FlowMqttProtocol(self.engine, loop, self._on_event), + host, port + ) + self.protocol = protocol + + # Wait for actual MQTT connection + await self._connect_future + + async def subscribe(self, topic: str, qos: int = 0) -> int: + """ + Subscribe to a topic. + + Args: + topic: MQTT topic to subscribe to + qos: Quality of Service level (0, 1, or 2) + + Returns: + Packet ID of the subscription + + Raises: + RuntimeError: If not connected + """ + if not self.protocol: + raise RuntimeError("Not connected") + + loop = asyncio.get_running_loop() + pid = self.engine.subscribe(topic, qos) + fut = loop.create_future() + self._pending_subscribe[pid] = fut + + # Force a pump to send the packet immediately + self.protocol._pump() + + await fut + return pid + + async def unsubscribe(self, topic: str) -> int: + """ + Unsubscribe from a topic. + + Args: + topic: MQTT topic to unsubscribe from + + Returns: + Packet ID of the unsubscription + + Raises: + RuntimeError: If not connected + """ + if not self.protocol: + raise RuntimeError("Not connected") + + loop = asyncio.get_running_loop() + pid = self.engine.unsubscribe(topic) + fut = loop.create_future() + self._pending_unsubscribe[pid] = fut + + # Force a pump to send the packet immediately + self.protocol._pump() + + await fut + return pid + + async def publish(self, topic: str, payload: bytes, qos: int = 0, retain: Optional[bool] = None) -> int: + """ + Publish a message. + + Args: + topic: MQTT topic to publish to + payload: Message payload as bytes + qos: Quality of Service level (0, 1, or 2) + retain: Whether to retain the message (None uses default) + + Returns: + Packet ID of the publish (0 for QoS 0) + + Raises: + RuntimeError: If not connected + """ + if not self.protocol: + raise RuntimeError("Not connected") + + loop = asyncio.get_running_loop() + pid = self.engine.publish(topic, payload, qos, retain) + + if qos > 0: + fut = loop.create_future() + self._pending_publish[pid] = fut + + # Force a pump to send the packet immediately + self.protocol._pump() + + await fut + else: + # QoS 0: just send immediately, no ack needed + self.protocol._pump() + + return pid + + async def disconnect(self): + """ + Disconnect from the broker gracefully. + """ + if not self.protocol: + return + + self.engine.disconnect() + self.protocol._pump() # Flush DISCONNECT packet + + if self.protocol.transport: + self.protocol.transport.close() + self.protocol.closed = True + + def _on_event(self, ev): + """Internal event handler.""" + if ev.is_connected(): + if self._connect_future and not self._connect_future.done(): + self._connect_future.set_result(True) + + elif ev.is_published(): + res = ev[0] + pid = res.packet_id + if pid in self._pending_publish: + fut = self._pending_publish.pop(pid) + if not fut.done(): + fut.set_result(res) + + elif ev.is_subscribed(): + res = ev[0] + pid = res.packet_id + if pid in self._pending_subscribe: + fut = self._pending_subscribe.pop(pid) + if not fut.done(): + fut.set_result(res) + + elif ev.is_unsubscribed(): + res = ev[0] + pid = res.packet_id + if pid in self._pending_unsubscribe: + fut = self._pending_unsubscribe.pop(pid) + if not fut.done(): + fut.set_result(res) + + elif ev.is_message_received(): + msg = ev[0] + if self.on_message: + self.on_message(msg.topic, msg.payload, msg.qos) + + elif ev.is_disconnected(): + pass + + +__all__ = ['FlowMqttClient', 'FlowMqttProtocol'] From 4974d2fe733571f1f3d18a24d49f8d64e6e04fe5 Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 2 Feb 2026 10:58:36 +0100 Subject: [PATCH 05/12] feat: Introduce unified FlowMqttClient for QUIC and TCP --- .gitignore | 7 + python/examples/README.md | 178 +++++++++++++ python/examples/async_quic_client_example.py | 67 +++++ python/package/flowsdk/__init__.py | 4 +- python/package/flowsdk/async_client.py | 247 +++++++++++++++++-- 5 files changed, 478 insertions(+), 25 deletions(-) create mode 100644 python/examples/README.md create mode 100644 python/examples/async_quic_client_example.py diff --git a/.gitignore b/.gitignore index f2ee3761..458bb937 100644 --- a/.gitignore +++ b/.gitignore @@ -8,8 +8,15 @@ Cargo.lock *.profraw *.profdata + +# FFI generated code +python/package/flowsdk/flowsdk_ffi.py +python/package/flowsdk/libflowsdk_ffi.dylib + # Python Examples (generated files) python/examples/flowsdk_ffi.py python/examples/*.dylib python/examples/*.so python/examples/*.dll + +python/**/*.pyc diff --git a/python/examples/README.md b/python/examples/README.md new file mode 100644 index 00000000..1dc7d3bb --- /dev/null +++ b/python/examples/README.md @@ -0,0 +1,178 @@ +# FlowSDK Python Examples + +This directory contains examples demonstrating various usage patterns of the FlowSDK Python bindings. + +## Quick Start Examples + +### 1. Simple Async Usage (`simple_async_usage.py`) +**Minimal async MQTT client example - Start here!** + +```bash +PYTHONPATH=../package python3 simple_async_usage.py +``` + +The simplest way to use FlowSDK with asyncio. Shows connect, subscribe, publish, and disconnect in just a few lines. + +### 2. Proper Async Client (`proper_async_client.py`) +**Full-featured async client with message callbacks** + +```bash +PYTHONPATH=../package python3 proper_async_client.py +``` + +Demonstrates the complete FlowMqttClient API with proper error handling and message callbacks. + +## Transport Examples + +### 3. QUIC Transport (`async_quic_client_example.py`) +**MQTT over QUIC for modern UDP-based transport** + +```bash +PYTHONPATH=../package python3 async_quic_client_example.py +``` + +Shows how to use MQTT over QUIC protocol, which provides: +- Built-in encryption (TLS 1.3) +- Connection migration +- Lower latency than TCP +- No head-of-line blocking + +**Requirements**: QUIC-enabled MQTT broker (e.g., EMQX 5.0+ with QUIC listener) + +## Low-Level Examples + +### 4. Select-Based Client (`select_example.py`) +**Non-blocking I/O using select()** + +```bash +PYTHONPATH=../package python3 select_example.py +``` + +Demonstrates manual socket management with the select() system call. Good for: +- Understanding low-level I/O +- Integration with non-asyncio event loops +- Single-threaded blocking I/O patterns + +### 5. Async with Manual I/O (`async_example.py`) +**Asyncio with manual socket operations** + +```bash +PYTHONPATH=../package python3 async_example.py +``` + +Shows how to manually integrate the MQTT engine with asyncio event loop using socket I/O. + +### 6. Async Interop (`async_interop_example.py`) +**Asyncio Protocol integration pattern** + +```bash +PYTHONPATH=../package python3 async_interop_example.py +``` + +Demonstrates using asyncio's Protocol pattern with add_reader/add_writer for event loop integration. + +## Testing + +### 7. Binding Tests (`test_binding.py`) +**Low-level FFI binding verification** + +```bash +PYTHONPATH=../package python3 test_binding.py +``` + +Tests the FFI bindings for: +- Standard MQTT engine (MqttEngineFfi) +- TLS MQTT engine (TlsMqttEngineFfi) +- QUIC MQTT engine (QuicMqttEngineFfi) + +## Architecture Overview + +### High-Level API (Recommended) +``` +FlowMqttClient → FlowMqttProtocol → MqttEngineFfi → Rust FFI +``` +- Simple async/await API +- Automatic network handling +- Built-in reconnection support + +### Low-Level API (Advanced) +``` +Your Code → MqttEngineFfi → Manual I/O → Network +``` +- Full control over networking +- Custom transport implementation +- Integration with existing event loops + +## MQTT Broker Setup + +### For TCP Examples (most examples) +Any MQTT broker works: +```bash +# Using mosquitto +mosquitto -p 1883 + +# Or use public broker +# broker.emqx.io:1883 (already configured in examples) +``` + +### For QUIC Example +Requires QUIC-enabled broker (EMQX 5.0+): +```bash +# EMQX with QUIC enabled +docker run -d --name emqx \ + -p 1883:1883 \ + -p 14567:14567/udp \ + emqx/emqx:latest + +# Configure QUIC listener in emqx.conf: +# listeners.quic.default { +# bind = "0.0.0.0:14567" +# max_connections = 1024000 +# } +``` + +## Common Issues + +### Import Error +``` +ModuleNotFoundError: No module named 'flowsdk' +``` +**Solution**: Set PYTHONPATH to the package directory: +```bash +export PYTHONPATH=/path/to/python/package +``` + +### Connection Refused +``` +ConnectionRefusedError: [Errno 61] Connection refused +``` +**Solution**: +- Ensure MQTT broker is running +- Check broker address and port +- For public brokers, check your internet connection + +### QUIC Connection Failed +``` +QUIC: Connection lost: None +``` +**Solution**: +- Verify QUIC listener is enabled on broker +- Check UDP port 14567 is open +- Try with `insecure_skip_verify=True` for testing + +## Learning Path + +1. **Start here**: `simple_async_usage.py` - Learn the basics +2. **Next**: `proper_async_client.py` - See full features +3. **Advanced**: `select_example.py` - Understand low-level I/O +4. **Modern**: `async_quic_client_example.py` - Try QUIC transport + +## More Information + +- [Package README](../package/README.md) - API reference +- [FlowSDK Docs](../../../docs/) - Protocol details +- [MQTT Specification](https://mqtt.org/mqtt-specification/) - MQTT protocol + +## License + +Mozilla Public License 2.0 diff --git a/python/examples/async_quic_client_example.py b/python/examples/async_quic_client_example.py new file mode 100644 index 00000000..3371450c --- /dev/null +++ b/python/examples/async_quic_client_example.py @@ -0,0 +1,67 @@ +""" +Example demonstrating MQTT over QUIC using flowsdk-ffi with asyncio. + +QUIC provides a modern UDP-based transport with built-in encryption and multiplexing. +This example shows how to use the unified FlowMqttClient with QUIC transport. + +Note: Requires a QUIC-enabled MQTT broker (e.g., EMQX with QUIC support). +""" + +import asyncio +from flowsdk import FlowMqttClient, TransportType + + +def on_message(topic: str, payload: bytes, qos: int): + """Callback for received messages.""" + print(f"šŸ“Ø Received: {topic} -> {payload.decode('utf-8')} (QoS {qos})") + + +async def main(): + """Main example demonstrating QUIC MQTT client.""" + print("šŸš€ Starting QUIC MQTT Client Example...") + + # Create QUIC client + client = FlowMqttClient( + client_id="quic_test_client", + transport=TransportType.QUIC, + mqtt_version=5, + insecure_skip_verify=True, # Skip TLS verification for demo + on_message=on_message + ) + + try: + # Connect to broker using QUIC + print("šŸ”Œ Connecting to broker.emqx.io:14567 via QUIC...") + await client.connect("broker.emqx.io", 14567, server_name="broker.emqx.io") + print("āœ… Connected!") + + # Subscribe to a test topic + topic = "flowsdk/quic/test" + print(f"šŸ“” Subscribing to {topic}...") + await client.subscribe(topic, qos=1) + print("āœ… Subscribed!") + + # Publish some test messages + for i in range(3): + message = f"QUIC message {i+1}" + print(f"šŸ“¤ Publishing: {message}") + await client.publish(topic, message.encode(), qos=1) + await asyncio.sleep(0.5) + + # Wait for messages + print("ā³ Waiting for messages (5 seconds)...") + await asyncio.sleep(5) + + # Disconnect + print("šŸ‘‹ Disconnecting...") + await client.disconnect() + print("āœ… Disconnected!") + + except Exception as e: + print(f"āŒ Error: {e}") + import traceback + traceback.print_exc() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/package/flowsdk/__init__.py b/python/package/flowsdk/__init__.py index 303b9de3..1f89937f 100644 --- a/python/package/flowsdk/__init__.py +++ b/python/package/flowsdk/__init__.py @@ -4,8 +4,8 @@ pass try: - from .async_client import FlowMqttClient, FlowMqttProtocol + from .async_client import FlowMqttClient, FlowMqttProtocol, TransportType except ImportError: pass -__all__ = ['FlowMqttClient', 'FlowMqttProtocol', 'MqttEngineFfi', 'TlsMqttEngineFfi', 'QuicMqttEngineFfi', 'MqttOptionsFfi'] +__all__ = ['FlowMqttClient', 'FlowMqttProtocol', 'TransportType', 'MqttEngineFfi', 'TlsMqttEngineFfi', 'QuicMqttEngineFfi', 'MqttOptionsFfi'] diff --git a/python/package/flowsdk/async_client.py b/python/package/flowsdk/async_client.py index 35017628..7c617def 100644 --- a/python/package/flowsdk/async_client.py +++ b/python/package/flowsdk/async_client.py @@ -6,13 +6,17 @@ custom Protocol for handling the network layer and manages the timing and pumping of the MQTT engine. +Supports multiple transports: + - TCP: Standard MQTT over TCP (default) + - QUIC: MQTT over QUIC protocol (UDP-based, built-in encryption) + Classes: - FlowMqttProtocol: asyncio.Protocol implementation that manages the network - layer and handles engine ticks and data pumping. - FlowMqttClient: High-level async MQTT client providing connect, subscribe, - publish, and disconnect operations. + TransportType: Enum for selecting MQTT transport protocol + FlowMqttProtocol: asyncio.Protocol implementation for TCP transport + FlowMqttDatagramProtocol: asyncio.DatagramProtocol for QUIC transport + FlowMqttClient: High-level async MQTT client supporting multiple transports -Example: +Example (TCP): >>> import asyncio >>> from flowsdk import FlowMqttClient >>> @@ -24,11 +28,26 @@ ... await client.disconnect() >>> >>> asyncio.run(example()) + +Example (QUIC): + >>> import asyncio + >>> from flowsdk import FlowMqttClient, TransportType + >>> + >>> async def example(): + ... client = FlowMqttClient("my_client_id", transport=TransportType.QUIC) + ... await client.connect("broker.emqx.io", 14567, server_name="broker.emqx.io") + ... await client.subscribe("test/topic", 1) + ... await client.publish("test/topic", b"Hello", 1) + ... await client.disconnect() + >>> + >>> asyncio.run(example()) """ import asyncio +import socket import time -from typing import Optional, Dict, Callable, Any +from enum import Enum +from typing import Optional, Dict, Callable, Any, List, Union try: from . import flowsdk_ffi @@ -36,6 +55,12 @@ import flowsdk_ffi +class TransportType(Enum): + """MQTT transport protocol types.""" + TCP = "tcp" + QUIC = "quic" + + class FlowMqttProtocol(asyncio.Protocol): """ asyncio Protocol implementation for MQTT engine. @@ -125,15 +150,119 @@ def _pump(self): self.on_event_cb(ev) +class FlowMqttDatagramProtocol(asyncio.DatagramProtocol): + """ + asyncio DatagramProtocol implementation for QUIC MQTT engine. + + This class handles UDP datagram transport for QUIC, managing engine ticks + and pumping data between the network transport and the MQTT engine. + + Args: + engine: The QUIC MQTT engine instance from flowsdk_ffi + loop: The asyncio event loop + on_event_cb: Callback function to handle MQTT events + """ + + def __init__(self, engine, loop: asyncio.AbstractEventLoop, on_event_cb: Callable): + self.engine = engine + self.loop = loop + self.transport: Optional[asyncio.DatagramTransport] = None + self.on_event_cb = on_event_cb + self.start_time = time.monotonic() + self._tick_handle: Optional[asyncio.TimerHandle] = None + self.closed = False + self.remote_addr = None + + def connection_made(self, transport: asyncio.DatagramTransport): + """Called when UDP socket is ready.""" + self.transport = transport + # Get the remote address from the transport + try: + self.remote_addr = transport.get_extra_info('peername') + except Exception: + pass + + # Don't call engine.connect() here - QUIC requires parameters + # Connection will be initiated from FlowMqttClient.connect() + + # Start the tick loop immediately + self._schedule_tick(0) + + def datagram_received(self, data: bytes, addr): + """Called when a datagram is received.""" + # Feed datagram to QUIC engine + self.remote_addr = addr + now_ms = int((time.monotonic() - self.start_time) * 1000) + addr_str = f"{addr[0]}:{addr[1]}" + self.engine.handle_datagram(data, addr_str, now_ms) + # Trigger an immediate pump to process responses + self._pump() + + def error_received(self, exc: Exception): + """Called when an error is received.""" + pass # UDP errors are generally non-fatal + + def connection_lost(self, exc: Optional[Exception]): + """Called when the connection is closed.""" + self.closed = True + if self._tick_handle: + self._tick_handle.cancel() + + def _schedule_tick(self, delay_sec: float): + """Schedule the next engine tick.""" + if self.closed: + return + + if self._tick_handle: + self._tick_handle.cancel() + + self._tick_handle = self.loop.call_later(delay_sec, self._on_timer) + + def _on_timer(self): + """Handle scheduled engine tick.""" + if self.closed: + return + + # Run protocol tick + now_ms = int((time.monotonic() - self.start_time) * 1000) + events = self.engine.handle_tick(now_ms) + + # Process tick events (QUIC returns events directly from handle_tick) + for ev in events: + self.on_event_cb(ev) + + self._pump() + + # Fixed 10ms interval for QUIC responsiveness + delay = 0.01 + self._schedule_tick(delay) + + def _pump(self): + """Pump data between engine and network.""" + # 1. Send outgoing datagrams + datagrams = self.engine.take_outgoing_datagrams() + if datagrams and self.transport and self.remote_addr: + for datagram in datagrams: + # Extract bytes from MqttDatagramFfi + self.transport.sendto(datagram.data, self.remote_addr) + + # 2. Process events + events = self.engine.take_events() + for ev in events: + self.on_event_cb(ev) + + + class FlowMqttClient: """ - High-level async MQTT client. + High-level async MQTT client supporting multiple transports (TCP, QUIC). This client provides a simple async/await API for MQTT operations, handling connection management, subscriptions, and publishing. Args: client_id: MQTT client identifier + transport: Transport type (TransportType.TCP or TransportType.QUIC, default: TCP) mqtt_version: MQTT protocol version (3 or 5, default: 5) clean_start: Whether to start a clean session (default: True) keep_alive: Keep-alive interval in seconds (default: 30) @@ -143,20 +272,30 @@ class FlowMqttClient: reconnect_max_delay_ms: Maximum delay for reconnection (default: 30000) max_reconnect_attempts: Maximum reconnection attempts, 0 for infinite (default: 0) on_message: Optional callback for received messages: fn(topic: str, payload: bytes, qos: int) + ca_cert_file: Path to CA certificate file for QUIC TLS (QUIC only) + insecure_skip_verify: Skip TLS verification for QUIC (QUIC only, default: False) + alpn_protocols: ALPN protocols for QUIC (QUIC only, default: ["mqtt"]) - Example: - >>> async def example(): - ... client = FlowMqttClient("my_client", on_message=lambda t, p, q: print(f"{t}: {p}")) + Examples: + TCP: + >>> async def tcp_example(): + ... client = FlowMqttClient("my_client", transport=TransportType.TCP) ... await client.connect("broker.emqx.io", 1883) - ... await client.subscribe("test/topic", 1) ... await client.publish("test/topic", b"Hello", 1) - ... await asyncio.sleep(1) # Wait for message + ... await client.disconnect() + + QUIC: + >>> async def quic_example(): + ... client = FlowMqttClient("my_client", transport=TransportType.QUIC, insecure_skip_verify=True) + ... await client.connect("broker.emqx.io", 14567, server_name="broker.emqx.io") + ... await client.publish("test/topic", b"Hello", 1) ... await client.disconnect() """ def __init__( self, client_id: str, + transport: TransportType = TransportType.TCP, mqtt_version: int = 5, clean_start: bool = True, keep_alive: int = 30, @@ -165,8 +304,12 @@ def __init__( reconnect_base_delay_ms: int = 1000, reconnect_max_delay_ms: int = 30000, max_reconnect_attempts: int = 0, - on_message: Optional[Callable[[str, bytes, int], None]] = None + on_message: Optional[Callable[[str, bytes, int], None]] = None, + ca_cert_file: Optional[str] = None, + insecure_skip_verify: bool = False, + alpn_protocols: Optional[List[str]] = None ): + self.transport_type = transport self.opts = flowsdk_ffi.MqttOptionsFfi( client_id=client_id, mqtt_version=mqtt_version, @@ -178,8 +321,25 @@ def __init__( reconnect_max_delay_ms=reconnect_max_delay_ms, max_reconnect_attempts=max_reconnect_attempts ) - self.engine = flowsdk_ffi.MqttEngineFfi.new_with_opts(self.opts) - self.protocol: Optional[FlowMqttProtocol] = None + + # Create appropriate engine based on transport type + if transport == TransportType.TCP: + self.engine = flowsdk_ffi.MqttEngineFfi.new_with_opts(self.opts) + elif transport == TransportType.QUIC: + # Build QUIC TLS options + tls_opts = flowsdk_ffi.MqttTlsOptionsFfi( + ca_cert_file=ca_cert_file, + client_cert_file=None, + client_key_file=None, + insecure_skip_verify=insecure_skip_verify, + alpn_protocols=alpn_protocols or ["mqtt"] + ) + self.engine = flowsdk_ffi.QuicMqttEngineFfi(self.opts) + self.quic_tls_opts = tls_opts + else: + raise ValueError(f"Unsupported transport type: {transport}") + + self.protocol: Optional[Union[FlowMqttProtocol, FlowMqttDatagramProtocol]] = None self.on_message = on_message # Futures for pending operations @@ -188,13 +348,14 @@ def __init__( self._pending_subscribe: Dict[int, asyncio.Future] = {} self._pending_unsubscribe: Dict[int, asyncio.Future] = {} - async def connect(self, host: str, port: int): + async def connect(self, host: str, port: int, server_name: Optional[str] = None): """ Connect to MQTT broker. Args: host: Broker hostname or IP address port: Broker port number + server_name: Server name for TLS SNI (required for QUIC) Raises: ConnectionError: If connection fails @@ -202,11 +363,41 @@ async def connect(self, host: str, port: int): loop = asyncio.get_running_loop() self._connect_future = loop.create_future() - transport, protocol = await loop.create_connection( - lambda: FlowMqttProtocol(self.engine, loop, self._on_event), - host, port - ) - self.protocol = protocol + if self.transport_type == TransportType.TCP: + # TCP connection using FlowMqttProtocol + transport, protocol = await loop.create_connection( + lambda: FlowMqttProtocol(self.engine, loop, self._on_event), + host, port + ) + self.protocol = protocol + + elif self.transport_type == TransportType.QUIC: + # QUIC connection using FlowMqttDatagramProtocol + if not server_name: + server_name = host # Default to host if not specified + + # Resolve hostname to IP address for QUIC + addr_info = socket.getaddrinfo( + host, port, + family=socket.AF_INET, + type=socket.SOCK_DGRAM + ) + if not addr_info: + raise ConnectionError(f"Could not resolve {host}") + + server_addr = f"{addr_info[0][4][0]}:{port}" + + # Create UDP socket and protocol + transport, protocol = await loop.create_datagram_endpoint( + lambda: FlowMqttDatagramProtocol(self.engine, loop, self._on_event), + remote_addr=addr_info[0][4] + ) + self.protocol = protocol + protocol.remote_addr = addr_info[0][4] + + # Initiate QUIC connection with required parameters + now_ms = int((time.monotonic() - protocol.start_time) * 1000) + self.engine.connect(server_addr, server_name, self.quic_tls_opts, now_ms) # Wait for actual MQTT connection await self._connect_future @@ -274,19 +465,29 @@ async def publish(self, topic: str, payload: bytes, qos: int = 0, retain: Option topic: MQTT topic to publish to payload: Message payload as bytes qos: Quality of Service level (0, 1, or 2) - retain: Whether to retain the message (None uses default) + retain: Whether to retain the message (None uses default, ignored for QUIC) Returns: Packet ID of the publish (0 for QoS 0) Raises: RuntimeError: If not connected + + Note: + QUIC transport does not support the retain parameter and priority. """ if not self.protocol: raise RuntimeError("Not connected") loop = asyncio.get_running_loop() - pid = self.engine.publish(topic, payload, qos, retain) + + # Handle different engine publish signatures + if self.transport_type == TransportType.TCP: + # TCP engine takes 4 args: topic, payload, qos, retain + pid = self.engine.publish(topic, payload, qos, retain) + elif self.transport_type == TransportType.QUIC: + # QUIC engine takes 3 args: topic, payload, qos + pid = self.engine.publish(topic, payload, qos) if qos > 0: fut = loop.create_future() From 8c0510036c027ea2f9e3aa640c37c1f38450dffb Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 2 Feb 2026 21:41:57 +0100 Subject: [PATCH 06/12] docs(python): update README --- python/examples/README.md | 38 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/python/examples/README.md b/python/examples/README.md index 1dc7d3bb..08a9202f 100644 --- a/python/examples/README.md +++ b/python/examples/README.md @@ -31,13 +31,18 @@ Demonstrates the complete FlowMqttClient API with proper error handling and mess PYTHONPATH=../package python3 async_quic_client_example.py ``` -Shows how to use MQTT over QUIC protocol, which provides: +Shows how to use MQTT over QUIC protocol with the unified `FlowMqttClient`. QUIC provides: - Built-in encryption (TLS 1.3) - Connection migration - Lower latency than TCP - No head-of-line blocking -**Requirements**: QUIC-enabled MQTT broker (e.g., EMQX 5.0+ with QUIC listener) +**Requirements**: QUIC-enabled MQTT broker (e.g., EMQX 5.0+ with QUIC listener on port 14567) + +**Key features**: +- Set `transport=TransportType.QUIC` in FlowMqttClient constructor +- Use `insecure_skip_verify=True` for testing (production should use proper TLS certs) +- Pass `server_name` parameter to `connect()` for TLS SNI ## Low-Level Examples @@ -103,6 +108,35 @@ Your Code → MqttEngineFfi → Manual I/O → Network - Custom transport implementation - Integration with existing event loops +## Transport Selection + +FlowSDK supports multiple transport protocols via the unified `FlowMqttClient`: + +### TCP (Default) +```python +from flowsdk import FlowMqttClient, TransportType + +client = FlowMqttClient("my_client", transport=TransportType.TCP) +await client.connect("broker.emqx.io", 1883) +``` +**Use when**: Standard MQTT deployments, maximum compatibility + +### QUIC +```python +client = FlowMqttClient( + "my_client", + transport=TransportType.QUIC, + insecure_skip_verify=True # For testing only +) +await client.connect("broker.emqx.io", 14567, server_name="broker.emqx.io") +``` +**Use when**: Low latency needed, mobile clients (connection migration), UDP-based networks + +**Note**: +- QUIC requires server_name for TLS SNI +- Use proper TLS certificates in production +- Not all brokers support QUIC (EMQX 5.0+ does) + ## MQTT Broker Setup ### For TCP Examples (most examples) From d5c7dd058434b418af91f335fc28f2b95f03896c Mon Sep 17 00:00:00 2001 From: William Yang Date: Tue, 3 Feb 2026 09:26:11 +0100 Subject: [PATCH 07/12] feat(ffi): Add TLS transport support to the async client (python) --- python/examples/README.md | 151 +++++----------- python/examples/async_example.py | 1 + python/examples/async_interop_example.py | 161 ------------------ python/examples/async_tls_client_example.py | 68 ++++++++ ...mple.py => asyncio_quic_client_example.py} | 14 +- ...lient.py => asyncio_tcp_client_example.py} | 8 +- python/package/flowsdk/async_client.py | 93 ++++++---- 7 files changed, 184 insertions(+), 312 deletions(-) delete mode 100644 python/examples/async_interop_example.py create mode 100644 python/examples/async_tls_client_example.py rename python/examples/{async_quic_client_example.py => asyncio_quic_client_example.py} (84%) rename python/examples/{proper_async_client.py => asyncio_tcp_client_example.py} (90%) diff --git a/python/examples/README.md b/python/examples/README.md index 08a9202f..1404f2a0 100644 --- a/python/examples/README.md +++ b/python/examples/README.md @@ -13,22 +13,34 @@ PYTHONPATH=../package python3 simple_async_usage.py The simplest way to use FlowSDK with asyncio. Shows connect, subscribe, publish, and disconnect in just a few lines. -### 2. Proper Async Client (`proper_async_client.py`) -**Full-featured async client with message callbacks** +### 2. Asyncio TCP Client (`asyncio_tcp_client_example.py`) +**Standard async client using TCP** ```bash -PYTHONPATH=../package python3 proper_async_client.py +PYTHONPATH=../package python3 asyncio_tcp_client_example.py ``` -Demonstrates the complete FlowMqttClient API with proper error handling and message callbacks. +Demonstrates the high-level `FlowMqttClient` API using standard TCP transport with message callbacks and proper error handling. -## Transport Examples +## Secure Transport Examples -### 3. QUIC Transport (`async_quic_client_example.py`) +### 3. TLS Transport (`async_tls_client_example.py`) +**MQTT over TLS for secure communication** + +```bash +PYTHONPATH=../package python3 async_tls_client_example.py +``` + +Shows how to use MQTT over TLS (port 8883) with the unified `FlowMqttClient`. Includes: +- Explicit TLS transport selection +- Certificate verification (default) +- SNI support via `server_name` + +### 4. QUIC Transport (`asyncio_quic_client_example.py`) **MQTT over QUIC for modern UDP-based transport** ```bash -PYTHONPATH=../package python3 async_quic_client_example.py +PYTHONPATH=../package python3 asyncio_quic_client_example.py ``` Shows how to use MQTT over QUIC protocol with the unified `FlowMqttClient`. QUIC provides: @@ -39,26 +51,18 @@ Shows how to use MQTT over QUIC protocol with the unified `FlowMqttClient`. QUIC **Requirements**: QUIC-enabled MQTT broker (e.g., EMQX 5.0+ with QUIC listener on port 14567) -**Key features**: -- Set `transport=TransportType.QUIC` in FlowMqttClient constructor -- Use `insecure_skip_verify=True` for testing (production should use proper TLS certs) -- Pass `server_name` parameter to `connect()` for TLS SNI - ## Low-Level Examples -### 4. Select-Based Client (`select_example.py`) +### 5. Select-Based Client (`select_example.py`) **Non-blocking I/O using select()** ```bash PYTHONPATH=../package python3 select_example.py ``` -Demonstrates manual socket management with the select() system call. Good for: -- Understanding low-level I/O -- Integration with non-asyncio event loops -- Single-threaded blocking I/O patterns +Demonstrates manual socket management with the select() system call. -### 5. Async with Manual I/O (`async_example.py`) +### 6. Async with Manual I/O (`async_example.py`) **Asyncio with manual socket operations** ```bash @@ -67,29 +71,15 @@ PYTHONPATH=../package python3 async_example.py Shows how to manually integrate the MQTT engine with asyncio event loop using socket I/O. -### 6. Async Interop (`async_interop_example.py`) -**Asyncio Protocol integration pattern** - -```bash -PYTHONPATH=../package python3 async_interop_example.py -``` - -Demonstrates using asyncio's Protocol pattern with add_reader/add_writer for event loop integration. - ## Testing -### 7. Binding Tests (`test_binding.py`) +### 8. Binding Tests (`test_binding.py`) **Low-level FFI binding verification** ```bash PYTHONPATH=../package python3 test_binding.py ``` -Tests the FFI bindings for: -- Standard MQTT engine (MqttEngineFfi) -- TLS MQTT engine (TlsMqttEngineFfi) -- QUIC MQTT engine (QuicMqttEngineFfi) - ## Architecture Overview ### High-Level API (Recommended) @@ -104,108 +94,43 @@ FlowMqttClient → FlowMqttProtocol → MqttEngineFfi → Rust FFI ``` Your Code → MqttEngineFfi → Manual I/O → Network ``` -- Full control over networking -- Custom transport implementation -- Integration with existing event loops ## Transport Selection FlowSDK supports multiple transport protocols via the unified `FlowMqttClient`: -### TCP (Default) +### TCP ```python -from flowsdk import FlowMqttClient, TransportType - client = FlowMqttClient("my_client", transport=TransportType.TCP) await client.connect("broker.emqx.io", 1883) ``` -**Use when**: Standard MQTT deployments, maximum compatibility + +### TLS +```python +client = FlowMqttClient( + "my_client", + transport=TransportType.TLS, + server_name="broker.emqx.io" +) +await client.connect("broker.emqx.io", 8883) +``` ### QUIC ```python client = FlowMqttClient( "my_client", transport=TransportType.QUIC, - insecure_skip_verify=True # For testing only + server_name="broker.emqx.io" ) await client.connect("broker.emqx.io", 14567, server_name="broker.emqx.io") ``` -**Use when**: Low latency needed, mobile clients (connection migration), UDP-based networks - -**Note**: -- QUIC requires server_name for TLS SNI -- Use proper TLS certificates in production -- Not all brokers support QUIC (EMQX 5.0+ does) - -## MQTT Broker Setup - -### For TCP Examples (most examples) -Any MQTT broker works: -```bash -# Using mosquitto -mosquitto -p 1883 - -# Or use public broker -# broker.emqx.io:1883 (already configured in examples) -``` - -### For QUIC Example -Requires QUIC-enabled broker (EMQX 5.0+): -```bash -# EMQX with QUIC enabled -docker run -d --name emqx \ - -p 1883:1883 \ - -p 14567:14567/udp \ - emqx/emqx:latest - -# Configure QUIC listener in emqx.conf: -# listeners.quic.default { -# bind = "0.0.0.0:14567" -# max_connections = 1024000 -# } -``` - -## Common Issues - -### Import Error -``` -ModuleNotFoundError: No module named 'flowsdk' -``` -**Solution**: Set PYTHONPATH to the package directory: -```bash -export PYTHONPATH=/path/to/python/package -``` - -### Connection Refused -``` -ConnectionRefusedError: [Errno 61] Connection refused -``` -**Solution**: -- Ensure MQTT broker is running -- Check broker address and port -- For public brokers, check your internet connection - -### QUIC Connection Failed -``` -QUIC: Connection lost: None -``` -**Solution**: -- Verify QUIC listener is enabled on broker -- Check UDP port 14567 is open -- Try with `insecure_skip_verify=True` for testing ## Learning Path 1. **Start here**: `simple_async_usage.py` - Learn the basics -2. **Next**: `proper_async_client.py` - See full features -3. **Advanced**: `select_example.py` - Understand low-level I/O -4. **Modern**: `async_quic_client_example.py` - Try QUIC transport - -## More Information - -- [Package README](../package/README.md) - API reference -- [FlowSDK Docs](../../../docs/) - Protocol details -- [MQTT Specification](https://mqtt.org/mqtt-specification/) - MQTT protocol +2. **Next**: `asyncio_tcp_client_example.py` - See full features +3. **Secure**: `async_tls_client_example.py` - Try TLS transport +4. **Modern**: `asyncio_quic_client_example.py` - Try QUIC transport ## License diff --git a/python/examples/async_example.py b/python/examples/async_example.py index f9bdc567..7c19dded 100644 --- a/python/examples/async_example.py +++ b/python/examples/async_example.py @@ -1,3 +1,4 @@ + import asyncio import flowsdk_ffi import time diff --git a/python/examples/async_interop_example.py b/python/examples/async_interop_example.py deleted file mode 100644 index 94140c99..00000000 --- a/python/examples/async_interop_example.py +++ /dev/null @@ -1,161 +0,0 @@ -import asyncio - -import flowsdk_ffi -import time -import socket - -""" -Asynchronous MQTT Client using flowsdk-ffi with asyncio integration. -This module demonstrates how to integrate the flowsdk-ffi MQTT engine with Python's -asyncio event loop using low-level socket operations and asyncio's add_reader/add_writer -callbacks. -Example: - Run the async interop client:: - $ python async_interop_example.py -Classes: - AsyncInteropClient: An asyncio-integrated MQTT client that uses non-blocking sockets - and the asyncio event loop for I/O operations. -The implementation showcases: - - Non-blocking TCP socket connection using asyncio.loop.sock_connect() - - Event-driven I/O using loop.add_reader() and loop.add_writer() - - Integration of flowsdk-ffi's tick-based engine with asyncio's event loop - - Handling MQTT events (connection, subscription, message reception, publishing) - - Proper resource cleanup and socket lifecycle management -Note: - This is an example demonstrating the interoperability pattern between - FlowSDK FFI MQTT engine and Python's asyncio framework. -""" - -class AsyncInteropClient: - def __init__(self, client_id): - self.opts = flowsdk_ffi.MqttOptionsFfi( - client_id=client_id, - mqtt_version=5, - clean_start=True, - keep_alive=60, - username=None, - password=None, - reconnect_base_delay_ms=1000, - reconnect_max_delay_ms=30000, - max_reconnect_attempts=0 - ) - self.engine = flowsdk_ffi.MqttEngineFfi.new_with_opts(self.opts) - self.start_time = time.monotonic() - self.outgoing_buffer = b"" - self.sock = None - self.loop = asyncio.get_running_loop() - self.connected_event = asyncio.Event() - - async def connect(self, host, port): - print(f"šŸ“” Connecting to {host}:{port}...") - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock.setblocking(False) - - try: - self.sock.connect((host, port)) - except BlockingIOError: - pass - - # Use asyncio to wait for the socket to be writable (connection complete) - await self.loop.sock_connect(self.sock, (host, port)) - print("āœ… TCP Connected") - - # Start the engine logic - self.engine.connect() - - # Register reader - self.loop.add_reader(self.sock, self._on_read) - self._pump_logic() - - def _on_read(self): - try: - data = self.sock.recv(4096) - if data: - print(f"šŸ“„ Received {len(data)} bytes") - self.engine.handle_incoming(data) - self._pump_logic() - else: - print("šŸ“„ EOF") - self.stop() - except Exception as e: - print(f"āŒ Read error: {e}") - self.stop() - - def _on_write(self): - if not self.outgoing_buffer: - self.loop.remove_writer(self.sock) - return - - try: - sent = self.sock.send(self.outgoing_buffer) - print(f"šŸ“¤ Sent {sent} bytes") - self.outgoing_buffer = self.outgoing_buffer[sent:] - if not self.outgoing_buffer: - self.loop.remove_writer(self.sock) - except (BlockingIOError, InterruptedError): - pass - except Exception as e: - print(f"āŒ Write error: {e}") - self.stop() - - def _pump_logic(self): - # 1. Ticks - now_ms = int((time.monotonic() - self.start_time) * 1000) - self.engine.handle_tick(now_ms) - - # 2. Outgoing - new_data = self.engine.take_outgoing() - if new_data: - self.outgoing_buffer += new_data - self.loop.add_writer(self.sock, self._on_write) - - # 3. Events - events = self.engine.take_events() - for ev in events: - if ev.is_connected(): - print("āœ… MQTT Connected!") - self.connected_event.set() - elif ev.is_message_received(): - m = ev[0] - print(f"šŸ“Ø Message: {m.topic} -> {m.payload.decode()}") - elif ev.is_subscribed(): - print(f"āœ… Subscribed (PID: {ev[0].packet_id})") - elif ev.is_published(): - print(f"āœ… Published (PID: {ev[0].packet_id})") - - # 4. Schedule next tick - next_ms = self.engine.next_tick_ms() - if next_ms > 0: - delay = max(0, (next_ms - now_ms) / 1000.0) - self.loop.call_later(delay, self._pump_logic) - - def stop(self): - if self.sock: - self.loop.remove_reader(self.sock) - self.loop.remove_writer(self.sock) - self.sock.close() - self.sock = None - print("šŸ›‘ Client stopped") - -async def main(): - print("šŸš€ asyncio Interop Example (Using add_reader/add_writer)") - print("=" * 70) - - client = AsyncInteropClient(f"python_async_interop_{int(time.time() % 10000)}") - await client.connect("broker.emqx.io", 1883) - - await client.connected_event.wait() - - # Example operations - client.engine.subscribe("test/python/interop", 1) - client._pump_logic() # Trigger immediate pump to send subscribe - - client.engine.publish("test/python/interop", b"Hello from Interop!", 1, None) - client._pump_logic() # Trigger immediate pump to send publish - - # Run for a bit to receive messages - await asyncio.sleep(5) - client.stop() - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/python/examples/async_tls_client_example.py b/python/examples/async_tls_client_example.py new file mode 100644 index 00000000..90518f66 --- /dev/null +++ b/python/examples/async_tls_client_example.py @@ -0,0 +1,68 @@ +""" +Example demonstrating the high-level FlowMqttClient for asyncio applications. + +This example shows how to use the FlowMqttClient class from the flowsdk package +to easily connect, subscribe, publish, and disconnect from an MQTT broker using +async/await syntax. +""" +import asyncio +import time +import os +import sys + +# Add the package to path +sys.path.append(os.path.join( "python", "package")) + +from flowsdk import FlowMqttClient, TransportType + +async def test_tls(): + print("šŸš€ Testing TLS Transport Support...") + + msg_received = asyncio.Event() + + def on_message(topic: str, payload: bytes, qos: int): + print(f"šŸ“Ø Received message on {topic}: {payload.decode()}") + if topic == "test/python/tls" and payload == b"Hello TLS!": + msg_received.set() + + # We'll use broker.emqx.io:8883 which supports TLS + client = FlowMqttClient( + client_id=f"python_tls_test_{int(time.time() % 10000)}", + transport=TransportType.TLS, + insecure_skip_verify=False, # Skip verification for simplicity in test + server_name="broker.emqx.io", + on_message=on_message + ) + + try: + host = "broker.emqx.io" + port = 8883 + print(f"šŸ“” Connecting to {host}:{port} (TLS)...") + await client.connect(host, port) + print("āœ… TLS Connected!") + + await client.subscribe("test/python/tls", 1) + print("āœ… Subscribed!") + + await client.publish("test/python/tls", b"Hello TLS!", 1) + print("āœ… Published!") + + # Wait for the message with a timeout + print("ā³ Waiting for message reception...") + try: + await asyncio.wait_for(msg_received.wait(), timeout=5.0) + print("✨ Message verification successful!") + except asyncio.TimeoutError: + print("āŒ Timeout waiting for message!") + + except Exception as e: + print(f"āŒ TLS Test Failed: {e}") + import traceback + traceback.print_exc() + finally: + print("šŸ‘‹ Disconnecting...") + await client.disconnect() + print("āœ… Disconnected!") + +if __name__ == "__main__": + asyncio.run(test_tls()) diff --git a/python/examples/async_quic_client_example.py b/python/examples/asyncio_quic_client_example.py similarity index 84% rename from python/examples/async_quic_client_example.py rename to python/examples/asyncio_quic_client_example.py index 3371450c..ee85ef0c 100644 --- a/python/examples/async_quic_client_example.py +++ b/python/examples/asyncio_quic_client_example.py @@ -2,12 +2,18 @@ Example demonstrating MQTT over QUIC using flowsdk-ffi with asyncio. QUIC provides a modern UDP-based transport with built-in encryption and multiplexing. -This example shows how to use the unified FlowMqttClient with QUIC transport. +This example shows how to use the FlowMqttClient with QUIC transport. Note: Requires a QUIC-enabled MQTT broker (e.g., EMQX with QUIC support). """ import asyncio +import os +import sys + +# Add the package to path +sys.path.append(os.path.join("python", "package")) + from flowsdk import FlowMqttClient, TransportType @@ -31,8 +37,10 @@ async def main(): try: # Connect to broker using QUIC - print("šŸ”Œ Connecting to broker.emqx.io:14567 via QUIC...") - await client.connect("broker.emqx.io", 14567, server_name="broker.emqx.io") + host = "broker.emqx.io" + port = 14567 + print(f"šŸ”Œ Connecting to {host}:{port} via QUIC...") + await client.connect(host, port, server_name="broker.emqx.io") print("āœ… Connected!") # Subscribe to a test topic diff --git a/python/examples/proper_async_client.py b/python/examples/asyncio_tcp_client_example.py similarity index 90% rename from python/examples/proper_async_client.py rename to python/examples/asyncio_tcp_client_example.py index e80e4cf4..571fa426 100644 --- a/python/examples/proper_async_client.py +++ b/python/examples/asyncio_tcp_client_example.py @@ -4,6 +4,8 @@ This example shows how to use the FlowMqttClient class from the flowsdk package to easily connect, subscribe, publish, and disconnect from an MQTT broker using async/await syntax. + +@TODO: Add more features here """ import asyncio @@ -27,8 +29,10 @@ async def main(): try: # Connect to broker - print(f"Connecting to broker.emqx.io:1883...") - await client.connect("broker.emqx.io", 1883) + host = "broker.emqx.io" + port = 1883 + print(f"Connecting to {host}:{port}...") + await client.connect(host, port) print("āœ… Connected!") # Subscribe and wait for ack diff --git a/python/package/flowsdk/async_client.py b/python/package/flowsdk/async_client.py index 7c617def..7068664e 100644 --- a/python/package/flowsdk/async_client.py +++ b/python/package/flowsdk/async_client.py @@ -58,24 +58,27 @@ class TransportType(Enum): """MQTT transport protocol types.""" TCP = "tcp" + TLS = "tls" QUIC = "quic" class FlowMqttProtocol(asyncio.Protocol): """ - asyncio Protocol implementation for MQTT engine. + asyncio Protocol implementation for MQTT engine (TCP/TLS). This class handles the network layer, manages engine ticks, and pumps data between the network transport and the MQTT engine. Args: - engine: The MQTT engine instance from flowsdk_ffi + engine: The MQTT engine instance + transport_type: The transport protocol being used loop: The asyncio event loop on_event_cb: Callback function to handle MQTT events """ - def __init__(self, engine, loop: asyncio.AbstractEventLoop, on_event_cb: Callable): + def __init__(self, engine, transport_type: TransportType, loop: asyncio.AbstractEventLoop, on_event_cb: Callable): self.engine = engine + self.transport_type = transport_type self.loop = loop self.transport: Optional[asyncio.Transport] = None self.on_event_cb = on_event_cb @@ -84,11 +87,12 @@ def __init__(self, engine, loop: asyncio.AbstractEventLoop, on_event_cb: Callabl self.closed = False def connection_made(self, transport: asyncio.Transport): - """Called when TCP connection is established.""" + """Called when connection is established.""" self.transport = transport # Start the MQTT connection - self.engine.connect() + if hasattr(self.engine, 'connect'): + self.engine.connect() # Start the tick loop immediately self._schedule_tick(0) @@ -96,14 +100,21 @@ def connection_made(self, transport: asyncio.Transport): def data_received(self, data: bytes): """Called when data is received from the network.""" # Feed network data to engine - self.engine.handle_incoming(data) + if self.transport_type == TransportType.TLS: + self.engine.handle_socket_data(data) + else: + self.engine.handle_incoming(data) + # Trigger an immediate pump to process potential responses self._pump() def connection_lost(self, exc: Optional[Exception]): """Called when the connection is closed.""" self.closed = True - self.engine.handle_connection_lost() + # Only TCP engine has handle_connection_lost + if self.transport_type == TransportType.TCP and hasattr(self.engine, 'handle_connection_lost'): + self.engine.handle_connection_lost() + if self._tick_handle: self._tick_handle.cancel() @@ -129,18 +140,26 @@ def _on_timer(self): self._pump() # Schedule next tick - next_tick_ms = self.engine.next_tick_ms() - if next_tick_ms < 0: - delay = 0.1 + if self.transport_type == TransportType.TCP: + next_tick_ms = self.engine.next_tick_ms() + if next_tick_ms < 0: + delay = 0.1 + else: + delay = max(0, (next_tick_ms - now_ms) / 1000.0) else: - delay = max(0, (next_tick_ms - now_ms) / 1000.0) + # Fixed 10ms for TLS/QUIC + delay = 0.01 self._schedule_tick(delay) def _pump(self): """Pump data between engine and network.""" # 1. Send outgoing data - outgoing = self.engine.take_outgoing() + if self.transport_type == TransportType.TLS: + outgoing = self.engine.take_socket_data() + else: + outgoing = self.engine.take_outgoing() + if outgoing and self.transport and not self.transport.is_closing(): self.transport.write(outgoing) @@ -225,15 +244,14 @@ def _on_timer(self): # Run protocol tick now_ms = int((time.monotonic() - self.start_time) * 1000) + # QUIC engine handle_tick returns events events = self.engine.handle_tick(now_ms) - - # Process tick events (QUIC returns events directly from handle_tick) for ev in events: self.on_event_cb(ev) self._pump() - # Fixed 10ms interval for QUIC responsiveness + # Fixed 10ms interval for QUIC delay = 0.01 self._schedule_tick(delay) @@ -306,10 +324,14 @@ def __init__( max_reconnect_attempts: int = 0, on_message: Optional[Callable[[str, bytes, int], None]] = None, ca_cert_file: Optional[str] = None, + client_cert_file: Optional[str] = None, + client_key_file: Optional[str] = None, insecure_skip_verify: bool = False, - alpn_protocols: Optional[List[str]] = None + alpn_protocols: Optional[List[str]] = None, + server_name: Optional[str] = None ): self.transport_type = transport + self.server_name = server_name self.opts = flowsdk_ffi.MqttOptionsFfi( client_id=client_id, mqtt_version=mqtt_version, @@ -322,20 +344,24 @@ def __init__( max_reconnect_attempts=max_reconnect_attempts ) + # Common TLS/QUIC options + self.tls_opts = flowsdk_ffi.MqttTlsOptionsFfi( + ca_cert_file=ca_cert_file, + client_cert_file=client_cert_file, + client_key_file=client_key_file, + insecure_skip_verify=insecure_skip_verify, + alpn_protocols=alpn_protocols or ["mqtt"] + ) + # Create appropriate engine based on transport type if transport == TransportType.TCP: self.engine = flowsdk_ffi.MqttEngineFfi.new_with_opts(self.opts) + elif transport == TransportType.TLS: + if not server_name: + raise ValueError("server_name (SNI) is required for TLS transport") + self.engine = flowsdk_ffi.TlsMqttEngineFfi(self.opts, self.tls_opts, server_name) elif transport == TransportType.QUIC: - # Build QUIC TLS options - tls_opts = flowsdk_ffi.MqttTlsOptionsFfi( - ca_cert_file=ca_cert_file, - client_cert_file=None, - client_key_file=None, - insecure_skip_verify=insecure_skip_verify, - alpn_protocols=alpn_protocols or ["mqtt"] - ) self.engine = flowsdk_ffi.QuicMqttEngineFfi(self.opts) - self.quic_tls_opts = tls_opts else: raise ValueError(f"Unsupported transport type: {transport}") @@ -363,10 +389,10 @@ async def connect(self, host: str, port: int, server_name: Optional[str] = None) loop = asyncio.get_running_loop() self._connect_future = loop.create_future() - if self.transport_type == TransportType.TCP: - # TCP connection using FlowMqttProtocol + if self.transport_type in (TransportType.TCP, TransportType.TLS): + # Stream-based connection (TCP or TLS) transport, protocol = await loop.create_connection( - lambda: FlowMqttProtocol(self.engine, loop, self._on_event), + lambda: FlowMqttProtocol(self.engine, self.transport_type, loop, self._on_event), host, port ) self.protocol = protocol @@ -397,7 +423,7 @@ async def connect(self, host: str, port: int, server_name: Optional[str] = None) # Initiate QUIC connection with required parameters now_ms = int((time.monotonic() - protocol.start_time) * 1000) - self.engine.connect(server_addr, server_name, self.quic_tls_opts, now_ms) + self.engine.connect(server_addr, server_name, self.tls_opts, now_ms) # Wait for actual MQTT connection await self._connect_future @@ -483,10 +509,11 @@ async def publish(self, topic: str, payload: bytes, qos: int = 0, retain: Option # Handle different engine publish signatures if self.transport_type == TransportType.TCP: - # TCP engine takes 4 args: topic, payload, qos, retain - pid = self.engine.publish(topic, payload, qos, retain) - elif self.transport_type == TransportType.QUIC: - # QUIC engine takes 3 args: topic, payload, qos + # TCP engine takes 4 args: topic, payload, qos, priority + # Note: priority is currently passed as None as it's not exposed in high-level API yet + pid = self.engine.publish(topic, payload, qos, None) + else: + # TLS and QUIC engines take 3 args: topic, payload, qos pid = self.engine.publish(topic, payload, qos) if qos > 0: From db1045ae9ec3b22392fce89363bd51748725c3c3 Mon Sep 17 00:00:00 2001 From: =William Yang Date: Sun, 8 Feb 2026 18:02:57 +0100 Subject: [PATCH 08/12] fix: python examples --- .github/workflows/release_pypi.yml | 2 ++ python/examples/async_example.py | 7 ++++--- python/examples/asyncio_quic_client_example.py | 7 ------- ...tls_client_example.py => asyncio_tls_client_example.py} | 0 python/examples/select_example.py | 6 +++--- 5 files changed, 9 insertions(+), 13 deletions(-) rename python/examples/{async_tls_client_example.py => asyncio_tls_client_example.py} (100%) diff --git a/.github/workflows/release_pypi.yml b/.github/workflows/release_pypi.yml index 1e06e390..545d05c6 100644 --- a/.github/workflows/release_pypi.yml +++ b/.github/workflows/release_pypi.yml @@ -1,4 +1,6 @@ name: Publish to PyPI +permissions: + contents: read on: release: diff --git a/python/examples/async_example.py b/python/examples/async_example.py index 7c19dded..a691d718 100644 --- a/python/examples/async_example.py +++ b/python/examples/async_example.py @@ -1,8 +1,9 @@ import asyncio -import flowsdk_ffi +import flowsdk import time import sys +from flowsdk import FlowMqttClient, TransportType, MqttOptionsFfi, MqttEngineFfi async def main(): print("šŸš€ FlowSDK No-IO Async Example") @@ -11,7 +12,7 @@ async def main(): # 1. Create the engine import random client_id = f"python_async_no_io_{random.randint(1000, 9999)}" - opts = flowsdk_ffi.MqttOptionsFfi( + opts = MqttOptionsFfi( client_id=client_id, mqtt_version=5, clean_start=True, @@ -22,7 +23,7 @@ async def main(): reconnect_max_delay_ms=30000, max_reconnect_attempts=0 ) - engine = flowsdk_ffi.MqttEngineFfi.new_with_opts(opts) + engine = MqttEngineFfi.new_with_opts(opts) print(f"āœ… Created MqttEngineFfi (Client ID: {client_id})") # 2. Establish TCP connection using asyncio diff --git a/python/examples/asyncio_quic_client_example.py b/python/examples/asyncio_quic_client_example.py index ee85ef0c..760383ad 100644 --- a/python/examples/asyncio_quic_client_example.py +++ b/python/examples/asyncio_quic_client_example.py @@ -6,14 +6,7 @@ Note: Requires a QUIC-enabled MQTT broker (e.g., EMQX with QUIC support). """ - import asyncio -import os -import sys - -# Add the package to path -sys.path.append(os.path.join("python", "package")) - from flowsdk import FlowMqttClient, TransportType diff --git a/python/examples/async_tls_client_example.py b/python/examples/asyncio_tls_client_example.py similarity index 100% rename from python/examples/async_tls_client_example.py rename to python/examples/asyncio_tls_client_example.py diff --git a/python/examples/select_example.py b/python/examples/select_example.py index d6c1ea1d..ea5adff1 100644 --- a/python/examples/select_example.py +++ b/python/examples/select_example.py @@ -1,7 +1,7 @@ import socket import selectors import time -import flowsdk_ffi +import flowsdk import sys def main(): @@ -33,7 +33,7 @@ def main(): # 1. Create the engine client_id = f"python_select_no_io_{int(time.time() % 10000)}" - opts = flowsdk_ffi.MqttOptionsFfi( + opts = flowsdk.MqttOptionsFfi( client_id=client_id, mqtt_version=5, clean_start=True, @@ -44,7 +44,7 @@ def main(): reconnect_max_delay_ms=30000, max_reconnect_attempts=0 ) - engine = flowsdk_ffi.MqttEngineFfi.new_with_opts(opts) + engine = flowsdk.MqttEngineFfi.new_with_opts(opts) print(f"āœ… Created MqttEngineFfi (Client ID: {client_id})") # 2. Establish TCP connection From 6d8cad5d4aedf3efb901503584afa810c7da5f77 Mon Sep 17 00:00:00 2001 From: =William Yang Date: Sun, 8 Feb 2026 18:08:58 +0100 Subject: [PATCH 09/12] chore: minor fix --- .github/workflows/release_pypi.yml | 2 +- python/package/flowsdk/async_client.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.github/workflows/release_pypi.yml b/.github/workflows/release_pypi.yml index 545d05c6..50b69e84 100644 --- a/.github/workflows/release_pypi.yml +++ b/.github/workflows/release_pypi.yml @@ -26,7 +26,7 @@ jobs: ./scripts/build_python_bindings.sh --release - name: Build wheels - uses: pypa/cibuildwheel@v2.16.5 + uses: pypa/cibuildwheel@v2 with: package-dir: python/package env: diff --git a/python/package/flowsdk/async_client.py b/python/package/flowsdk/async_client.py index 7068664e..50a99fe8 100644 --- a/python/package/flowsdk/async_client.py +++ b/python/package/flowsdk/async_client.py @@ -1,5 +1,5 @@ """ -A production-ready async MQTT client implementation using flowsdk-ffi with asyncio. +async MQTT client implementation using flowsdk-ffi with asyncio. This module provides an asyncio-based MQTT client that wraps the flowsdk_ffi library, providing async/await support for MQTT operations. It implements a @@ -9,6 +9,7 @@ Supports multiple transports: - TCP: Standard MQTT over TCP (default) - QUIC: MQTT over QUIC protocol (UDP-based, built-in encryption) + - TLS: MQTT over TLS (encrypted TCP) Classes: TransportType: Enum for selecting MQTT transport protocol From 5dde6e6d96f24919da34d48e78a4d65fb0c2c8d0 Mon Sep 17 00:00:00 2001 From: =William Yang Date: Sun, 8 Feb 2026 18:17:59 +0100 Subject: [PATCH 10/12] minor fix --- python/examples/asyncio_tcp_client_example.py | 4 ---- python/package/MANIFEST.in | 1 + 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/python/examples/asyncio_tcp_client_example.py b/python/examples/asyncio_tcp_client_example.py index 571fa426..f7d46278 100644 --- a/python/examples/asyncio_tcp_client_example.py +++ b/python/examples/asyncio_tcp_client_example.py @@ -57,7 +57,3 @@ async def main(): if __name__ == "__main__": asyncio.run(main()) - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/python/package/MANIFEST.in b/python/package/MANIFEST.in index 8d9b384a..3821d4d2 100644 --- a/python/package/MANIFEST.in +++ b/python/package/MANIFEST.in @@ -1,3 +1,4 @@ include flowsdk/*.dylib include flowsdk/*.so include flowsdk/*.dll +include README.md From 12073862b2a82624555c52abf000a7a7ce4ac964 Mon Sep 17 00:00:00 2001 From: =William Yang Date: Sun, 8 Feb 2026 21:55:27 +0100 Subject: [PATCH 11/12] feat(ffi): python conn error handling --- .gitignore | 2 + python/package/flowsdk/async_client.py | 109 +++++++++++++++---------- python/package/pyproject.toml | 2 +- 3 files changed, 70 insertions(+), 43 deletions(-) diff --git a/.gitignore b/.gitignore index 458bb937..b63c262c 100644 --- a/.gitignore +++ b/.gitignore @@ -12,6 +12,8 @@ Cargo.lock # FFI generated code python/package/flowsdk/flowsdk_ffi.py python/package/flowsdk/libflowsdk_ffi.dylib +python/package/flowsdk/libflowsdk_ffi.so +python/package/flowsdk/libflowsdk_ffi.dll # Python Examples (generated files) python/examples/flowsdk_ffi.py diff --git a/python/package/flowsdk/async_client.py b/python/package/flowsdk/async_client.py index 50a99fe8..c2ccf2e1 100644 --- a/python/package/flowsdk/async_client.py +++ b/python/package/flowsdk/async_client.py @@ -45,6 +45,7 @@ """ import asyncio +import logging import socket import time from enum import Enum @@ -56,6 +57,10 @@ import flowsdk_ffi +logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) + + class TransportType(Enum): """MQTT transport protocol types.""" TCP = "tcp" @@ -196,11 +201,10 @@ def __init__(self, engine, loop: asyncio.AbstractEventLoop, on_event_cb: Callabl def connection_made(self, transport: asyncio.DatagramTransport): """Called when UDP socket is ready.""" self.transport = transport - # Get the remote address from the transport try: self.remote_addr = transport.get_extra_info('peername') except Exception: - pass + logger.debug("Could not get peername from transport", exc_info=True) # Don't call engine.connect() here - QUIC requires parameters # Connection will be initiated from FlowMqttClient.connect() @@ -375,7 +379,7 @@ def __init__( self._pending_subscribe: Dict[int, asyncio.Future] = {} self._pending_unsubscribe: Dict[int, asyncio.Future] = {} - async def connect(self, host: str, port: int, server_name: Optional[str] = None): + async def connect(self, host: str, port: int, server_name: Optional[str] = None, timeout: float = 10.0): """ Connect to MQTT broker. @@ -383,51 +387,66 @@ async def connect(self, host: str, port: int, server_name: Optional[str] = None) host: Broker hostname or IP address port: Broker port number server_name: Server name for TLS SNI (required for QUIC) + timeout: Connection timeout in seconds (default: 10.0) Raises: ConnectionError: If connection fails + asyncio.TimeoutError: If connection times out """ loop = asyncio.get_running_loop() self._connect_future = loop.create_future() - if self.transport_type in (TransportType.TCP, TransportType.TLS): - # Stream-based connection (TCP or TLS) - transport, protocol = await loop.create_connection( - lambda: FlowMqttProtocol(self.engine, self.transport_type, loop, self._on_event), - host, port - ) - self.protocol = protocol - - elif self.transport_type == TransportType.QUIC: - # QUIC connection using FlowMqttDatagramProtocol - if not server_name: - server_name = host # Default to host if not specified + async def _do_connect(): + if self.transport_type in (TransportType.TCP, TransportType.TLS): + # Stream-based connection (TCP or TLS) + transport, protocol = await loop.create_connection( + lambda: FlowMqttProtocol(self.engine, self.transport_type, loop, self._on_event), + host, port + ) + self.protocol = protocol + + elif self.transport_type == TransportType.QUIC: + # QUIC connection using FlowMqttDatagramProtocol + if not server_name: + _server_name = host # Default to host if not specified + else: + _server_name = server_name + + # Resolve hostname to IP address for QUIC + addr_info = await loop.getaddrinfo( + host, port, + family=socket.AF_INET, + type=socket.SOCK_DGRAM + ) + if not addr_info: + raise ConnectionError(f"Could not resolve {host}") + + server_addr = f"{addr_info[0][4][0]}:{port}" + + # Create UDP socket and protocol + transport, protocol = await loop.create_datagram_endpoint( + lambda: FlowMqttDatagramProtocol(self.engine, loop, self._on_event), + remote_addr=addr_info[0][4] + ) + self.protocol = protocol + protocol.remote_addr = addr_info[0][4] + + # Initiate QUIC connection with required parameters + now_ms = int((time.monotonic() - protocol.start_time) * 1000) + self.engine.connect(server_addr, _server_name, self.tls_opts, now_ms) - # Resolve hostname to IP address for QUIC - addr_info = socket.getaddrinfo( - host, port, - family=socket.AF_INET, - type=socket.SOCK_DGRAM - ) - if not addr_info: - raise ConnectionError(f"Could not resolve {host}") - - server_addr = f"{addr_info[0][4][0]}:{port}" - - # Create UDP socket and protocol - transport, protocol = await loop.create_datagram_endpoint( - lambda: FlowMqttDatagramProtocol(self.engine, loop, self._on_event), - remote_addr=addr_info[0][4] - ) - self.protocol = protocol - protocol.remote_addr = addr_info[0][4] - - # Initiate QUIC connection with required parameters - now_ms = int((time.monotonic() - protocol.start_time) * 1000) - self.engine.connect(server_addr, server_name, self.tls_opts, now_ms) - - # Wait for actual MQTT connection - await self._connect_future + # Wait for actual MQTT connection + if self._connect_future: + await self._connect_future + + try: + await asyncio.wait_for(_do_connect(), timeout=timeout) + except Exception: + # Clean up on failure + self.protocol = None + if self._connect_future and not self._connect_future.done(): + self._connect_future.cancel() + raise async def subscribe(self, topic: str, qos: int = 0) -> int: """ @@ -580,8 +599,14 @@ def _on_event(self, ev): if self.on_message: self.on_message(msg.topic, msg.payload, msg.qos) + elif ev.is_error(): + if self._connect_future and not self._connect_future.done(): + self._connect_future.set_exception(ConnectionError(f"MQTT connection error: {ev.message}")) + logger.error(f"MQTT Error: {ev.message}") + elif ev.is_disconnected(): - pass + if self._connect_future and not self._connect_future.done(): + self._connect_future.set_exception(ConnectionError("MQTT disconnected during connection process")) -__all__ = ['FlowMqttClient', 'FlowMqttProtocol'] +__all__ = ['FlowMqttClient', 'FlowMqttProtocol','TransportType', 'FlowMqttDatagramProtocol'] diff --git a/python/package/pyproject.toml b/python/package/pyproject.toml index dfa1a1ca..387c3195 100644 --- a/python/package/pyproject.toml +++ b/python/package/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "flowsdk" -version = "0.3.1" +version = "0.4.2" description = "Python bindings for FlowSDK utilizing Uniffi" readme = "README.md" authors = [ From 43f5d8ecb159ee7a20524ea11091b24d68ab1eb8 Mon Sep 17 00:00:00 2001 From: =William Yang Date: Sun, 8 Feb 2026 22:15:55 +0100 Subject: [PATCH 12/12] refactor(ffi): python, rename `_pump` to `pump` for public access --- python/examples/async_example.py | 5 ++-- python/examples/select_example.py | 1 - python/examples/test_binding.py | 2 -- python/package/flowsdk/async_client.py | 32 +++++++++++++++++--------- 4 files changed, 23 insertions(+), 17 deletions(-) diff --git a/python/examples/async_example.py b/python/examples/async_example.py index a691d718..a01ca6bd 100644 --- a/python/examples/async_example.py +++ b/python/examples/async_example.py @@ -2,7 +2,7 @@ import asyncio import flowsdk import time -import sys + from flowsdk import FlowMqttClient, TransportType, MqttOptionsFfi, MqttEngineFfi async def main(): @@ -21,7 +21,6 @@ async def main(): password=None, reconnect_base_delay_ms=1000, reconnect_max_delay_ms=30000, - max_reconnect_attempts=0 ) engine = MqttEngineFfi.new_with_opts(opts) print(f"āœ… Created MqttEngineFfi (Client ID: {client_id})") @@ -43,7 +42,7 @@ async def main(): # 4. Main orchestration loop try: - # Run for 15 seconds + # Run for 15 sconds start_time = time.monotonic() end_time = start_time + 15 diff --git a/python/examples/select_example.py b/python/examples/select_example.py index ea5adff1..327c48ae 100644 --- a/python/examples/select_example.py +++ b/python/examples/select_example.py @@ -2,7 +2,6 @@ import selectors import time import flowsdk -import sys def main(): """ diff --git a/python/examples/test_binding.py b/python/examples/test_binding.py index 9df7888e..8c813e29 100644 --- a/python/examples/test_binding.py +++ b/python/examples/test_binding.py @@ -1,6 +1,4 @@ import flowsdk_ffi -import time -import json def test_mqtt_engine(): print("--- Testing Standard MqttEngineFfi ---") diff --git a/python/package/flowsdk/async_client.py b/python/package/flowsdk/async_client.py index c2ccf2e1..738f6b0f 100644 --- a/python/package/flowsdk/async_client.py +++ b/python/package/flowsdk/async_client.py @@ -112,7 +112,7 @@ def data_received(self, data: bytes): self.engine.handle_incoming(data) # Trigger an immediate pump to process potential responses - self._pump() + self.pump() def connection_lost(self, exc: Optional[Exception]): """Called when the connection is closed.""" @@ -143,7 +143,7 @@ def _on_timer(self): now_ms = int((time.monotonic() - self.start_time) * 1000) self.engine.handle_tick(now_ms) - self._pump() + self.pump() # Schedule next tick if self.transport_type == TransportType.TCP: @@ -158,7 +158,7 @@ def _on_timer(self): self._schedule_tick(delay) - def _pump(self): + def pump(self): """Pump data between engine and network.""" # 1. Send outgoing data if self.transport_type == TransportType.TLS: @@ -220,7 +220,7 @@ def datagram_received(self, data: bytes, addr): addr_str = f"{addr[0]}:{addr[1]}" self.engine.handle_datagram(data, addr_str, now_ms) # Trigger an immediate pump to process responses - self._pump() + self.pump() def error_received(self, exc: Exception): """Called when an error is received.""" @@ -254,13 +254,13 @@ def _on_timer(self): for ev in events: self.on_event_cb(ev) - self._pump() + self.pump() # Fixed 10ms interval for QUIC delay = 0.01 self._schedule_tick(delay) - def _pump(self): + def pump(self): """Pump data between engine and network.""" # 1. Send outgoing datagrams datagrams = self.engine.take_outgoing_datagrams() @@ -471,7 +471,7 @@ async def subscribe(self, topic: str, qos: int = 0) -> int: self._pending_subscribe[pid] = fut # Force a pump to send the packet immediately - self.protocol._pump() + self.protocol.pump() await fut return pid @@ -498,7 +498,7 @@ async def unsubscribe(self, topic: str) -> int: self._pending_unsubscribe[pid] = fut # Force a pump to send the packet immediately - self.protocol._pump() + self.protocol.pump() await fut return pid @@ -541,15 +541,25 @@ async def publish(self, topic: str, payload: bytes, qos: int = 0, retain: Option self._pending_publish[pid] = fut # Force a pump to send the packet immediately - self.protocol._pump() + self.protocol.pump() await fut else: # QoS 0: just send immediately, no ack needed - self.protocol._pump() + self.protocol.pump() return pid + def pump(self): + """ + Force data transmission and process pending events. + + This can be useful if the automatic tick loop is not running or if + you want to ensure data is sent immediately. + """ + if self.protocol: + self.protocol.pump() + async def disconnect(self): """ Disconnect from the broker gracefully. @@ -558,7 +568,7 @@ async def disconnect(self): return self.engine.disconnect() - self.protocol._pump() # Flush DISCONNECT packet + self.protocol.pump() # Flush DISCONNECT packet if self.protocol.transport: self.protocol.transport.close()