Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,24 @@
- name: Run security audit
run: cargo audit

python-bindings:
name: Python Bindings
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Install Rust
uses: dtolnay/rust-toolchain@stable

Check warning

Code scanning / CodeQL

Unpinned tag for a non-immutable Action in workflow Medium

Unpinned 3rd party Action 'CI' step
Uses Step
uses 'dtolnay/rust-toolchain' with ref 'stable', not a pinned commit hash

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.10'

- name: Build and Test Bindings
run: ./scripts/build_python_bindings.sh --test

coverage:

Check warning

Code scanning / CodeQL

Workflow does not contain permissions Medium

Actions job or workflow does not limit the permissions of the GITHUB_TOKEN. Consider setting an explicit permissions block, using the following as a minimal starting point: {contents: read}
name: Code Coverage
runs-on: ubuntu-latest
Expand Down
59 changes: 59 additions & 0 deletions .github/workflows/release_pypi.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
name: Publish to PyPI
permissions:
contents: read

on:
release:
types: [published]

jobs:
build_wheels:
name: Build wheels on ${{ matrix.os }}
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]

steps:
- uses: actions/checkout@v4

- name: Install Rust
uses: dtolnay/rust-toolchain@stable

Check warning

Code scanning / CodeQL

Unpinned tag for a non-immutable Action in workflow Medium

Unpinned 3rd party Action 'Publish to PyPI' step
Uses Step
uses 'dtolnay/rust-toolchain' with ref 'stable', not a pinned commit hash

- name: Build and Prepare Bindings
shell: bash
run: |
./scripts/build_python_bindings.sh --release

- name: Build wheels
uses: pypa/cibuildwheel@v2

Check warning

Code scanning / CodeQL

Unpinned tag for a non-immutable Action in workflow Medium

Unpinned 3rd party Action 'Publish to PyPI' step
Uses Step
uses 'pypa/cibuildwheel' with ref 'v2', not a pinned commit hash
with:
package-dir: python/package
env:
CIBW_BEFORE_BUILD: "pip install setuptools wheel"
# Linux specific: ensure library is copied correctly if needed
# But our script handles the dylib placement.

- uses: actions/upload-artifact@v4
with:
name: cibw-wheels-${{ matrix.os }}-${{ strategy.job-index }}
path: ./wheelhouse/*.whl

upload_pypi:
needs: [build_wheels]
runs-on: ubuntu-latest
environment:
name: pypi
url: https://pypi.org/p/flowsdk
permissions:
id-token: write
steps:
- uses: actions/download-artifact@v4
with:
pattern: cibw-wheels-*
path: dist
merge-multiple: true

- uses: pypa/gh-action-pypi-publish@release/v1

Check warning

Code scanning / CodeQL

Unpinned tag for a non-immutable Action in workflow Medium

Unpinned 3rd party Action 'Publish to PyPI' step
Uses Step
uses 'pypa/gh-action-pypi-publish' with ref 'release/v1', not a pinned commit hash
with:
password: ${{ secrets.PYPI_API_TOKEN }}
Comment on lines +58 to +59
Copy link

Copilot AI Feb 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PyPI workflow uses authentication via password: ${{ secrets.PYPI_API_TOKEN }} on line 57, but according to the PyPI documentation and GitHub Actions best practices, trusted publishing with OIDC (which is already configured via id-token: write on line 47) should be used without specifying a password parameter. The presence of both OIDC configuration and password authentication is contradictory.

Suggested change
with:
password: ${{ secrets.PYPI_API_TOKEN }}

Copilot uses AI. Check for mistakes.
15 changes: 15 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,18 @@ Cargo.lock

*.profraw
*.profdata


# FFI generated code
python/package/flowsdk/flowsdk_ffi.py
python/package/flowsdk/libflowsdk_ffi.dylib
python/package/flowsdk/libflowsdk_ffi.so
python/package/flowsdk/libflowsdk_ffi.dll

# Python Examples (generated files)
python/examples/flowsdk_ffi.py
python/examples/*.dylib
python/examples/*.so
python/examples/*.dll

python/**/*.pyc
137 changes: 137 additions & 0 deletions python/examples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
# FlowSDK Python Examples

This directory contains examples demonstrating various usage patterns of the FlowSDK Python bindings.

## Quick Start Examples

### 1. Simple Async Usage (`simple_async_usage.py`)
**Minimal async MQTT client example - Start here!**

```bash
PYTHONPATH=../package python3 simple_async_usage.py
```

The simplest way to use FlowSDK with asyncio. Shows connect, subscribe, publish, and disconnect in just a few lines.

### 2. Asyncio TCP Client (`asyncio_tcp_client_example.py`)
**Standard async client using TCP**

```bash
PYTHONPATH=../package python3 asyncio_tcp_client_example.py
```

Demonstrates the high-level `FlowMqttClient` API using standard TCP transport with message callbacks and proper error handling.

## Secure Transport Examples

### 3. TLS Transport (`async_tls_client_example.py`)
**MQTT over TLS for secure communication**

```bash
PYTHONPATH=../package python3 async_tls_client_example.py
```

Shows how to use MQTT over TLS (port 8883) with the unified `FlowMqttClient`. Includes:
- Explicit TLS transport selection
- Certificate verification (default)
- SNI support via `server_name`

### 4. QUIC Transport (`asyncio_quic_client_example.py`)
**MQTT over QUIC for modern UDP-based transport**

```bash
PYTHONPATH=../package python3 asyncio_quic_client_example.py
```

Shows how to use MQTT over QUIC protocol with the unified `FlowMqttClient`. QUIC provides:
- Built-in encryption (TLS 1.3)
- Connection migration
- Lower latency than TCP
- No head-of-line blocking

**Requirements**: QUIC-enabled MQTT broker (e.g., EMQX 5.0+ with QUIC listener on port 14567)

## Low-Level Examples

### 5. Select-Based Client (`select_example.py`)
**Non-blocking I/O using select()**

```bash
PYTHONPATH=../package python3 select_example.py
```

Demonstrates manual socket management with the select() system call.

### 6. Async with Manual I/O (`async_example.py`)
**Asyncio with manual socket operations**

```bash
PYTHONPATH=../package python3 async_example.py
```

Shows how to manually integrate the MQTT engine with asyncio event loop using socket I/O.

## Testing

### 8. Binding Tests (`test_binding.py`)
**Low-level FFI binding verification**

```bash
PYTHONPATH=../package python3 test_binding.py
```

## Architecture Overview

### High-Level API (Recommended)
```
FlowMqttClient → FlowMqttProtocol → MqttEngineFfi → Rust FFI
```
- Simple async/await API
- Automatic network handling
- Built-in reconnection support

### Low-Level API (Advanced)
```
Your Code → MqttEngineFfi → Manual I/O → Network
```

## Transport Selection

FlowSDK supports multiple transport protocols via the unified `FlowMqttClient`:

### TCP
```python
client = FlowMqttClient("my_client", transport=TransportType.TCP)
await client.connect("broker.emqx.io", 1883)
```

### TLS
```python
client = FlowMqttClient(
"my_client",
transport=TransportType.TLS,
server_name="broker.emqx.io"
)
await client.connect("broker.emqx.io", 8883)
```

### QUIC
```python
client = FlowMqttClient(
"my_client",
transport=TransportType.QUIC,
server_name="broker.emqx.io"
)
await client.connect("broker.emqx.io", 14567, server_name="broker.emqx.io")
```

## Learning Path

1. **Start here**: `simple_async_usage.py` - Learn the basics
2. **Next**: `asyncio_tcp_client_example.py` - See full features
3. **Secure**: `async_tls_client_example.py` - Try TLS transport
4. **Modern**: `asyncio_quic_client_example.py` - Try QUIC transport

## License

Mozilla Public License 2.0
144 changes: 144 additions & 0 deletions python/examples/async_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@

import asyncio
import flowsdk
import time

from flowsdk import FlowMqttClient, TransportType, MqttOptionsFfi, MqttEngineFfi

async def main():
print("🚀 FlowSDK No-IO Async Example")
print("=" * 60)

# 1. Create the engine
import random
client_id = f"python_async_no_io_{random.randint(1000, 9999)}"
opts = MqttOptionsFfi(
client_id=client_id,
mqtt_version=5,
clean_start=True,
keep_alive=60,
username=None,
password=None,
reconnect_base_delay_ms=1000,
reconnect_max_delay_ms=30000,
)
engine = MqttEngineFfi.new_with_opts(opts)
print(f"✅ Created MqttEngineFfi (Client ID: {client_id})")

# 2. Establish TCP connection using asyncio
broker_host = "broker.emqx.io"
broker_port = 1883
print(f"📡 Connecting to {broker_host}:{broker_port}...")
try:
reader, writer = await asyncio.open_connection(broker_host, broker_port)
except Exception as e:
print(f"❌ Connection failed: {e}")
return

print("✅ TCP Connected")

# 3. Initiate MQTT connection
engine.connect()

# 4. Main orchestration loop
try:
# Run for 15 sconds
start_time = time.monotonic()
end_time = start_time + 15

# Subscribe/Publish state
subscribed = False
published = False

while time.monotonic() < end_time:
# now_ms should be the time elapsed since the engine was created.
# Since we create the engine just before this, monotonic() - start_time is a good proxy.
now_ms = int((time.monotonic() - start_time) * 1000)

# 1. Handle protocol timers
engine.handle_tick(now_ms)

# 2. Handle network input (non-blocking-ish)
try:
# Use a small timeout to keep the loop pumping
data = await asyncio.wait_for(reader.read(4096), timeout=0.05)
if data:
print(f"📥 Received {len(data)} bytes from network")
engine.handle_incoming(data)
elif reader.at_eof():
print("📥 Reader got EOF")
engine.handle_connection_lost()
break
except asyncio.TimeoutError:
# No data available in this tick, that's fine
pass
except Exception as e:
print(f"❌ Read error: {e}")
engine.handle_connection_lost()
break

# 3. Handle network output
outgoing = engine.take_outgoing()
if outgoing:
print(f"📤 Sending {len(outgoing)} bytes to network...")
writer.write(outgoing)
await writer.drain()

# 4. Process all accumulated events
events = engine.take_events()
for event in events:
if event.is_connected():
res = event[0]
print(f"✅ MQTT Connected! (Reason: {res.reason_code}, Session Present: {res.session_present})")
elif event.is_disconnected():
print(f"💔 MQTT Disconnected! (Reason: {event.reason_code})")
elif event.is_message_received():
msg = event[0]
print(f"📨 Message on '{msg.topic}': {msg.payload.decode()} (QoS: {msg.qos})")
elif event.is_subscribed():
res = event[0]
print(f"✅ Subscribed (PID: {res.packet_id}, Reasons: {res.reason_codes})")
elif event.is_published():
res = event[0]
print(f"✅ Published (PID: {res.packet_id}, Reason: {res.reason_code})")
elif event.is_error():
print(f"❌ Engine Error: {event[0].message}")
elif event.is_reconnect_needed():
print("🔄 Engine signaled ReconnectNeeded")
elif event.is_reconnect_scheduled():
print(f"⏰ Engine scheduled reconnect (Attempt {event.attempt}, Delay {event.delay_ms}ms)")
elif event.is_ping_response():
print(f"🏓 Ping Response (Success: {event.success})")

# 5. Application logic
if engine.is_connected():
if not subscribed:
pid = engine.subscribe("test/python/no_io", 1)
print(f"📑 API -> subscribe('test/python/no_io', qos=1) -> PID: {pid}")
subscribed = True
elif not published:
pid = engine.publish("test/python/no_io", b"Hello from Python No-IO!", 1, None)
print(f"📤 API -> publish('test/python/no_io', payload='...', qos=1) -> PID: {pid}")
published = True

except Exception as e:
print(f"❌ Loop error: {e}")
finally:
# 6. Disconnect gracefully
print("\n👋 Disconnecting...")
if not writer.is_closing():
engine.disconnect()
outgoing = engine.take_outgoing()
if outgoing:
writer.write(outgoing)
await writer.drain()

writer.close()
await writer.wait_closed()
print("✅ Done")

if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
Copy link

Copilot AI Feb 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'except' clause does nothing but pass and there is no explanatory comment.

Suggested change
pass
print("\n⏹ Interrupted by user, shutting down...", file=sys.stderr)

Copilot uses AI. Check for mistakes.
Loading