diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8145d8b0..13635f7a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -134,6 +134,24 @@ jobs: - name: Run security audit run: cargo audit + python-bindings: + name: Python Bindings + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.10' + + - name: Build and Test Bindings + run: ./scripts/build_python_bindings.sh --test + coverage: name: Code Coverage runs-on: ubuntu-latest diff --git a/.github/workflows/release_pypi.yml b/.github/workflows/release_pypi.yml new file mode 100644 index 00000000..50b69e84 --- /dev/null +++ b/.github/workflows/release_pypi.yml @@ -0,0 +1,59 @@ +name: Publish to PyPI +permissions: + contents: read + +on: + release: + types: [published] + +jobs: + build_wheels: + name: Build wheels on ${{ matrix.os }} + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [ubuntu-latest, windows-latest, macos-latest] + + steps: + - uses: actions/checkout@v4 + + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + + - name: Build and Prepare Bindings + shell: bash + run: | + ./scripts/build_python_bindings.sh --release + + - name: Build wheels + uses: pypa/cibuildwheel@v2 + with: + package-dir: python/package + env: + CIBW_BEFORE_BUILD: "pip install setuptools wheel" + # Linux specific: ensure library is copied correctly if needed + # But our script handles the dylib placement. + + - uses: actions/upload-artifact@v4 + with: + name: cibw-wheels-${{ matrix.os }}-${{ strategy.job-index }} + path: ./wheelhouse/*.whl + + upload_pypi: + needs: [build_wheels] + runs-on: ubuntu-latest + environment: + name: pypi + url: https://pypi.org/p/flowsdk + permissions: + id-token: write + steps: + - uses: actions/download-artifact@v4 + with: + pattern: cibw-wheels-* + path: dist + merge-multiple: true + + - uses: pypa/gh-action-pypi-publish@release/v1 + with: + password: ${{ secrets.PYPI_API_TOKEN }} diff --git a/.gitignore b/.gitignore index a03e0511..b63c262c 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,18 @@ Cargo.lock *.profraw *.profdata + + +# FFI generated code +python/package/flowsdk/flowsdk_ffi.py +python/package/flowsdk/libflowsdk_ffi.dylib +python/package/flowsdk/libflowsdk_ffi.so +python/package/flowsdk/libflowsdk_ffi.dll + +# Python Examples (generated files) +python/examples/flowsdk_ffi.py +python/examples/*.dylib +python/examples/*.so +python/examples/*.dll + +python/**/*.pyc diff --git a/python/examples/README.md b/python/examples/README.md new file mode 100644 index 00000000..1404f2a0 --- /dev/null +++ b/python/examples/README.md @@ -0,0 +1,137 @@ +# FlowSDK Python Examples + +This directory contains examples demonstrating various usage patterns of the FlowSDK Python bindings. + +## Quick Start Examples + +### 1. Simple Async Usage (`simple_async_usage.py`) +**Minimal async MQTT client example - Start here!** + +```bash +PYTHONPATH=../package python3 simple_async_usage.py +``` + +The simplest way to use FlowSDK with asyncio. Shows connect, subscribe, publish, and disconnect in just a few lines. + +### 2. Asyncio TCP Client (`asyncio_tcp_client_example.py`) +**Standard async client using TCP** + +```bash +PYTHONPATH=../package python3 asyncio_tcp_client_example.py +``` + +Demonstrates the high-level `FlowMqttClient` API using standard TCP transport with message callbacks and proper error handling. + +## Secure Transport Examples + +### 3. TLS Transport (`async_tls_client_example.py`) +**MQTT over TLS for secure communication** + +```bash +PYTHONPATH=../package python3 async_tls_client_example.py +``` + +Shows how to use MQTT over TLS (port 8883) with the unified `FlowMqttClient`. Includes: +- Explicit TLS transport selection +- Certificate verification (default) +- SNI support via `server_name` + +### 4. QUIC Transport (`asyncio_quic_client_example.py`) +**MQTT over QUIC for modern UDP-based transport** + +```bash +PYTHONPATH=../package python3 asyncio_quic_client_example.py +``` + +Shows how to use MQTT over QUIC protocol with the unified `FlowMqttClient`. QUIC provides: +- Built-in encryption (TLS 1.3) +- Connection migration +- Lower latency than TCP +- No head-of-line blocking + +**Requirements**: QUIC-enabled MQTT broker (e.g., EMQX 5.0+ with QUIC listener on port 14567) + +## Low-Level Examples + +### 5. Select-Based Client (`select_example.py`) +**Non-blocking I/O using select()** + +```bash +PYTHONPATH=../package python3 select_example.py +``` + +Demonstrates manual socket management with the select() system call. + +### 6. Async with Manual I/O (`async_example.py`) +**Asyncio with manual socket operations** + +```bash +PYTHONPATH=../package python3 async_example.py +``` + +Shows how to manually integrate the MQTT engine with asyncio event loop using socket I/O. + +## Testing + +### 8. Binding Tests (`test_binding.py`) +**Low-level FFI binding verification** + +```bash +PYTHONPATH=../package python3 test_binding.py +``` + +## Architecture Overview + +### High-Level API (Recommended) +``` +FlowMqttClient → FlowMqttProtocol → MqttEngineFfi → Rust FFI +``` +- Simple async/await API +- Automatic network handling +- Built-in reconnection support + +### Low-Level API (Advanced) +``` +Your Code → MqttEngineFfi → Manual I/O → Network +``` + +## Transport Selection + +FlowSDK supports multiple transport protocols via the unified `FlowMqttClient`: + +### TCP +```python +client = FlowMqttClient("my_client", transport=TransportType.TCP) +await client.connect("broker.emqx.io", 1883) +``` + +### TLS +```python +client = FlowMqttClient( + "my_client", + transport=TransportType.TLS, + server_name="broker.emqx.io" +) +await client.connect("broker.emqx.io", 8883) +``` + +### QUIC +```python +client = FlowMqttClient( + "my_client", + transport=TransportType.QUIC, + server_name="broker.emqx.io" +) +await client.connect("broker.emqx.io", 14567, server_name="broker.emqx.io") +``` + +## Learning Path + +1. **Start here**: `simple_async_usage.py` - Learn the basics +2. **Next**: `asyncio_tcp_client_example.py` - See full features +3. **Secure**: `async_tls_client_example.py` - Try TLS transport +4. **Modern**: `asyncio_quic_client_example.py` - Try QUIC transport + +## License + +Mozilla Public License 2.0 diff --git a/python/examples/async_example.py b/python/examples/async_example.py new file mode 100644 index 00000000..a01ca6bd --- /dev/null +++ b/python/examples/async_example.py @@ -0,0 +1,144 @@ + +import asyncio +import flowsdk +import time + +from flowsdk import FlowMqttClient, TransportType, MqttOptionsFfi, MqttEngineFfi + +async def main(): + print("🚀 FlowSDK No-IO Async Example") + print("=" * 60) + + # 1. Create the engine + import random + client_id = f"python_async_no_io_{random.randint(1000, 9999)}" + opts = MqttOptionsFfi( + client_id=client_id, + mqtt_version=5, + clean_start=True, + keep_alive=60, + username=None, + password=None, + reconnect_base_delay_ms=1000, + reconnect_max_delay_ms=30000, + ) + engine = MqttEngineFfi.new_with_opts(opts) + print(f"✅ Created MqttEngineFfi (Client ID: {client_id})") + + # 2. Establish TCP connection using asyncio + broker_host = "broker.emqx.io" + broker_port = 1883 + print(f"📡 Connecting to {broker_host}:{broker_port}...") + try: + reader, writer = await asyncio.open_connection(broker_host, broker_port) + except Exception as e: + print(f"❌ Connection failed: {e}") + return + + print("✅ TCP Connected") + + # 3. Initiate MQTT connection + engine.connect() + + # 4. Main orchestration loop + try: + # Run for 15 sconds + start_time = time.monotonic() + end_time = start_time + 15 + + # Subscribe/Publish state + subscribed = False + published = False + + while time.monotonic() < end_time: + # now_ms should be the time elapsed since the engine was created. + # Since we create the engine just before this, monotonic() - start_time is a good proxy. + now_ms = int((time.monotonic() - start_time) * 1000) + + # 1. Handle protocol timers + engine.handle_tick(now_ms) + + # 2. Handle network input (non-blocking-ish) + try: + # Use a small timeout to keep the loop pumping + data = await asyncio.wait_for(reader.read(4096), timeout=0.05) + if data: + print(f"📥 Received {len(data)} bytes from network") + engine.handle_incoming(data) + elif reader.at_eof(): + print("📥 Reader got EOF") + engine.handle_connection_lost() + break + except asyncio.TimeoutError: + # No data available in this tick, that's fine + pass + except Exception as e: + print(f"❌ Read error: {e}") + engine.handle_connection_lost() + break + + # 3. Handle network output + outgoing = engine.take_outgoing() + if outgoing: + print(f"📤 Sending {len(outgoing)} bytes to network...") + writer.write(outgoing) + await writer.drain() + + # 4. Process all accumulated events + events = engine.take_events() + for event in events: + if event.is_connected(): + res = event[0] + print(f"✅ MQTT Connected! (Reason: {res.reason_code}, Session Present: {res.session_present})") + elif event.is_disconnected(): + print(f"💔 MQTT Disconnected! (Reason: {event.reason_code})") + elif event.is_message_received(): + msg = event[0] + print(f"📨 Message on '{msg.topic}': {msg.payload.decode()} (QoS: {msg.qos})") + elif event.is_subscribed(): + res = event[0] + print(f"✅ Subscribed (PID: {res.packet_id}, Reasons: {res.reason_codes})") + elif event.is_published(): + res = event[0] + print(f"✅ Published (PID: {res.packet_id}, Reason: {res.reason_code})") + elif event.is_error(): + print(f"❌ Engine Error: {event[0].message}") + elif event.is_reconnect_needed(): + print("🔄 Engine signaled ReconnectNeeded") + elif event.is_reconnect_scheduled(): + print(f"⏰ Engine scheduled reconnect (Attempt {event.attempt}, Delay {event.delay_ms}ms)") + elif event.is_ping_response(): + print(f"🏓 Ping Response (Success: {event.success})") + + # 5. Application logic + if engine.is_connected(): + if not subscribed: + pid = engine.subscribe("test/python/no_io", 1) + print(f"📑 API -> subscribe('test/python/no_io', qos=1) -> PID: {pid}") + subscribed = True + elif not published: + pid = engine.publish("test/python/no_io", b"Hello from Python No-IO!", 1, None) + print(f"📤 API -> publish('test/python/no_io', payload='...', qos=1) -> PID: {pid}") + published = True + + except Exception as e: + print(f"❌ Loop error: {e}") + finally: + # 6. Disconnect gracefully + print("\n👋 Disconnecting...") + if not writer.is_closing(): + engine.disconnect() + outgoing = engine.take_outgoing() + if outgoing: + writer.write(outgoing) + await writer.drain() + + writer.close() + await writer.wait_closed() + print("✅ Done") + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + pass diff --git a/python/examples/asyncio_quic_client_example.py b/python/examples/asyncio_quic_client_example.py new file mode 100644 index 00000000..760383ad --- /dev/null +++ b/python/examples/asyncio_quic_client_example.py @@ -0,0 +1,68 @@ +""" +Example demonstrating MQTT over QUIC using flowsdk-ffi with asyncio. + +QUIC provides a modern UDP-based transport with built-in encryption and multiplexing. +This example shows how to use the FlowMqttClient with QUIC transport. + +Note: Requires a QUIC-enabled MQTT broker (e.g., EMQX with QUIC support). +""" +import asyncio +from flowsdk import FlowMqttClient, TransportType + + +def on_message(topic: str, payload: bytes, qos: int): + """Callback for received messages.""" + print(f"📨 Received: {topic} -> {payload.decode('utf-8')} (QoS {qos})") + + +async def main(): + """Main example demonstrating QUIC MQTT client.""" + print("🚀 Starting QUIC MQTT Client Example...") + + # Create QUIC client + client = FlowMqttClient( + client_id="quic_test_client", + transport=TransportType.QUIC, + mqtt_version=5, + insecure_skip_verify=True, # Skip TLS verification for demo + on_message=on_message + ) + + try: + # Connect to broker using QUIC + host = "broker.emqx.io" + port = 14567 + print(f"🔌 Connecting to {host}:{port} via QUIC...") + await client.connect(host, port, server_name="broker.emqx.io") + print("✅ Connected!") + + # Subscribe to a test topic + topic = "flowsdk/quic/test" + print(f"📡 Subscribing to {topic}...") + await client.subscribe(topic, qos=1) + print("✅ Subscribed!") + + # Publish some test messages + for i in range(3): + message = f"QUIC message {i+1}" + print(f"📤 Publishing: {message}") + await client.publish(topic, message.encode(), qos=1) + await asyncio.sleep(0.5) + + # Wait for messages + print("⏳ Waiting for messages (5 seconds)...") + await asyncio.sleep(5) + + # Disconnect + print("👋 Disconnecting...") + await client.disconnect() + print("✅ Disconnected!") + + except Exception as e: + print(f"❌ Error: {e}") + import traceback + traceback.print_exc() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/examples/asyncio_tcp_client_example.py b/python/examples/asyncio_tcp_client_example.py new file mode 100644 index 00000000..f7d46278 --- /dev/null +++ b/python/examples/asyncio_tcp_client_example.py @@ -0,0 +1,59 @@ +""" +Example demonstrating the high-level FlowMqttClient for asyncio applications. + +This example shows how to use the FlowMqttClient class from the flowsdk package +to easily connect, subscribe, publish, and disconnect from an MQTT broker using +async/await syntax. + +@TODO: Add more features here +""" + +import asyncio +import time +from flowsdk import FlowMqttClient + + +def on_message(topic: str, payload: bytes, qos: int): + """Callback for received messages.""" + print(f"******** MSG RECEIVED ********") + print(f"Topic: {topic}") + print(f"Payload: {payload.decode(errors='replace')}") + print(f"QoS: {qos}") + print(f"******************************") + + +async def main(): + # Create client with message callback + client_id = f"python_proper_{int(time.time() % 10000)}" + client = FlowMqttClient(client_id, on_message=on_message) + + try: + # Connect to broker + host = "broker.emqx.io" + port = 1883 + print(f"Connecting to {host}:{port}...") + await client.connect(host, port) + print("✅ Connected!") + + # Subscribe and wait for ack + print("Subscribing to test/python/proper...") + await client.subscribe("test/python/proper", 1) + print("✅ Subscribe completed!") + + # Publish and wait for ack + print("Publishing message...") + await client.publish("test/python/proper", b"Hello from Proper Client!", 1) + print("✅ Publish completed!") + + # Wait a bit to receive the message back + print("Waiting for messages...") + await asyncio.sleep(2) + + finally: + print("Disconnecting...") + await client.disconnect() + print("✅ Disconnected!") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/examples/asyncio_tls_client_example.py b/python/examples/asyncio_tls_client_example.py new file mode 100644 index 00000000..90518f66 --- /dev/null +++ b/python/examples/asyncio_tls_client_example.py @@ -0,0 +1,68 @@ +""" +Example demonstrating the high-level FlowMqttClient for asyncio applications. + +This example shows how to use the FlowMqttClient class from the flowsdk package +to easily connect, subscribe, publish, and disconnect from an MQTT broker using +async/await syntax. +""" +import asyncio +import time +import os +import sys + +# Add the package to path +sys.path.append(os.path.join( "python", "package")) + +from flowsdk import FlowMqttClient, TransportType + +async def test_tls(): + print("🚀 Testing TLS Transport Support...") + + msg_received = asyncio.Event() + + def on_message(topic: str, payload: bytes, qos: int): + print(f"📨 Received message on {topic}: {payload.decode()}") + if topic == "test/python/tls" and payload == b"Hello TLS!": + msg_received.set() + + # We'll use broker.emqx.io:8883 which supports TLS + client = FlowMqttClient( + client_id=f"python_tls_test_{int(time.time() % 10000)}", + transport=TransportType.TLS, + insecure_skip_verify=False, # Skip verification for simplicity in test + server_name="broker.emqx.io", + on_message=on_message + ) + + try: + host = "broker.emqx.io" + port = 8883 + print(f"📡 Connecting to {host}:{port} (TLS)...") + await client.connect(host, port) + print("✅ TLS Connected!") + + await client.subscribe("test/python/tls", 1) + print("✅ Subscribed!") + + await client.publish("test/python/tls", b"Hello TLS!", 1) + print("✅ Published!") + + # Wait for the message with a timeout + print("⏳ Waiting for message reception...") + try: + await asyncio.wait_for(msg_received.wait(), timeout=5.0) + print("✨ Message verification successful!") + except asyncio.TimeoutError: + print("❌ Timeout waiting for message!") + + except Exception as e: + print(f"❌ TLS Test Failed: {e}") + import traceback + traceback.print_exc() + finally: + print("👋 Disconnecting...") + await client.disconnect() + print("✅ Disconnected!") + +if __name__ == "__main__": + asyncio.run(test_tls()) diff --git a/python/examples/select_example.py b/python/examples/select_example.py new file mode 100644 index 00000000..327c48ae --- /dev/null +++ b/python/examples/select_example.py @@ -0,0 +1,191 @@ +import socket +import selectors +import time +import flowsdk + +def main(): + """ + Main function demonstrating a select-based MQTT client using FlowSDK FFI. + This example shows how to use Python's selectors module to handle non-blocking + I/O with an MQTT engine. It connects to a public MQTT broker, subscribes to a + topic, publishes a message, and handles all I/O events asynchronously. + The function performs the following steps: + 1. Creates an MQTT engine with configuration options + 2. Establishes a non-blocking TCP connection to the broker + 3. Uses a selector to monitor socket events (read/write availability) + 4. Manages the MQTT connection handshake + 5. Subscribes to a test topic + 6. Publishes a test message + 7. Processes incoming MQTT messages and protocol events + 8. Runs for 30 seconds or until disconnected + The selector's EVENT_WRITE flag indicates when the socket is ready for writing, + which occurs when: + - The initial non-blocking connect() completes successfully + - The socket's send buffer has space available for outgoing data + Returns: + None + Raises: + KeyboardInterrupt: Handled gracefully to allow clean disconnection + """ + print("🚀 Select-based FlowSDK Example (Port of Rust no_io_mqtt_client_example.rs)") + print("=" * 70) + + # 1. Create the engine + client_id = f"python_select_no_io_{int(time.time() % 10000)}" + opts = flowsdk.MqttOptionsFfi( + client_id=client_id, + mqtt_version=5, + clean_start=True, + keep_alive=60, + username=None, + password=None, + reconnect_base_delay_ms=1000, + reconnect_max_delay_ms=30000, + max_reconnect_attempts=0 + ) + engine = flowsdk.MqttEngineFfi.new_with_opts(opts) + print(f"✅ Created MqttEngineFfi (Client ID: {client_id})") + + # 2. Establish TCP connection + broker_host = "broker.emqx.io" + broker_port = 1883 + print(f"📡 Connecting to {broker_host}:{broker_port}...") + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setblocking(False) + + try: + sock.connect((broker_host, broker_port)) + except BlockingIOError: + pass # Expected for non-blocking connect + + # 3. Use Selector for I/O + sel = selectors.DefaultSelector() + sel.register(sock, selectors.EVENT_WRITE) # Wait for connect to complete + + # Initiate MQTT connection + engine.connect() + + start_time = time.monotonic() + end_time = start_time + 30 + + connected = False + mqtt_connected = False + subscribed = False + published = False + + print("🔄 Entering main loop...") + + outgoing_data = b"" + + try: + while time.monotonic() < end_time: + now_ms = int((time.monotonic() - start_time) * 1000) + + # 1. Handle engine ticks + engine.handle_tick(now_ms) + + # 2. Accumulate outgoing data from engine + new_outgoing = engine.take_outgoing() + if new_outgoing: + outgoing_data += new_outgoing + + # 3. Handle network events + timeout = 0.1 + next_tick = engine.next_tick_ms() + if next_tick > 0: + timeout = max(0, (next_tick - now_ms) / 1000.0) + timeout = min(timeout, 0.1) + + events = sel.select(timeout=timeout) + + for key, mask in events: + if mask & selectors.EVENT_WRITE: + if not connected: + # Check if connection was successful + err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) + if err == 0: + print("✅ TCP Socket connected") + connected = True + else: + print(f"❌ TCP Connection failed: {err}") + return + + if outgoing_data: + try: + sent = sock.send(outgoing_data) + print(f"📤 Sent {sent} bytes") + outgoing_data = outgoing_data[sent:] + except (BlockingIOError, InterruptedError): + pass + + if mask & selectors.EVENT_READ: + try: + data = sock.recv(4096) + if data: + print(f"📥 Received {len(data)} bytes") + engine.handle_incoming(data) + else: + print("📥 EOF from server") + return + except (BlockingIOError, InterruptedError): + pass + + # 4. Update selector registration based on pending data + if not connected: + # Still waiting for connection + sel.modify(sock, selectors.EVENT_WRITE) + elif outgoing_data: + # Have data to write + sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE) + else: + # Only waiting for read + sel.modify(sock, selectors.EVENT_READ) + + # 5. Process protocol events + ffi_events = engine.take_events() + for ev in ffi_events: + if ev.is_connected(): + res = ev[0] + print(f"✅ MQTT Connected! (Reason: {res.reason_code})") + mqtt_connected = True + elif ev.is_message_received(): + msg = ev[0] + content = msg.payload.decode(errors='replace') + print(f"📨 Message on '{msg.topic}': {content} (QoS: {msg.qos})") + elif ev.is_subscribed(): + print(f"✅ Subscribed (PID: {ev[0].packet_id})") + elif ev.is_published(): + print(f"✅ Published (PID: {ev[0].packet_id})") + elif ev.is_error(): + print(f"❌ Error: {ev[0].message}") + elif ev.is_disconnected(): + print(f"💔 Disconnected (Reason: {ev.reason_code})") + return + + # 6. App Logic + if mqtt_connected: + if not subscribed: + pid = engine.subscribe("test/python/select", 1) + print(f"📑 Subscribing (PID: {pid})...") + subscribed = True + elif not published: + pid = engine.publish("test/python/select", b"Hello from Select!", 1, None) + print(f"📤 Publishing (PID: {pid})...") + published = True + + except KeyboardInterrupt: + pass + finally: + print("\n👋 Disconnecting...") + engine.disconnect() + final_outgoing = engine.take_outgoing() + if final_outgoing: + sock.setblocking(True) + sock.sendall(final_outgoing) + sock.close() + sel.close() + print("✅ Done") + +if __name__ == "__main__": + main() diff --git a/python/examples/simple_async_usage.py b/python/examples/simple_async_usage.py new file mode 100644 index 00000000..f21cb7f3 --- /dev/null +++ b/python/examples/simple_async_usage.py @@ -0,0 +1,28 @@ +""" +Simple example showing minimal FlowMqttClient usage. + +This demonstrates the most basic async MQTT client usage pattern. +""" + +import asyncio +from flowsdk import FlowMqttClient + + +async def main(): + # Create and connect + client = FlowMqttClient( + "simple_client", + on_message=lambda topic, payload, qos: print(f"📨 {topic}: {payload}") + ) + + await client.connect("broker.emqx.io", 1883) + await client.subscribe("test/#", 1) + await client.publish("test/hello", b"Hello World!", 1) + + # Keep running to receive messages + await asyncio.sleep(5) + await client.disconnect() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/examples/test_binding.py b/python/examples/test_binding.py new file mode 100644 index 00000000..8c813e29 --- /dev/null +++ b/python/examples/test_binding.py @@ -0,0 +1,128 @@ +import flowsdk_ffi + +def test_mqtt_engine(): + print("--- Testing Standard MqttEngineFfi ---") + opts = flowsdk_ffi.MqttOptionsFfi( + client_id="python_client", + mqtt_version=5, + clean_start=True, + keep_alive=60, + username=None, + password=None, + reconnect_base_delay_ms=1000, + reconnect_max_delay_ms=30000, + max_reconnect_attempts=0 + ) + engine = flowsdk_ffi.MqttEngineFfi.new_with_opts(opts) + + print(f"MQTT Version: {engine.get_version()}") + + # Simulate connection + engine.connect() + + # Subscribe + pid = engine.subscribe("test/topic", 1) + print(f"Subscribe packet_id: {pid}") + + # Publish + pid = engine.publish("test/topic", b"hello world", 1, None) + print(f"Publish packet_id: {pid}") + + # Simulate receiving a message (loopback) + print("Simulating message reception...") + msg = flowsdk_ffi.MqttMessageFfi(topic="test/topic", payload=b"hello loopback", qos=1, retain=False) + event = flowsdk_ffi.MqttEventFfi.MESSAGE_RECEIVED(msg) + engine.push_event_ffi(event) + + # Check events + events = engine.take_events() + for ev in events: + if ev.is_message_received(): + m = ev[0] # The MqttMessageFfi is the first element in the variant tuple + print(f"RECEIVED PUBLISH: topic={m.topic}, payload={m.payload.decode()}, qos={m.qos}") + else: + print(f"Other Event: {ev}") + + # Disconnect + engine.disconnect() + print("Disconnected.") + +def test_tls_engine(): + print("\n--- Testing TlsMqttEngineFfi ---") + opts = flowsdk_ffi.MqttOptionsFfi( + client_id="tls_client", + mqtt_version=5, + clean_start=True, + keep_alive=60, + username=None, + password=None, + reconnect_base_delay_ms=1000, + reconnect_max_delay_ms=30000, + max_reconnect_attempts=0 + ) + tls_opts = flowsdk_ffi.MqttTlsOptionsFfi( + ca_cert_file=None, + client_cert_file=None, + client_key_file=None, + insecure_skip_verify=True, + alpn_protocols=["mqtt"] + ) + + engine = flowsdk_ffi.TlsMqttEngineFfi(opts, tls_opts, "localhost") + engine.connect() + + # Check for outgoing socket data + data = engine.take_socket_data() + print(f"Outgoing TLS data: {len(data)} bytes") + + engine.disconnect() + print("Disconnected.") + +def test_quic_engine(): + print("\n--- Testing QuicMqttEngineFfi ---") + opts = flowsdk_ffi.MqttOptionsFfi( + client_id="quic_client", + mqtt_version=5, + clean_start=True, + keep_alive=60, + username=None, + password=None, + reconnect_base_delay_ms=1000, + reconnect_max_delay_ms=30000, + max_reconnect_attempts=0 + ) + + engine = flowsdk_ffi.QuicMqttEngineFfi(opts) + + tls_opts = flowsdk_ffi.MqttTlsOptionsFfi( + ca_cert_file=None, + client_cert_file=None, + client_key_file=None, + insecure_skip_verify=True, + alpn_protocols=["mqtt"] + ) + + engine.connect("127.0.0.1:1883", "localhost", tls_opts, 0) + + # Check for outgoing datagrams + datagrams = engine.take_outgoing_datagrams() + print(f"Outgoing QUIC datagrams: {len(datagrams)}") + + # Simulate a tick + events = engine.handle_tick(100) + print(f"Tick events: {len(events)}") + + engine.disconnect() + print("Disconnected.") + +if __name__ == "__main__": + try: + test_mqtt_engine() + test_tls_engine() + test_quic_engine() + print("\nAll verification tests passed!") + except Exception as e: + print(f"\nVerification Failed: {e}") + import traceback + traceback.print_exc() + exit(1) diff --git a/python/package/MANIFEST.in b/python/package/MANIFEST.in new file mode 100644 index 00000000..3821d4d2 --- /dev/null +++ b/python/package/MANIFEST.in @@ -0,0 +1,4 @@ +include flowsdk/*.dylib +include flowsdk/*.so +include flowsdk/*.dll +include README.md diff --git a/python/package/README.md b/python/package/README.md new file mode 100644 index 00000000..74b3a9a2 --- /dev/null +++ b/python/package/README.md @@ -0,0 +1,102 @@ +# FlowSDK Python Bindings + +Python bindings for [FlowSDK](https://github.com/emqx/flowsdk), providing both low-level FFI access and high-level async MQTT client. + +## Installation + +```bash +pip install flowsdk +``` + +## Quick Start + +### Async Client (Recommended) + +The high-level async client provides a simple asyncio-based API: + +```python +import asyncio +from flowsdk import FlowMqttClient + +async def main(): + # Create client with message callback + client = FlowMqttClient( + "my_client_id", + on_message=lambda topic, payload, qos: print(f"{topic}: {payload}") + ) + + # Connect and use + await client.connect("broker.emqx.io", 1883) + await client.subscribe("test/topic", qos=1) + await client.publish("test/topic", b"Hello World!", qos=1) + + await asyncio.sleep(2) # Wait for messages + await client.disconnect() + +asyncio.run(main()) +``` + +### Low-Level FFI + +For advanced use cases requiring manual control: + +```python +import flowsdk + +# Create engine +engine = flowsdk.MqttEngineFfi("client_id", mqtt_version=5) +engine.connect() + +# Manual network I/O required (see examples/) +``` + +## Features + +- ✅ MQTT 3.1.1 and 5.0 support +- ✅ Async/await API with asyncio +- ✅ TLS/SSL support +- ✅ QUIC transport support +- ✅ QoS 0, 1, 2 support +- ✅ Clean session and persistent sessions +- ✅ Last Will and Testament +- ✅ Automatic reconnection + +## Examples + +See the [examples directory](../../examples/) for more usage patterns: + +- `simple_async_usage.py` - Minimal async client example +- `proper_async_client.py` - Full-featured async client +- `select_example.py` - Select-based non-blocking I/O +- `async_example.py` - Manual asyncio with low-level API +- `test_binding.py` - FFI binding tests + +## API Reference + +### FlowMqttClient + +High-level async MQTT client. + +**Constructor:** +```python +FlowMqttClient( + client_id: str, + mqtt_version: int = 5, # 3 or 5 + clean_start: bool = True, + keep_alive: int = 30, # seconds + username: Optional[str] = None, + password: Optional[str] = None, + on_message: Optional[Callable[[str, bytes, int], None]] = None +) +``` + +**Methods:** +- `async connect(host: str, port: int)` - Connect to broker +- `async subscribe(topic: str, qos: int = 0) -> int` - Subscribe to topic +- `async unsubscribe(topic: str) -> int` - Unsubscribe from topic +- `async publish(topic: str, payload: bytes, qos: int = 0, retain: bool = None) -> int` - Publish message +- `async disconnect()` - Disconnect gracefully + +## License + +Mozilla Public License 2.0 diff --git a/python/package/flowsdk/__init__.py b/python/package/flowsdk/__init__.py new file mode 100644 index 00000000..1f89937f --- /dev/null +++ b/python/package/flowsdk/__init__.py @@ -0,0 +1,11 @@ +try: + from .flowsdk_ffi import * +except ImportError: + pass + +try: + from .async_client import FlowMqttClient, FlowMqttProtocol, TransportType +except ImportError: + pass + +__all__ = ['FlowMqttClient', 'FlowMqttProtocol', 'TransportType', 'MqttEngineFfi', 'TlsMqttEngineFfi', 'QuicMqttEngineFfi', 'MqttOptionsFfi'] diff --git a/python/package/flowsdk/async_client.py b/python/package/flowsdk/async_client.py new file mode 100644 index 00000000..738f6b0f --- /dev/null +++ b/python/package/flowsdk/async_client.py @@ -0,0 +1,622 @@ +""" +async MQTT client implementation using flowsdk-ffi with asyncio. + +This module provides an asyncio-based MQTT client that wraps the flowsdk_ffi +library, providing async/await support for MQTT operations. It implements a +custom Protocol for handling the network layer and manages the timing and +pumping of the MQTT engine. + +Supports multiple transports: + - TCP: Standard MQTT over TCP (default) + - QUIC: MQTT over QUIC protocol (UDP-based, built-in encryption) + - TLS: MQTT over TLS (encrypted TCP) + +Classes: + TransportType: Enum for selecting MQTT transport protocol + FlowMqttProtocol: asyncio.Protocol implementation for TCP transport + FlowMqttDatagramProtocol: asyncio.DatagramProtocol for QUIC transport + FlowMqttClient: High-level async MQTT client supporting multiple transports + +Example (TCP): + >>> import asyncio + >>> from flowsdk import FlowMqttClient + >>> + >>> async def example(): + ... client = FlowMqttClient("my_client_id") + ... await client.connect("broker.emqx.io", 1883) + ... await client.subscribe("test/topic", 1) + ... await client.publish("test/topic", b"Hello", 1) + ... await client.disconnect() + >>> + >>> asyncio.run(example()) + +Example (QUIC): + >>> import asyncio + >>> from flowsdk import FlowMqttClient, TransportType + >>> + >>> async def example(): + ... client = FlowMqttClient("my_client_id", transport=TransportType.QUIC) + ... await client.connect("broker.emqx.io", 14567, server_name="broker.emqx.io") + ... await client.subscribe("test/topic", 1) + ... await client.publish("test/topic", b"Hello", 1) + ... await client.disconnect() + >>> + >>> asyncio.run(example()) +""" + +import asyncio +import logging +import socket +import time +from enum import Enum +from typing import Optional, Dict, Callable, Any, List, Union + +try: + from . import flowsdk_ffi +except ImportError: + import flowsdk_ffi + + +logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) + + +class TransportType(Enum): + """MQTT transport protocol types.""" + TCP = "tcp" + TLS = "tls" + QUIC = "quic" + + +class FlowMqttProtocol(asyncio.Protocol): + """ + asyncio Protocol implementation for MQTT engine (TCP/TLS). + + This class handles the network layer, manages engine ticks, and pumps + data between the network transport and the MQTT engine. + + Args: + engine: The MQTT engine instance + transport_type: The transport protocol being used + loop: The asyncio event loop + on_event_cb: Callback function to handle MQTT events + """ + + def __init__(self, engine, transport_type: TransportType, loop: asyncio.AbstractEventLoop, on_event_cb: Callable): + self.engine = engine + self.transport_type = transport_type + self.loop = loop + self.transport: Optional[asyncio.Transport] = None + self.on_event_cb = on_event_cb + self.start_time = time.monotonic() + self._tick_handle: Optional[asyncio.TimerHandle] = None + self.closed = False + + def connection_made(self, transport: asyncio.Transport): + """Called when connection is established.""" + self.transport = transport + + # Start the MQTT connection + if hasattr(self.engine, 'connect'): + self.engine.connect() + + # Start the tick loop immediately + self._schedule_tick(0) + + def data_received(self, data: bytes): + """Called when data is received from the network.""" + # Feed network data to engine + if self.transport_type == TransportType.TLS: + self.engine.handle_socket_data(data) + else: + self.engine.handle_incoming(data) + + # Trigger an immediate pump to process potential responses + self.pump() + + def connection_lost(self, exc: Optional[Exception]): + """Called when the connection is closed.""" + self.closed = True + # Only TCP engine has handle_connection_lost + if self.transport_type == TransportType.TCP and hasattr(self.engine, 'handle_connection_lost'): + self.engine.handle_connection_lost() + + if self._tick_handle: + self._tick_handle.cancel() + + def _schedule_tick(self, delay_sec: float): + """Schedule the next engine tick.""" + if self.closed: + return + + if self._tick_handle: + self._tick_handle.cancel() + + self._tick_handle = self.loop.call_later(delay_sec, self._on_timer) + + def _on_timer(self): + """Handle scheduled engine tick.""" + if self.closed: + return + + # Run protocol tick + now_ms = int((time.monotonic() - self.start_time) * 1000) + self.engine.handle_tick(now_ms) + + self.pump() + + # Schedule next tick + if self.transport_type == TransportType.TCP: + next_tick_ms = self.engine.next_tick_ms() + if next_tick_ms < 0: + delay = 0.1 + else: + delay = max(0, (next_tick_ms - now_ms) / 1000.0) + else: + # Fixed 10ms for TLS/QUIC + delay = 0.01 + + self._schedule_tick(delay) + + def pump(self): + """Pump data between engine and network.""" + # 1. Send outgoing data + if self.transport_type == TransportType.TLS: + outgoing = self.engine.take_socket_data() + else: + outgoing = self.engine.take_outgoing() + + if outgoing and self.transport and not self.transport.is_closing(): + self.transport.write(outgoing) + + # 2. Process events + events = self.engine.take_events() + for ev in events: + self.on_event_cb(ev) + + +class FlowMqttDatagramProtocol(asyncio.DatagramProtocol): + """ + asyncio DatagramProtocol implementation for QUIC MQTT engine. + + This class handles UDP datagram transport for QUIC, managing engine ticks + and pumping data between the network transport and the MQTT engine. + + Args: + engine: The QUIC MQTT engine instance from flowsdk_ffi + loop: The asyncio event loop + on_event_cb: Callback function to handle MQTT events + """ + + def __init__(self, engine, loop: asyncio.AbstractEventLoop, on_event_cb: Callable): + self.engine = engine + self.loop = loop + self.transport: Optional[asyncio.DatagramTransport] = None + self.on_event_cb = on_event_cb + self.start_time = time.monotonic() + self._tick_handle: Optional[asyncio.TimerHandle] = None + self.closed = False + self.remote_addr = None + + def connection_made(self, transport: asyncio.DatagramTransport): + """Called when UDP socket is ready.""" + self.transport = transport + try: + self.remote_addr = transport.get_extra_info('peername') + except Exception: + logger.debug("Could not get peername from transport", exc_info=True) + + # Don't call engine.connect() here - QUIC requires parameters + # Connection will be initiated from FlowMqttClient.connect() + + # Start the tick loop immediately + self._schedule_tick(0) + + def datagram_received(self, data: bytes, addr): + """Called when a datagram is received.""" + # Feed datagram to QUIC engine + self.remote_addr = addr + now_ms = int((time.monotonic() - self.start_time) * 1000) + addr_str = f"{addr[0]}:{addr[1]}" + self.engine.handle_datagram(data, addr_str, now_ms) + # Trigger an immediate pump to process responses + self.pump() + + def error_received(self, exc: Exception): + """Called when an error is received.""" + pass # UDP errors are generally non-fatal + + def connection_lost(self, exc: Optional[Exception]): + """Called when the connection is closed.""" + self.closed = True + if self._tick_handle: + self._tick_handle.cancel() + + def _schedule_tick(self, delay_sec: float): + """Schedule the next engine tick.""" + if self.closed: + return + + if self._tick_handle: + self._tick_handle.cancel() + + self._tick_handle = self.loop.call_later(delay_sec, self._on_timer) + + def _on_timer(self): + """Handle scheduled engine tick.""" + if self.closed: + return + + # Run protocol tick + now_ms = int((time.monotonic() - self.start_time) * 1000) + # QUIC engine handle_tick returns events + events = self.engine.handle_tick(now_ms) + for ev in events: + self.on_event_cb(ev) + + self.pump() + + # Fixed 10ms interval for QUIC + delay = 0.01 + self._schedule_tick(delay) + + def pump(self): + """Pump data between engine and network.""" + # 1. Send outgoing datagrams + datagrams = self.engine.take_outgoing_datagrams() + if datagrams and self.transport and self.remote_addr: + for datagram in datagrams: + # Extract bytes from MqttDatagramFfi + self.transport.sendto(datagram.data, self.remote_addr) + + # 2. Process events + events = self.engine.take_events() + for ev in events: + self.on_event_cb(ev) + + + +class FlowMqttClient: + """ + High-level async MQTT client supporting multiple transports (TCP, QUIC). + + This client provides a simple async/await API for MQTT operations, + handling connection management, subscriptions, and publishing. + + Args: + client_id: MQTT client identifier + transport: Transport type (TransportType.TCP or TransportType.QUIC, default: TCP) + mqtt_version: MQTT protocol version (3 or 5, default: 5) + clean_start: Whether to start a clean session (default: True) + keep_alive: Keep-alive interval in seconds (default: 30) + username: Optional username for authentication + password: Optional password for authentication + reconnect_base_delay_ms: Base delay for reconnection attempts (default: 1000) + reconnect_max_delay_ms: Maximum delay for reconnection (default: 30000) + max_reconnect_attempts: Maximum reconnection attempts, 0 for infinite (default: 0) + on_message: Optional callback for received messages: fn(topic: str, payload: bytes, qos: int) + ca_cert_file: Path to CA certificate file for QUIC TLS (QUIC only) + insecure_skip_verify: Skip TLS verification for QUIC (QUIC only, default: False) + alpn_protocols: ALPN protocols for QUIC (QUIC only, default: ["mqtt"]) + + Examples: + TCP: + >>> async def tcp_example(): + ... client = FlowMqttClient("my_client", transport=TransportType.TCP) + ... await client.connect("broker.emqx.io", 1883) + ... await client.publish("test/topic", b"Hello", 1) + ... await client.disconnect() + + QUIC: + >>> async def quic_example(): + ... client = FlowMqttClient("my_client", transport=TransportType.QUIC, insecure_skip_verify=True) + ... await client.connect("broker.emqx.io", 14567, server_name="broker.emqx.io") + ... await client.publish("test/topic", b"Hello", 1) + ... await client.disconnect() + """ + + def __init__( + self, + client_id: str, + transport: TransportType = TransportType.TCP, + mqtt_version: int = 5, + clean_start: bool = True, + keep_alive: int = 30, + username: Optional[str] = None, + password: Optional[str] = None, + reconnect_base_delay_ms: int = 1000, + reconnect_max_delay_ms: int = 30000, + max_reconnect_attempts: int = 0, + on_message: Optional[Callable[[str, bytes, int], None]] = None, + ca_cert_file: Optional[str] = None, + client_cert_file: Optional[str] = None, + client_key_file: Optional[str] = None, + insecure_skip_verify: bool = False, + alpn_protocols: Optional[List[str]] = None, + server_name: Optional[str] = None + ): + self.transport_type = transport + self.server_name = server_name + self.opts = flowsdk_ffi.MqttOptionsFfi( + client_id=client_id, + mqtt_version=mqtt_version, + clean_start=clean_start, + keep_alive=keep_alive, + username=username, + password=password, + reconnect_base_delay_ms=reconnect_base_delay_ms, + reconnect_max_delay_ms=reconnect_max_delay_ms, + max_reconnect_attempts=max_reconnect_attempts + ) + + # Common TLS/QUIC options + self.tls_opts = flowsdk_ffi.MqttTlsOptionsFfi( + ca_cert_file=ca_cert_file, + client_cert_file=client_cert_file, + client_key_file=client_key_file, + insecure_skip_verify=insecure_skip_verify, + alpn_protocols=alpn_protocols or ["mqtt"] + ) + + # Create appropriate engine based on transport type + if transport == TransportType.TCP: + self.engine = flowsdk_ffi.MqttEngineFfi.new_with_opts(self.opts) + elif transport == TransportType.TLS: + if not server_name: + raise ValueError("server_name (SNI) is required for TLS transport") + self.engine = flowsdk_ffi.TlsMqttEngineFfi(self.opts, self.tls_opts, server_name) + elif transport == TransportType.QUIC: + self.engine = flowsdk_ffi.QuicMqttEngineFfi(self.opts) + else: + raise ValueError(f"Unsupported transport type: {transport}") + + self.protocol: Optional[Union[FlowMqttProtocol, FlowMqttDatagramProtocol]] = None + self.on_message = on_message + + # Futures for pending operations + self._connect_future: Optional[asyncio.Future] = None + self._pending_publish: Dict[int, asyncio.Future] = {} + self._pending_subscribe: Dict[int, asyncio.Future] = {} + self._pending_unsubscribe: Dict[int, asyncio.Future] = {} + + async def connect(self, host: str, port: int, server_name: Optional[str] = None, timeout: float = 10.0): + """ + Connect to MQTT broker. + + Args: + host: Broker hostname or IP address + port: Broker port number + server_name: Server name for TLS SNI (required for QUIC) + timeout: Connection timeout in seconds (default: 10.0) + + Raises: + ConnectionError: If connection fails + asyncio.TimeoutError: If connection times out + """ + loop = asyncio.get_running_loop() + self._connect_future = loop.create_future() + + async def _do_connect(): + if self.transport_type in (TransportType.TCP, TransportType.TLS): + # Stream-based connection (TCP or TLS) + transport, protocol = await loop.create_connection( + lambda: FlowMqttProtocol(self.engine, self.transport_type, loop, self._on_event), + host, port + ) + self.protocol = protocol + + elif self.transport_type == TransportType.QUIC: + # QUIC connection using FlowMqttDatagramProtocol + if not server_name: + _server_name = host # Default to host if not specified + else: + _server_name = server_name + + # Resolve hostname to IP address for QUIC + addr_info = await loop.getaddrinfo( + host, port, + family=socket.AF_INET, + type=socket.SOCK_DGRAM + ) + if not addr_info: + raise ConnectionError(f"Could not resolve {host}") + + server_addr = f"{addr_info[0][4][0]}:{port}" + + # Create UDP socket and protocol + transport, protocol = await loop.create_datagram_endpoint( + lambda: FlowMqttDatagramProtocol(self.engine, loop, self._on_event), + remote_addr=addr_info[0][4] + ) + self.protocol = protocol + protocol.remote_addr = addr_info[0][4] + + # Initiate QUIC connection with required parameters + now_ms = int((time.monotonic() - protocol.start_time) * 1000) + self.engine.connect(server_addr, _server_name, self.tls_opts, now_ms) + + # Wait for actual MQTT connection + if self._connect_future: + await self._connect_future + + try: + await asyncio.wait_for(_do_connect(), timeout=timeout) + except Exception: + # Clean up on failure + self.protocol = None + if self._connect_future and not self._connect_future.done(): + self._connect_future.cancel() + raise + + async def subscribe(self, topic: str, qos: int = 0) -> int: + """ + Subscribe to a topic. + + Args: + topic: MQTT topic to subscribe to + qos: Quality of Service level (0, 1, or 2) + + Returns: + Packet ID of the subscription + + Raises: + RuntimeError: If not connected + """ + if not self.protocol: + raise RuntimeError("Not connected") + + loop = asyncio.get_running_loop() + pid = self.engine.subscribe(topic, qos) + fut = loop.create_future() + self._pending_subscribe[pid] = fut + + # Force a pump to send the packet immediately + self.protocol.pump() + + await fut + return pid + + async def unsubscribe(self, topic: str) -> int: + """ + Unsubscribe from a topic. + + Args: + topic: MQTT topic to unsubscribe from + + Returns: + Packet ID of the unsubscription + + Raises: + RuntimeError: If not connected + """ + if not self.protocol: + raise RuntimeError("Not connected") + + loop = asyncio.get_running_loop() + pid = self.engine.unsubscribe(topic) + fut = loop.create_future() + self._pending_unsubscribe[pid] = fut + + # Force a pump to send the packet immediately + self.protocol.pump() + + await fut + return pid + + async def publish(self, topic: str, payload: bytes, qos: int = 0, retain: Optional[bool] = None) -> int: + """ + Publish a message. + + Args: + topic: MQTT topic to publish to + payload: Message payload as bytes + qos: Quality of Service level (0, 1, or 2) + retain: Whether to retain the message (None uses default, ignored for QUIC) + + Returns: + Packet ID of the publish (0 for QoS 0) + + Raises: + RuntimeError: If not connected + + Note: + QUIC transport does not support the retain parameter and priority. + """ + if not self.protocol: + raise RuntimeError("Not connected") + + loop = asyncio.get_running_loop() + + # Handle different engine publish signatures + if self.transport_type == TransportType.TCP: + # TCP engine takes 4 args: topic, payload, qos, priority + # Note: priority is currently passed as None as it's not exposed in high-level API yet + pid = self.engine.publish(topic, payload, qos, None) + else: + # TLS and QUIC engines take 3 args: topic, payload, qos + pid = self.engine.publish(topic, payload, qos) + + if qos > 0: + fut = loop.create_future() + self._pending_publish[pid] = fut + + # Force a pump to send the packet immediately + self.protocol.pump() + + await fut + else: + # QoS 0: just send immediately, no ack needed + self.protocol.pump() + + return pid + + def pump(self): + """ + Force data transmission and process pending events. + + This can be useful if the automatic tick loop is not running or if + you want to ensure data is sent immediately. + """ + if self.protocol: + self.protocol.pump() + + async def disconnect(self): + """ + Disconnect from the broker gracefully. + """ + if not self.protocol: + return + + self.engine.disconnect() + self.protocol.pump() # Flush DISCONNECT packet + + if self.protocol.transport: + self.protocol.transport.close() + self.protocol.closed = True + + def _on_event(self, ev): + """Internal event handler.""" + if ev.is_connected(): + if self._connect_future and not self._connect_future.done(): + self._connect_future.set_result(True) + + elif ev.is_published(): + res = ev[0] + pid = res.packet_id + if pid in self._pending_publish: + fut = self._pending_publish.pop(pid) + if not fut.done(): + fut.set_result(res) + + elif ev.is_subscribed(): + res = ev[0] + pid = res.packet_id + if pid in self._pending_subscribe: + fut = self._pending_subscribe.pop(pid) + if not fut.done(): + fut.set_result(res) + + elif ev.is_unsubscribed(): + res = ev[0] + pid = res.packet_id + if pid in self._pending_unsubscribe: + fut = self._pending_unsubscribe.pop(pid) + if not fut.done(): + fut.set_result(res) + + elif ev.is_message_received(): + msg = ev[0] + if self.on_message: + self.on_message(msg.topic, msg.payload, msg.qos) + + elif ev.is_error(): + if self._connect_future and not self._connect_future.done(): + self._connect_future.set_exception(ConnectionError(f"MQTT connection error: {ev.message}")) + logger.error(f"MQTT Error: {ev.message}") + + elif ev.is_disconnected(): + if self._connect_future and not self._connect_future.done(): + self._connect_future.set_exception(ConnectionError("MQTT disconnected during connection process")) + + +__all__ = ['FlowMqttClient', 'FlowMqttProtocol','TransportType', 'FlowMqttDatagramProtocol'] diff --git a/python/package/pyproject.toml b/python/package/pyproject.toml new file mode 100644 index 00000000..387c3195 --- /dev/null +++ b/python/package/pyproject.toml @@ -0,0 +1,23 @@ +[build-system] +requires = ["setuptools>=42", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "flowsdk" +version = "0.4.2" +description = "Python bindings for FlowSDK utilizing Uniffi" +readme = "README.md" +authors = [ + { name = "EMQ Technologies Co., Ltd.", email = "contact@emqx.io" } +] +license = { text = "MPL-2.0" } +classifiers = [ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: Mozilla Public License 2.0 (MPL 2.0)", + "Operating System :: OS Independent", +] +requires-python = ">=3.7" + +[tool.setuptools.packages.find] +where = ["."] +include = ["flowsdk*"] diff --git a/python/package/setup.py b/python/package/setup.py new file mode 100644 index 00000000..2ba7d88d --- /dev/null +++ b/python/package/setup.py @@ -0,0 +1,9 @@ +from setuptools import setup, find_packages + +setup( + packages=find_packages(), + include_package_data=True, + package_data={ + "flowsdk": ["*.dylib", "*.so", "*.dll"], + }, +) diff --git a/scripts/build_python_bindings.sh b/scripts/build_python_bindings.sh new file mode 100755 index 00000000..7e8ba45f --- /dev/null +++ b/scripts/build_python_bindings.sh @@ -0,0 +1,45 @@ +#!/bin/bash +set -e + +# Default to debug build +PROFILE="debug" +CARGO_PROFILE="dev" +TARGET_DIR="target/debug" + +if [[ "$1" == "--release" ]]; then + PROFILE="release" + CARGO_PROFILE="release" + TARGET_DIR="target/release" + shift +fi + +# Platforms +OS="$(uname -s)" +case "${OS}" in + Linux*) EXT="so";; + Darwin*) EXT="dylib";; + CYGWIN*|MINGW*|MSYS*) EXT="dll";; # Windows-ish + *) EXT="so";; +esac + +echo "Building flowsdk_ffi ($PROFILE)..." +cargo build -p flowsdk_ffi --profile $CARGO_PROFILE + +echo "Generating Python bindings..." +# Output direct to python/package/flowsdk +cargo run -p flowsdk_ffi --features=uniffi/cli --bin uniffi-bindgen generate \ + --library "$TARGET_DIR/libflowsdk_ffi.$EXT" \ + --language python \ + --out-dir python/package/flowsdk + +echo "Copying library for Python package..." +cp "$TARGET_DIR/libflowsdk_ffi.$EXT" python/package/flowsdk/ + +if [[ "$1" == "--test" ]]; then + echo "Running Python verification..." + export PYTHONPATH=$PWD/python/package + # We use a temporary test script that imports from flowsdk + python3 -c "import flowsdk; print('Import successful'); engine = flowsdk.MqttEngineFfi('test', 5); print('Engine created')" +fi + +echo "Done!"