-
Notifications
You must be signed in to change notification settings - Fork 0
feat: FFI python skeleton bindings. #84
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c2526c7
35fef6f
054830b
4f8ad64
4974d2f
8c05100
d5c7dd0
db1045a
6d8cad5
5dde6e6
1207386
43f5d8e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,59 @@ | ||||||
| name: Publish to PyPI | ||||||
| permissions: | ||||||
| contents: read | ||||||
|
|
||||||
| on: | ||||||
| release: | ||||||
| types: [published] | ||||||
|
|
||||||
| jobs: | ||||||
| build_wheels: | ||||||
| name: Build wheels on ${{ matrix.os }} | ||||||
| runs-on: ${{ matrix.os }} | ||||||
| strategy: | ||||||
| matrix: | ||||||
| os: [ubuntu-latest, windows-latest, macos-latest] | ||||||
|
|
||||||
| steps: | ||||||
| - uses: actions/checkout@v4 | ||||||
|
|
||||||
| - name: Install Rust | ||||||
| uses: dtolnay/rust-toolchain@stable | ||||||
Check warningCode scanning / CodeQL Unpinned tag for a non-immutable Action in workflow Medium
Unpinned 3rd party Action 'Publish to PyPI' step
Uses Step Error loading related location Loading |
||||||
|
|
||||||
| - name: Build and Prepare Bindings | ||||||
| shell: bash | ||||||
| run: | | ||||||
| ./scripts/build_python_bindings.sh --release | ||||||
|
|
||||||
| - name: Build wheels | ||||||
| uses: pypa/cibuildwheel@v2 | ||||||
Check warningCode scanning / CodeQL Unpinned tag for a non-immutable Action in workflow Medium
Unpinned 3rd party Action 'Publish to PyPI' step
Uses Step Error loading related location Loading |
||||||
| with: | ||||||
| package-dir: python/package | ||||||
| env: | ||||||
| CIBW_BEFORE_BUILD: "pip install setuptools wheel" | ||||||
| # Linux specific: ensure library is copied correctly if needed | ||||||
| # But our script handles the dylib placement. | ||||||
|
|
||||||
| - uses: actions/upload-artifact@v4 | ||||||
| with: | ||||||
| name: cibw-wheels-${{ matrix.os }}-${{ strategy.job-index }} | ||||||
| path: ./wheelhouse/*.whl | ||||||
|
|
||||||
| upload_pypi: | ||||||
|
||||||
| needs: [build_wheels] | ||||||
| runs-on: ubuntu-latest | ||||||
| environment: | ||||||
| name: pypi | ||||||
| url: https://pypi.org/p/flowsdk | ||||||
| permissions: | ||||||
| id-token: write | ||||||
| steps: | ||||||
| - uses: actions/download-artifact@v4 | ||||||
| with: | ||||||
| pattern: cibw-wheels-* | ||||||
| path: dist | ||||||
| merge-multiple: true | ||||||
|
|
||||||
| - uses: pypa/gh-action-pypi-publish@release/v1 | ||||||
Check warningCode scanning / CodeQL Unpinned tag for a non-immutable Action in workflow Medium
Unpinned 3rd party Action 'Publish to PyPI' step
Uses Step Error loading related location Loading |
||||||
| with: | ||||||
| password: ${{ secrets.PYPI_API_TOKEN }} | ||||||
|
Comment on lines
+58
to
+59
|
||||||
| with: | |
| password: ${{ secrets.PYPI_API_TOKEN }} |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,137 @@ | ||
| # FlowSDK Python Examples | ||
|
|
||
| This directory contains examples demonstrating various usage patterns of the FlowSDK Python bindings. | ||
|
|
||
| ## Quick Start Examples | ||
|
|
||
| ### 1. Simple Async Usage (`simple_async_usage.py`) | ||
| **Minimal async MQTT client example - Start here!** | ||
|
|
||
| ```bash | ||
| PYTHONPATH=../package python3 simple_async_usage.py | ||
| ``` | ||
|
|
||
| The simplest way to use FlowSDK with asyncio. Shows connect, subscribe, publish, and disconnect in just a few lines. | ||
|
|
||
| ### 2. Asyncio TCP Client (`asyncio_tcp_client_example.py`) | ||
| **Standard async client using TCP** | ||
|
|
||
| ```bash | ||
| PYTHONPATH=../package python3 asyncio_tcp_client_example.py | ||
| ``` | ||
|
|
||
| Demonstrates the high-level `FlowMqttClient` API using standard TCP transport with message callbacks and proper error handling. | ||
|
|
||
| ## Secure Transport Examples | ||
|
|
||
| ### 3. TLS Transport (`async_tls_client_example.py`) | ||
| **MQTT over TLS for secure communication** | ||
|
|
||
| ```bash | ||
| PYTHONPATH=../package python3 async_tls_client_example.py | ||
| ``` | ||
|
|
||
| Shows how to use MQTT over TLS (port 8883) with the unified `FlowMqttClient`. Includes: | ||
| - Explicit TLS transport selection | ||
| - Certificate verification (default) | ||
| - SNI support via `server_name` | ||
|
|
||
| ### 4. QUIC Transport (`asyncio_quic_client_example.py`) | ||
| **MQTT over QUIC for modern UDP-based transport** | ||
|
|
||
| ```bash | ||
| PYTHONPATH=../package python3 asyncio_quic_client_example.py | ||
| ``` | ||
|
|
||
| Shows how to use MQTT over QUIC protocol with the unified `FlowMqttClient`. QUIC provides: | ||
| - Built-in encryption (TLS 1.3) | ||
| - Connection migration | ||
| - Lower latency than TCP | ||
| - No head-of-line blocking | ||
|
|
||
| **Requirements**: QUIC-enabled MQTT broker (e.g., EMQX 5.0+ with QUIC listener on port 14567) | ||
|
|
||
| ## Low-Level Examples | ||
|
|
||
| ### 5. Select-Based Client (`select_example.py`) | ||
| **Non-blocking I/O using select()** | ||
|
|
||
| ```bash | ||
| PYTHONPATH=../package python3 select_example.py | ||
| ``` | ||
|
|
||
| Demonstrates manual socket management with the select() system call. | ||
|
|
||
| ### 6. Async with Manual I/O (`async_example.py`) | ||
| **Asyncio with manual socket operations** | ||
|
|
||
| ```bash | ||
| PYTHONPATH=../package python3 async_example.py | ||
| ``` | ||
|
|
||
| Shows how to manually integrate the MQTT engine with asyncio event loop using socket I/O. | ||
|
|
||
| ## Testing | ||
|
|
||
| ### 8. Binding Tests (`test_binding.py`) | ||
| **Low-level FFI binding verification** | ||
|
|
||
| ```bash | ||
| PYTHONPATH=../package python3 test_binding.py | ||
| ``` | ||
|
|
||
| ## Architecture Overview | ||
|
|
||
| ### High-Level API (Recommended) | ||
| ``` | ||
| FlowMqttClient → FlowMqttProtocol → MqttEngineFfi → Rust FFI | ||
| ``` | ||
| - Simple async/await API | ||
| - Automatic network handling | ||
| - Built-in reconnection support | ||
|
|
||
| ### Low-Level API (Advanced) | ||
| ``` | ||
| Your Code → MqttEngineFfi → Manual I/O → Network | ||
| ``` | ||
|
|
||
| ## Transport Selection | ||
|
|
||
| FlowSDK supports multiple transport protocols via the unified `FlowMqttClient`: | ||
|
|
||
| ### TCP | ||
| ```python | ||
| client = FlowMqttClient("my_client", transport=TransportType.TCP) | ||
| await client.connect("broker.emqx.io", 1883) | ||
| ``` | ||
|
|
||
| ### TLS | ||
| ```python | ||
| client = FlowMqttClient( | ||
| "my_client", | ||
| transport=TransportType.TLS, | ||
| server_name="broker.emqx.io" | ||
| ) | ||
| await client.connect("broker.emqx.io", 8883) | ||
| ``` | ||
|
|
||
| ### QUIC | ||
| ```python | ||
| client = FlowMqttClient( | ||
| "my_client", | ||
| transport=TransportType.QUIC, | ||
| server_name="broker.emqx.io" | ||
| ) | ||
| await client.connect("broker.emqx.io", 14567, server_name="broker.emqx.io") | ||
| ``` | ||
|
|
||
| ## Learning Path | ||
|
|
||
| 1. **Start here**: `simple_async_usage.py` - Learn the basics | ||
| 2. **Next**: `asyncio_tcp_client_example.py` - See full features | ||
| 3. **Secure**: `async_tls_client_example.py` - Try TLS transport | ||
| 4. **Modern**: `asyncio_quic_client_example.py` - Try QUIC transport | ||
|
|
||
| ## License | ||
|
|
||
| Mozilla Public License 2.0 |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,144 @@ | ||||||
|
|
||||||
| import asyncio | ||||||
| import flowsdk | ||||||
| import time | ||||||
|
|
||||||
| from flowsdk import FlowMqttClient, TransportType, MqttOptionsFfi, MqttEngineFfi | ||||||
|
|
||||||
| async def main(): | ||||||
| print("🚀 FlowSDK No-IO Async Example") | ||||||
| print("=" * 60) | ||||||
|
|
||||||
| # 1. Create the engine | ||||||
| import random | ||||||
| client_id = f"python_async_no_io_{random.randint(1000, 9999)}" | ||||||
| opts = MqttOptionsFfi( | ||||||
| client_id=client_id, | ||||||
| mqtt_version=5, | ||||||
| clean_start=True, | ||||||
| keep_alive=60, | ||||||
| username=None, | ||||||
| password=None, | ||||||
| reconnect_base_delay_ms=1000, | ||||||
| reconnect_max_delay_ms=30000, | ||||||
| ) | ||||||
| engine = MqttEngineFfi.new_with_opts(opts) | ||||||
| print(f"✅ Created MqttEngineFfi (Client ID: {client_id})") | ||||||
|
|
||||||
| # 2. Establish TCP connection using asyncio | ||||||
| broker_host = "broker.emqx.io" | ||||||
| broker_port = 1883 | ||||||
| print(f"📡 Connecting to {broker_host}:{broker_port}...") | ||||||
| try: | ||||||
| reader, writer = await asyncio.open_connection(broker_host, broker_port) | ||||||
| except Exception as e: | ||||||
| print(f"❌ Connection failed: {e}") | ||||||
| return | ||||||
|
|
||||||
| print("✅ TCP Connected") | ||||||
|
|
||||||
| # 3. Initiate MQTT connection | ||||||
| engine.connect() | ||||||
|
|
||||||
| # 4. Main orchestration loop | ||||||
| try: | ||||||
| # Run for 15 sconds | ||||||
| start_time = time.monotonic() | ||||||
| end_time = start_time + 15 | ||||||
|
|
||||||
| # Subscribe/Publish state | ||||||
| subscribed = False | ||||||
| published = False | ||||||
|
|
||||||
| while time.monotonic() < end_time: | ||||||
| # now_ms should be the time elapsed since the engine was created. | ||||||
| # Since we create the engine just before this, monotonic() - start_time is a good proxy. | ||||||
| now_ms = int((time.monotonic() - start_time) * 1000) | ||||||
|
|
||||||
| # 1. Handle protocol timers | ||||||
| engine.handle_tick(now_ms) | ||||||
|
|
||||||
| # 2. Handle network input (non-blocking-ish) | ||||||
| try: | ||||||
| # Use a small timeout to keep the loop pumping | ||||||
| data = await asyncio.wait_for(reader.read(4096), timeout=0.05) | ||||||
| if data: | ||||||
| print(f"📥 Received {len(data)} bytes from network") | ||||||
| engine.handle_incoming(data) | ||||||
| elif reader.at_eof(): | ||||||
| print("📥 Reader got EOF") | ||||||
| engine.handle_connection_lost() | ||||||
| break | ||||||
| except asyncio.TimeoutError: | ||||||
| # No data available in this tick, that's fine | ||||||
| pass | ||||||
| except Exception as e: | ||||||
| print(f"❌ Read error: {e}") | ||||||
| engine.handle_connection_lost() | ||||||
| break | ||||||
|
|
||||||
| # 3. Handle network output | ||||||
| outgoing = engine.take_outgoing() | ||||||
| if outgoing: | ||||||
| print(f"📤 Sending {len(outgoing)} bytes to network...") | ||||||
| writer.write(outgoing) | ||||||
| await writer.drain() | ||||||
|
|
||||||
| # 4. Process all accumulated events | ||||||
| events = engine.take_events() | ||||||
| for event in events: | ||||||
| if event.is_connected(): | ||||||
| res = event[0] | ||||||
| print(f"✅ MQTT Connected! (Reason: {res.reason_code}, Session Present: {res.session_present})") | ||||||
| elif event.is_disconnected(): | ||||||
| print(f"💔 MQTT Disconnected! (Reason: {event.reason_code})") | ||||||
| elif event.is_message_received(): | ||||||
| msg = event[0] | ||||||
| print(f"📨 Message on '{msg.topic}': {msg.payload.decode()} (QoS: {msg.qos})") | ||||||
| elif event.is_subscribed(): | ||||||
| res = event[0] | ||||||
| print(f"✅ Subscribed (PID: {res.packet_id}, Reasons: {res.reason_codes})") | ||||||
| elif event.is_published(): | ||||||
| res = event[0] | ||||||
| print(f"✅ Published (PID: {res.packet_id}, Reason: {res.reason_code})") | ||||||
| elif event.is_error(): | ||||||
| print(f"❌ Engine Error: {event[0].message}") | ||||||
| elif event.is_reconnect_needed(): | ||||||
| print("🔄 Engine signaled ReconnectNeeded") | ||||||
| elif event.is_reconnect_scheduled(): | ||||||
| print(f"⏰ Engine scheduled reconnect (Attempt {event.attempt}, Delay {event.delay_ms}ms)") | ||||||
| elif event.is_ping_response(): | ||||||
| print(f"🏓 Ping Response (Success: {event.success})") | ||||||
|
|
||||||
| # 5. Application logic | ||||||
| if engine.is_connected(): | ||||||
| if not subscribed: | ||||||
| pid = engine.subscribe("test/python/no_io", 1) | ||||||
| print(f"📑 API -> subscribe('test/python/no_io', qos=1) -> PID: {pid}") | ||||||
| subscribed = True | ||||||
| elif not published: | ||||||
| pid = engine.publish("test/python/no_io", b"Hello from Python No-IO!", 1, None) | ||||||
| print(f"📤 API -> publish('test/python/no_io', payload='...', qos=1) -> PID: {pid}") | ||||||
| published = True | ||||||
|
|
||||||
| except Exception as e: | ||||||
| print(f"❌ Loop error: {e}") | ||||||
| finally: | ||||||
| # 6. Disconnect gracefully | ||||||
| print("\n👋 Disconnecting...") | ||||||
| if not writer.is_closing(): | ||||||
| engine.disconnect() | ||||||
| outgoing = engine.take_outgoing() | ||||||
| if outgoing: | ||||||
| writer.write(outgoing) | ||||||
| await writer.drain() | ||||||
|
|
||||||
| writer.close() | ||||||
| await writer.wait_closed() | ||||||
| print("✅ Done") | ||||||
|
|
||||||
| if __name__ == "__main__": | ||||||
| try: | ||||||
| asyncio.run(main()) | ||||||
| except KeyboardInterrupt: | ||||||
| pass | ||||||
|
||||||
| pass | |
| print("\n⏹ Interrupted by user, shutting down...", file=sys.stderr) |
Check warning
Code scanning / CodeQL
Unpinned tag for a non-immutable Action in workflow Medium