| Query | Spark UI “Details for Query” timestamp | Wall-clock run-time |
|---|---|---|
| Q1 (pdf #1) | 15:25 – 15:34 | ≈ 9 min |
| Q2 (pdf #2) | 15:52 – 16:01 | ≈ 9 min |
Both queries finish, and the driver shows only ~9 min of wall-time, yet several stages report durations of > 1 h because the long-running stages reuse cached material from earlier jobs; those stages are still the performance ceiling.
| Component | Evidence (single worst stage) | Why it hurts |
|---|---|---|
| Shuffle Exchange on `id` & `value` | 724 GiB of records read, 60 GiB written, 8.6 min writer time, 200 partitions | Large partitions ⇒ long tasks & high network I/O. |
| Sort-Merge Join | Peak task memory 7.1 GiB, aggregate 619 GiB, spilled ≈ 290 GiB | Data skew & only 200 shuffle partitions force repeated spill-to-disk. |
| Window (max logical_ts) | Window stage keeps 64 GiB in RAM per executor; no spill yet but close to limit | Requires a full sort inside each partition. |
| Remote shuffle fetch | 1.17 h cumulative fetch-request latency for a single Exchange | Cross-node traffic becomes serialized when partitions are few & large. |
| Whole-Stage Codegen blocks | 6 different WSCG operators run 1.3 – 1.7 h each (Q2 p. 3) | They encapsulate cascaded filters / projections but inherit shuffle cost. |
- Too few shuffle partitions: 200 partitions for 700+ GiB ⇒ each partition is ≈ 3.5 GiB; a single task must sort/join GB-scale data and spills frequently.
- Partition skew on `id`: high-frequency IDs cluster heavy keys in just a few of the 200 buckets, amplifying spills and long tails.
- Unnecessary in-memory caching of 1.5 TiB: `TableCacheQueryStage → InMemoryTableScan` keeps the entire 1.84 B-row time slice cached for repeated reads; that is faster than re-scanning Iceberg, but the memory pressure forces executor spills during joins/windows.
- Filters not pushed to Iceberg partition columns: the `wall_timestamp BETWEEN T-10min AND T` condition appears only after the scan; because the table isn’t partitioned on `wall_timestamp`, full manifests are still loaded and cached.
- Small “state” table not broadcast: `quantization_state` (~7.8 k rows after hash-agg) is still hash-partitioned and sorted, triggering the heavy Sort-Merge join (see the broadcast sketch after this list).
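For the last point, the broadcast can also be forced explicitly rather than relying on the size threshold. A minimal sketch, where `slice` and `state` are hypothetical names for the 10-minute slice and the ~7.8 k-row `quantization_state` frame:

```scala
import org.apache.spark.sql.functions.broadcast

// Broadcasting the small state table turns the Sort-Merge join into a broadcast-hash join,
// so the 700+ GiB side no longer has to be shuffled and sorted on id.
val enriched = slice.join(broadcast(state), Seq("id"))
```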
| Knob | Suggested value | Expected effect |
|---|---|---|
| `spark.sql.shuffle.partitions` | 200 → 2000 (or enable AQE coalesce) | Reduces partition size to < 400 MiB, cutting spill & skew impact. |
| `spark.sql.adaptive.enabled` | `true`, plus `spark.sql.adaptive.skewJoin.enabled=true` | Splits abnormally large shuffle blocks and handles key skew. |
| `spark.sql.autoBroadcastJoinThreshold` | 10 MB (default) → e.g. 128m, well above the ~30 MiB state table | Broadcasts `quantization_state` and avoids the Sort-Merge join. |
| `spark.sql.windowExec.buffer.spill.threshold` | Lower it (the threshold is a row count, e.g. 1 M rows, not a byte size) | Forces controlled spilling in the window instead of executor OOM. |
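For reference, a minimal sketch of applying these knobs at session level; the values are illustrative only and assume Spark 3.x with AQE available:

```scala
// Illustrative values; tune per cluster. All of these are runtime SQL confs.
spark.conf.set("spark.sql.shuffle.partitions", "2000")
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "128m")
// Row count, not bytes; ~1 M buffered rows before the window operator spills is an assumption.
spark.conf.set("spark.sql.windowExec.buffer.spill.threshold", "1000000")
```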
- Replace window (max) with grouped agg + semi-join:

  ```scala
  val maxTs = df.groupBy($"id").agg(max($"logical_timestamp").as("max_ts"))
  val trimmed = df.join(maxTs, Seq("id"))
    .filter($"logical_timestamp" < $"max_ts")
  ```

  This avoids the expensive `Window + Sort` per partition.
- Pre-bucket the raw table on `id` and `wall_timestamp`: a 512-bucket Iceberg write or Z-ordering on `(id, wall_timestamp)` enables scan-time pruning and aligns with the shuffle keys (see the DDL sketch after this list).
- Drop the global cache for the 10-minute slice: use `persist(StorageLevel.DISK_ONLY_2)` or no cache at all; the slice is re-scanned only twice (Q1/Q2), which is cheaper than the RAM pressure.
- Avoid `AppendColumns + MapGroups` if possible: if the `mapGroups` step is just post-aggregation formatting, prefer `withColumn` + a vectorized write; this eliminates the wide hash partition (`value#826`).
- Use incremental MERGE/UPDATE instead of full-batch writes: writing only the changed aggregates to `bq_quantization_state_v1` shrinks the Exchange(46) that currently rebalances all rows.
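The pre-bucketing item could look roughly like the following; `cat.db.raw_events` is a hypothetical catalog/table name, and the exact procedure arguments depend on the Iceberg version in use:

```scala
// One-off rewrite of the raw table into 512 hash buckets on id (CTAS; storage-heavy).
spark.sql("""
  CREATE TABLE cat.db.raw_events_bucketed
  USING iceberg
  PARTITIONED BY (bucket(512, id))
  AS SELECT * FROM cat.db.raw_events
""")

// Or keep the existing table and Z-order its files so scans can prune on (id, wall_timestamp).
spark.sql("""
  CALL cat.system.rewrite_data_files(
    table      => 'db.raw_events',
    strategy   => 'sort',
    sort_order => 'zorder(id, wall_timestamp)'
  )
""")
```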
- Give executors 8–10× more cores but the same memory, to capitalize on the narrower partitions after increasing `shuffle.partitions` (a launch-time sizing sketch follows this list).
- Enable a Remote Shuffle Service (Spark 3.5) or the cloud vendor’s external shuffle to reduce executor-side spill by leveraging SSDs on the shuffle nodes.
- If running on Kubernetes, pin the executors that run the Window stage to a high-mem node pool.
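Executor sizing has to be fixed before the session starts; a rough sketch of the “more cores, same memory” idea, where every number is an assumption rather than a measured value:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: trade a few fat executors for more cores per executor at unchanged memory,
// so the ~2000 smaller shuffle partitions keep every core busy.
val spark = SparkSession.builder()
  .appName("quantization-batch")
  .config("spark.executor.cores", "8")
  .config("spark.executor.memory", "32g")
  .config("spark.executor.instances", "20")
  .getOrCreate()
```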
- Re-run Q2 after broadcast-joining the state table; target ≤ 200 MB of shuffle write and no spills.
- Enable AQE & skew join, then monitor stage-level partition counts: you should see 1800–2000 output partitions vs the current 200.
- Compare the `SortMergeJoin` spill metric; aim for < 5 GiB of total spill and 0 OOM warnings (a listener sketch for tracking spill follows this list).
- Time the full pipeline; with reduced shuffle and spills, a single batch should finish in < 5 min, meeting your SLA.
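To check the spill target without clicking through the UI, a `SparkListener` can total the task-level spill metrics. A minimal sketch; the 5 GiB figure is the target above, everything else is illustrative:

```scala
import java.util.concurrent.atomic.LongAdder
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Accumulate spill bytes across all tasks of the run.
val memorySpill = new LongAdder
val diskSpill   = new LongAdder

spark.sparkContext.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val m = taskEnd.taskMetrics
    if (m != null) {              // metrics can be missing for failed tasks
      memorySpill.add(m.memoryBytesSpilled)
      diskSpill.add(m.diskBytesSpilled)
    }
  }
})

// ... run Q1 / Q2 here ...

println(f"memory spill: ${memorySpill.sum / 1e9}%.2f GB, disk spill: ${diskSpill.sum / 1e9}%.2f GB")
```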
The job’s wall-time hides serious back-pressure in the shuffle and sort stages. Increasing partition granularity, broadcasting the small side, and pruning/caching more judiciously will turn those hour-long inner stages into operations measured in seconds and make the 5-minute target realistic.