Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
339 changes: 339 additions & 0 deletions 2026-04-rtm-sub-second-latency/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,339 @@
# Real-Time Mode (RTM) Sub-Second Latency Demo

This demo showcases Databricks Real-Time Mode (RTM) for achieving sub-second latency in streaming pipelines. It implements a stateless guardrail pipeline that validates Ethereum blockchain events in real-time.

**Blog Post**: [Unlocking Sub-Second Latency with Spark Structured Streaming Real-Time Mode](https://www.canadiandataguy.com/p/unlocking-sub-second-latency-with)

## Overview

Real-Time Mode eliminates micro-batch scheduling overhead by processing records as they arrive, achieving ~100ms end-to-end latency (5-100ms Spark processing) versus 1-5 seconds with traditional micro-batch processing.

**Use Cases:**
- Fraud detection
- IoT alerting
- Security signal processing
- Operational guardrails
- Blockchain event validation

## Requirements

- **Databricks Runtime**: 16.4 LTS or later
- **Compute**: **Dedicated (single-user) clusters ONLY**
- Serverless NOT supported
- Shared access mode NOT supported
- DLT/Lakeflow Pipelines NOT supported
- **Autoscaling**: **MUST be DISABLED** - RTM requires fixed cluster size
- **Photon**: **NOT supported** - do not enable Photon acceleration
- **Output Mode**: **`update` mode REQUIRED** - append/complete not supported
- **Kafka**: Confluent Cloud, Redpanda, or self-managed Kafka cluster
- **Permissions**: Access to create checkpoints in Unity Catalog Volumes

> **Critical**: RTM has strict compute requirements. Use dedicated clusters with fixed worker count, no Photon, and `outputMode("update")`.

## Supported Operations

| Supported | NOT Supported |
|-----------|---------------|
| Stateless transformations | forEachBatch |
| Aggregations (count, sum) | Stream-stream joins |
| Tumbling/Sliding windows | Session windows |
| Deduplication (dropDuplicates) | mapPartitions |
| Stream-table joins (broadcast) | mapGroupsWithState |
| transformWithState | Self-union (same source) |
| Union of multiple streams | |
| foreachWriter (custom sinks) | |

## Supported Sources & Sinks

**Sources:** Kafka, AWS MSK, Event Hubs (Kafka connector), Kinesis (EFO mode only)

**Sinks:** Kafka, Event Hubs, Delta Lake, foreachWriter (for JDBC/custom)

## Files

| File | Description |
|------|-------------|
| `rtm_stateless_guardrail.py` | Main notebook - RTM guardrail pipeline (Kafka to Kafka) |
| `cluster_config.template.json` | Cluster configuration with RTM settings |
| `test_rtm_guardrail.py` | Local Python tests for regex patterns and validation logic |
| `produce_test_data.py` | Test data producer for sending sample events |
| `README.md` | This documentation |

## Quick Start

### Step 1: Create RTM-Enabled Cluster

Edit `cluster_config.template.json` for your cloud:
1. Replace `REPLACE_WITH_YOUR_INSTANCE_TYPE` with an instance from your cloud provider:
- **AWS**: `i3.xlarge`, `r5.xlarge`
- **Azure**: `Standard_DS3_v2`, `Standard_E4ds_v4`
- **GCP**: `n1-highmem-4`, `n2-highmem-4`
2. Replace `REPLACE_WITH_YOUR_USER_EMAIL` with the identity that will own the cluster
3. Add the appropriate cloud attributes block (`aws_attributes`, `azure_attributes`, or `gcp_attributes`)
4. Remove all keys starting with `_` (these are comments)
5. Create the cluster via UI or CLI: `databricks clusters create --json @cluster_config.template.json`

**Important:** This demo checkpoints to a Unity Catalog Volume under `/Volumes/...`, so the cluster must run in **Single User** access mode. Shared or standard access modes may fail when the stream initializes its checkpoint location.

### Step 2: Configure Kafka Secrets

Create a secret scope with your Kafka credentials:
```bash
# Create secret scope (use any name you want)
databricks secrets create-scope my-kafka-scope

# Add your Kafka credentials
databricks secrets put-secret my-kafka-scope kafka-bootstrap-servers
databricks secrets put-secret my-kafka-scope kafka-username
databricks secrets put-secret my-kafka-scope kafka-password
```

### Step 3: Create Kafka Topics

Create the required topics in your Kafka cluster:
- Input topic (e.g., `ethereum-blocks`)
- Output topics: `<output-topic>-allowed`, `<output-topic>-quarantine`

If your Kafka provider does **not** auto-create topics, create all three topics explicitly before starting the notebook.

### Step 4: Configure and Run the Notebook

The notebook uses **Databricks widgets** for easy configuration. Set these values in the widget panel:

| Widget | Description | Default |
|--------|-------------|---------|
| `secret_scope` | Your Databricks secret scope name | `rtm-demo` |
| `input_topic` | Kafka topic to read from | `ethereum-blocks` |
| `output_topic` | Base name for output topics | `ethereum-validated` |
| `catalog` | Unity Catalog for checkpoints | `main` |
| `schema` | Schema for checkpoints | `default` |
| `checkpoint_interval` | RTM checkpoint frequency | `5 minutes` |

Run the notebook cells in order. The pipeline will start processing and routing events.

**Replay behavior:** The notebook now reads with `startingOffsets = "earliest"` so seeded backlog is replayed during demos and integration tests. If you only want new records produced after the stream starts, change this back to `"latest"`.

**Notebook flow after the stream starts:**
- **Section 11, "Verify Routed Output Topics"** reads the Kafka output topics back and shows what was actually written to `-allowed` and `-quarantine`
- **Section 12, "Stream Management"** shows query status and provides the cells you use to stop the stream cleanly when you are done validating it

## Testing

### End-to-End Test Results

This demo was validated on the `e2-dogfood` staging workspace using Databricks Runtime 16.4 LTS and Redpanda Serverless.

**Test Setup:**
- Workspace: `https://e2-dogfood.staging.cloud.databricks.com`
- Cluster: `rtm-guardrail-cluster` (`0313-063110-u4ldfaiy`)
- Cluster mode: dedicated, single-user, fixed workers, RTM enabled
- Kafka: Redpanda Serverless with SASL/SCRAM over SSL
- Secret scope: `rtm-demo`

**What Was Verified:**
- Pattern detection tests passed locally for email, SSN, credit card, AWS key, JWT, and Ethereum private key cases
- The notebook was uploaded to the workspace and executed on the RTM-enabled cluster
- The stream successfully read from `ethereum-blocks` and wrote to:
- `ethereum-validated-allowed`
- `ethereum-validated-quarantine`
- Output records were inspected directly in Kafka to confirm routing decisions and validation reasons

**Live Routing Checks:**
1. Block `2000001` (clean input) → **ALLOW** → `ethereum-validated-allowed`
2. Block `2000002` (high gas usage) → **QUARANTINE** → `ethereum-validated-quarantine` with `HIGH_GAS_USAGE`
3. Block `2000003` (email in `extra_data`) → **QUARANTINE** → `ethereum-validated-quarantine` with `PII_EMAIL`

**Notes:**
- The notebook now uses `startingOffsets = "earliest"` so pre-seeded test data can be replayed during demos and integration tests
- End-to-end validation here focused on correctness of routing and guardrail behavior rather than publishing a latency benchmark

## Checkpoint Best Practices

### Stable Checkpoint Paths

**CRITICAL**: Never use dynamic values (UUIDs, timestamps) in checkpoint paths for production streams.

```python
# BAD - breaks recovery after restart
CHECKPOINT_LOCATION = f"/Volumes/.../rtm_guardrail_{uuid.uuid4()}"

# GOOD - stable path enables recovery
CHECKPOINT_LOCATION = "/Volumes/catalog/schema/checkpoints/rtm_guardrail_ethereum_blocks"
```

**Why it matters:**
- Checkpoints contain the query ID and offset tracking state
- A new checkpoint path = new query ID = cannot resume from previous offsets
- After restart, the stream would either miss data or reprocess everything

### Checkpoint Naming Convention

Follow a meaningful naming pattern:
```
/Volumes/{catalog}/{schema}/checkpoints/{pipeline_name}_{source_topic}
```

### Checkpoint Protection

1. **Never delete checkpoints** in production without understanding the implications
2. **Enable access logging** on the checkpoint volume for audit trails
3. **Use Unity Catalog Volumes** with proper access controls
4. **Document checkpoint ownership** - which job/team owns each checkpoint

## Rate Limiting

**IMPORTANT**: `maxOffsetsPerTrigger` is **NOT compatible with Real-Time Mode**. RTM processes records as they arrive without rate limiting.

For traditional micro-batch streaming (non-RTM), you can use `maxOffsetsPerTrigger`:

```python
# NOT compatible with RTM
.option("maxOffsetsPerTrigger", 100000) # Only for micro-batch mode
```

**For RTM pipelines:**
- RTM processes all available records immediately (no rate limiting)
- Control throughput via cluster sizing and partition count
- Use Kafka topic partitions to control parallelism
- Scale compute resources to handle peak load

## Compute Sizing for RTM

RTM requires all query stages to run simultaneously. Calculate required task slots:

```
Required slots = source_partitions + (shuffle_partitions x number_of_stages)
```

| Pipeline Type | Configuration | Required Slots |
|---------------|---------------|----------------|
| Single-stage stateless | 8 Kafka partitions | 8 |
| Two-stage stateful | 8 partitions + 20 shuffle | 28 |
| Three-stage complex | 8 + 20 + 20 shuffle | 48 |

**Best Practices:**
- Target ~50% cluster utilization for headroom
- Set `maxPartitions` to consolidate Kafka partitions per task
- Use `spark.sql.shuffle.partitions` = 8-20 (not default 200)

## State Store Configuration

### RocksDB for All Streaming Jobs

Even for "stateless" pipelines, configure RocksDB as the state store provider:

```python
spark.conf.set("spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled",
"true")
```

**Why:**
- Better performance for any stateful operations
- Faster checkpoint recovery with changelog checkpointing
- Required if you ever add stateful operations (aggregations, dedup, joins)
- Consistent behavior across all streaming pipelines

## Kafka Configuration

### Timeout Settings

Configure appropriate timeouts for production stability:

```python
kafka_options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.request.timeout.ms": "60000", # 60 seconds
"kafka.session.timeout.ms": "30000", # 30 seconds
"kafka.group.id": "rtm-guardrail-app" # Consumer group tracking
# Note: maxOffsetsPerTrigger is NOT compatible with RTM
}
```

### Consumer Group ID

Always set `kafka.group.id` for:
- Tracking consumer lag in Kafka monitoring tools
- Identifying your application in broker logs
- Proper offset management

## Production Considerations

### Secrets Management

**Never hardcode credentials.** Use Databricks secrets:

```python
# Good - secrets manager
KAFKA_USERNAME = dbutils.secrets.get(scope="rtm-demo", key="kafka-username")
KAFKA_PASSWORD = dbutils.secrets.get(scope="rtm-demo", key="kafka-password")

# Bad - hardcoded
KAFKA_USERNAME = "my-username" # NEVER DO THIS
```

### Monitoring Alerts

Set up alerts for:

| Metric | Alert Threshold | Action |
|--------|-----------------|--------|
| `inputRowsPerSecond` == 0 | > 5 minutes | Check Kafka connectivity |
| `processedRowsPerSecond` < `inputRowsPerSecond` | Sustained | Scale cluster or increase partitions |
| Batch duration | > 1 second for RTM | Check for data skew or resource contention |
| Query status | Not active | Page on-call |

### Dynamic Topic Routing

Route ALLOW and QUARANTINE events to separate topics for different downstream processing:

```python
df_with_topic = df_enriched.withColumn(
"topic",
F.when(F.col("is_quarantined"), F.lit(f"{OUTPUT_TOPIC}-quarantine"))
.otherwise(F.lit(f"{OUTPUT_TOPIC}-allowed"))
)
```

Benefits:
- Quarantined events can be processed by a separate investigation pipeline
- Allowed events flow directly to production consumers
- Easier to monitor and alert on quarantine rate

## Troubleshooting

### Query Not Starting

1. Verify RTM is enabled: `spark.conf.get("spark.databricks.streaming.realTimeMode.enabled")`
2. Check DBR version is 16.4+
3. Ensure `outputMode("update")` is set (required for RTM)

### High Latency

1. Reduce `spark.sql.shuffle.partitions` (try 4-8 for RTM)
2. Check for data skew in partition keys
3. Verify cluster has sufficient resources
4. Reduce Kafka topic partition count if oversubscribed
5. Scale up cluster to handle increased throughput

### Checkpoint Recovery Issues

1. Verify checkpoint path hasn't changed
2. Check checkpoint directory permissions
3. Review checkpoint contents with `dbutils.fs.ls(checkpoint_path)`
4. Look for corrupt metadata files

## References

- [Unlocking Sub-Second Latency with RTM](https://www.canadiandataguy.com/p/unlocking-sub-second-latency-with) - Canadian Data Guy
- [Databricks Structured Streaming Guide](https://docs.databricks.com/structured-streaming/)

## Author

Jitesh Soni - [Canadian Data Guy](https://www.canadiandataguy.com)

---

*Last updated: March 2026*
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading