
1. Big-picture timeline

| Query | Spark UI “Details for Query” timestamp | Wall-clock run-time |
|---|---|---|
| Q1 (pdf #1) | 15:25 – 15:34 | ≈ 9 min |
| Q2 (pdf #2) | 15:52 – 16:01 | ≈ 9 min |

Both queries finish in ≈ 9 min of wall-clock time, yet several internal stages report more than an hour of cumulative task time. The driver-side clock looks short because the long-running stages reuse cached material from earlier jobs, but those stages remain the performance ceiling.


2. Hot-spots that dominate the critical path

| Component | Evidence (single worst stage) | Why it hurts |
|---|---|---|
| Shuffle Exchange on id & value | 724 GiB of records read, 60 GiB written, 8.6 min writer time, 200 partitions | Large partitions ⇒ long tasks & high network I/O. |
| Sort-Merge Join | Peak task memory 7.1 GiB, aggregate 619 GiB, spilled ≈ 290 GiB | Data skew & only 200 shuffle partitions force repeated spill-to-disk. |
| Window (max logical_ts) | Window stage keeps 64 GiB in RAM per executor; no spill yet but close to the limit | Requires a full sort inside each partition. |
| Remote shuffle fetch | 1.17 h cumulative fetch-request latency for a single Exchange | Cross-node traffic becomes serialized when partitions are few & large. |
| Whole-Stage Codegen blocks | 6 different WSCG operators run 1.3 – 1.7 h each (Q2 p. 3) | They encapsulate cascaded filters/projections but inherit the shuffle cost. |

3. Root causes

  1. Too few shuffle partitions
    200 partitions for 700+ GiB ⇒ each partition ≈ 3.5 GiB; a single task must sort/join GiB-scale data and spills frequently.

  2. Partition skew on id
    High-frequency IDs cluster heavy keys in just a few of the 200 buckets, amplifying spills and long tails.

  3. Unnecessary in-memory caching of 1.5 TiB
    TableCacheQueryStage → InMemoryTableScan keeps the entire 1.84 B-row time-slice cached for repeated reads; that is faster than re-scanning Iceberg, but memory pressure forces executor spills during joins/windows.

  4. Filters not pushed to Iceberg partition columns
    The wall_timestamp BETWEEN T-10min AND T condition appears only after the scan; because the table isn’t partitioned on wall_timestamp, full manifests are still loaded and cached.

  5. Small “state” table not broadcast
    quantization_state (~7.8 k rows after hash-agg) is still hash-partitioned and sorted, triggering the heavy Sort-Merge join.


4. Actionable recommendations

4.1 Parameter quick-wins (same code)

| Knob | Suggested value | Expected effect |
|---|---|---|
| spark.sql.shuffle.partitions | 200 → 2000 (or enable AQE coalesce) | Reduces partition size to < 400 MiB, cutting spill & skew impact. |
| spark.sql.adaptive.enabled | true, plus spark.sql.adaptive.skewJoin.enabled=true | Splits abnormally large shuffle blocks and handles key skew. |
| spark.sql.autoBroadcastJoinThreshold | 10m (default) → anything above the ~30 MiB state table, e.g. 64m | Broadcasts quantization_state and avoids the Sort-Merge Join. |
| spark.sql.windowExec.buffer.spill.threshold | Lower it (note: this threshold counts rows, not bytes; e.g. 500000) | Forces controlled spilling in the window operator instead of an executor OOM. |
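
A minimal session-level sketch applying these knobs (the app and DataFrame names are illustrative, and the values are starting points to tune against your own stage metrics):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.broadcast

    val spark = SparkSession.builder()
      .appName("quantization-batch")                          // hypothetical app name
      .config("spark.sql.shuffle.partitions", "2000")
      .config("spark.sql.adaptive.enabled", "true")
      .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
      .config("spark.sql.adaptive.skewJoin.enabled", "true")
      .config("spark.sql.autoBroadcastJoinThreshold", "64m")
      .getOrCreate()

    // Belt and braces: an explicit hint forces the broadcast even if Spark's
    // size estimate for the state table is off (`events` is hypothetical).
    // val joined = events.join(broadcast(quantizationState), "id")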

4.2 Query-level refactors

  1. Replace window (max) with grouped agg + semi-join

        // assumes: import org.apache.spark.sql.functions.max
        //          import spark.implicits._   (for the $"..." column syntax)
        val maxTs   = df.groupBy($"id").agg(max($"logical_timestamp").as("max_ts"))
        val trimmed = df.join(maxTs, Seq("id"))                      // per-id max is tiny and broadcastable
                        .filter($"logical_timestamp" < $"max_ts")    // keep rows older than the latest record

    This avoids the expensive per-partition Window + Sort.

  2. Pre-bucket the raw table on id and wall_timestamp
    A 512-bucket Iceberg write or Z-ordering on (id, wall_timestamp) enables scan‐time pruning and aligns with shuffle keys.
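
    A hedged sketch of the bucketed layout (the schema and table name are assumed from context; pick the bucket count and time granularity to match your data volume):

        // Iceberg partition transforms: hash-bucket on id, coarse partition on time.
        spark.sql("""
          CREATE TABLE warehouse.raw_events_bucketed (   -- hypothetical target table
            id                BIGINT,
            value             DOUBLE,
            logical_timestamp BIGINT,
            wall_timestamp    TIMESTAMP
          )
          USING iceberg
          PARTITIONED BY (bucket(512, id), hours(wall_timestamp))
        """)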

  3. Drop the global cache for the 10-minute slice
    Use persist(StorageLevel.DISK_ONLY_2) or no cache at all; the slice is read only twice (Q1/Q2), so re-scanning is cheaper than the RAM pressure the cache creates during joins/windows.
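
    A one-line sketch, assuming `slice` is the DataFrame holding the 10-minute window:

        import org.apache.spark.storage.StorageLevel

        // 2x-replicated disk cache: survives one executor loss, costs no heap.
        slice.persist(StorageLevel.DISK_ONLY_2)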

  4. Avoid AppendColumns + MapGroups if possible
    If the mapGroups step is just post-aggregation formatting, prefer withColumn + vectorized write; this eliminates the wide hash-partition (value#826).
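
    A hedged sketch of that refactor, assuming the mapGroups body only formats an already-aggregated value (all names here are hypothetical):

        import org.apache.spark.sql.functions.format_number

        // Column expression instead of ds.groupByKey(...).mapGroups(...):
        // stays inside Tungsten/codegen and needs no extra hash partitioning.
        val formatted = aggregated.withColumn("value_fmt", format_number($"value", 4))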

  5. Use incremental MERGE/UPDATE instead of full-batch
    Writing only the changed aggregates to bq_quantization_state_v1 shrinks the Exchange(46) that currently rebalances all rows.
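
    A minimal sketch using Iceberg's MERGE support, assuming the changed aggregates land in a staged view called updates:

        // Write only the changed rows instead of rebalancing the whole table.
        spark.sql("""
          MERGE INTO bq_quantization_state_v1 t
          USING updates s
          ON t.id = s.id
          WHEN MATCHED THEN UPDATE SET *
          WHEN NOT MATCHED THEN INSERT *
        """)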

4.3 Infrastructure & cluster tuning

  • Give executors 8 – 10× more cores with the same total memory, so the narrower partitions produced by the higher shuffle.partitions setting can actually run in parallel.

  • Enable an external or remote shuffle service (e.g. Spark's push-based shuffle, or the cloud vendor's external shuffle) to reduce executor-side spill by leveraging SSDs on the shuffle nodes.

  • If running on Kubernetes, pin the executors that run the Window stage to a high-memory node pool.


5. Validation next-steps

  1. Re-run Q2 after broadcast-joining the state table; target ≤ 200 MB shuffle write and no spills.

  2. Enable AQE & skew join: monitor stage-level partition counts – you should see 1800–2000 output partitions vs current 200.

  3. Compare SortMergeJoin spill metric; aim for < 5 GiB total spill and 0 OOM warnings.

  4. Time the full pipeline; with reduced shuffle + spills, a single batch should finish < 5 min, meeting your SLA.


Bottom line

The job’s wall-time hides serious back-pressure in the shuffle and sort stages. Increasing partition granularity, broadcasting the small side of the join, and pruning/caching more judiciously will convert those hour-long inner stages into seconds-long operations and make the 5-minute target realistic.