From c08ff86e2b8fc62b9e66ab2444d566f4d73efcc2 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Sat, 2 May 2026 12:45:54 -0700 Subject: [PATCH 1/3] fix(amber): surface writer-thread failure as FatalError instead of silent hang OutputPortResultWriterThread previously let exceptions in close() escape Thread.run(), so the iceberg commit failure was invisible to the worker, the controller still saw normal portCompleted, downstream operators read incomplete data, and the test/user observed only a 1-minute Await timeout. Capture the failure, re-throw on closeOutputStorageWriterIfNeeded, let DPThread's existing MainThreadDelegateMessage path route it to the worker actor, and let the Controller's AllForOneStrategy supervisor emit FatalError to the client immediately. Closes #4682. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../messaginglayer/OutputManager.scala | 6 +++++ .../OutputPortResultWriterThread.scala | 27 ++++++++++++++----- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala index 4ab3d18056f..affbd786f9b 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala @@ -235,6 +235,11 @@ class OutputManager( /** * Singal the port storage writer to flush the remaining buffer and wait for commits to finish so that * the output port is properly completed. If the output port does not need storage, no action will be done. + * + * If the writer thread captured a failure (e.g., iceberg commit retries + * exhausted), re-throw it here so the DP thread surfaces a FatalError + * to the controller via pekko's supervisor strategy. 
Otherwise the worker + * would announce port completion as if the result was durably written. */ def closeOutputStorageWriterIfNeeded(outputPortId: PortIdentity): Unit = { this.outputPortResultWriterThreads.get(outputPortId) match { @@ -243,6 +248,7 @@ class OutputManager( writerThread.queue.put(Right(PortStorageWriterTerminateSignal)) // Blocking call writerThread.join() + writerThread.getFailure.foreach(throw _) case None => } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala index 28e5d2af667..2a376cb3da3 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala @@ -24,6 +24,7 @@ import org.apache.texera.amber.core.storage.model.BufferedItemWriter import org.apache.texera.amber.core.tuple.Tuple import java.util.concurrent.LinkedBlockingQueue +import scala.util.control.NonFatal sealed trait TerminateSignal case object PortStorageWriterTerminateSignal extends TerminateSignal @@ -35,15 +36,27 @@ class OutputPortResultWriterThread( val queue: LinkedBlockingQueue[Either[Tuple, TerminateSignal]] = Queues.newLinkedBlockingQueue[Either[Tuple, TerminateSignal]]() + // Captured failure from putOne() or close() so the worker DP thread can + // re-throw and let the controller's pekko supervisor surface a FatalError + // to the client. Without this, the writer thread dies silently and the + // worker keeps reporting normal port completion to the controller while + // results are missing or stale, leading to e2e timeouts that hide the + // real cause.
+ @volatile private var failure: Option[Throwable] = None + def getFailure: Option[Throwable] = failure + override def run(): Unit = { - var internalStop = false - while (!internalStop) { - val queueContent = queue.take() - queueContent match { - case Left(tuple) => bufferedItemWriter.putOne(tuple) - case Right(_) => internalStop = true + try { + var internalStop = false + while (!internalStop) { + queue.take() match { + case Left(tuple) => bufferedItemWriter.putOne(tuple) + case Right(_) => internalStop = true + } } + bufferedItemWriter.close() + } catch { + case NonFatal(e) => failure = Some(e) } - bufferedItemWriter.close() } } From 3fa2da06971774934a6205eea29607b5565bb933 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Sat, 2 May 2026 15:32:29 -0700 Subject: [PATCH 2/3] test(amber): cover writer-thread failure capture and re-throw MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add OutputPortResultWriterThreadSpec with 4 tests: - OutputPortResultWriterThread leaves getFailure empty on a clean run. - OutputPortResultWriterThread captures a close() exception in getFailure so the worker can re-throw it. - OutputManager.closeOutputStorageWriterIfNeeded re-throws the writer thread's captured failure (this is the bridge from the writer thread to the DP thread → worker actor → controller supervisor → FatalError to client). - OutputManager.closeOutputStorageWriterIfNeeded is a no-op when the port has no writer thread. Together these pin every link of the fatal-error chain that this PR introduces. The OutputManager test reaches into the private outputPortResultWriterThreads map by reflection rather than going through addPort, which would require a real iceberg URI; the test file otherwise only depends on a 4-method stub BufferedItemWriter. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../OutputPortResultWriterThreadSpec.scala | 115 ++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThreadSpec.scala diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThreadSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThreadSpec.scala new file mode 100644 index 00000000000..3fb459ac15f --- /dev/null +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThreadSpec.scala @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.texera.amber.engine.architecture.worker.managers + +import org.apache.texera.amber.core.storage.model.BufferedItemWriter +import org.apache.texera.amber.core.tuple.Tuple +import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity +import org.apache.texera.amber.core.workflow.PortIdentity +import org.apache.texera.amber.engine.architecture.messaginglayer.{ + NetworkOutputGateway, + OutputManager +} +import org.apache.texera.amber.engine.common.ambermessage.WorkflowFIFOMessage +import org.scalatest.flatspec.AnyFlatSpec + +import scala.collection.mutable + +class OutputPortResultWriterThreadSpec extends AnyFlatSpec { + + private class StubWriter(throwOnClose: Boolean = false) extends BufferedItemWriter[Tuple] { + val bufferSize: Int = 1024 + var closeCalled = false + def open(): Unit = () + def putOne(item: Tuple): Unit = () + def removeOne(item: Tuple): Unit = () + def close(): Unit = { + closeCalled = true + if (throwOnClose) throw new RuntimeException("test close failure") + } + } + + "OutputPortResultWriterThread" should "leave getFailure empty on a clean run" in { + val writer = new StubWriter() + val thread = new OutputPortResultWriterThread(writer) + thread.start() + thread.queue.put(Right(PortStorageWriterTerminateSignal)) + thread.join() + assert(thread.getFailure.isEmpty) + assert(writer.closeCalled) + } + + it should "capture a close() exception in getFailure so the worker can re-throw" in { + val writer = new StubWriter(throwOnClose = true) + val thread = new OutputPortResultWriterThread(writer) + thread.start() + thread.queue.put(Right(PortStorageWriterTerminateSignal)) + thread.join() + assert(thread.getFailure.exists(_.getMessage.contains("test close failure"))) + assert(writer.closeCalled) + } + + // Reach into OutputManager's private outputPortResultWriterThreads map to + // install a writer thread whose close() has already failed. 
This pins the + // contract that closeOutputStorageWriterIfNeeded re-throws the captured + // failure, which is the bridge from the writer thread to the DP thread → + // worker actor → controller supervisor → FatalError to client. + private def installWriterThread( + manager: OutputManager, + portId: PortIdentity, + thread: OutputPortResultWriterThread + ): Unit = { + val field = classOf[OutputManager] + .getDeclaredField("outputPortResultWriterThreads") + field.setAccessible(true) + field + .get(manager) + .asInstanceOf[mutable.HashMap[PortIdentity, OutputPortResultWriterThread]] + .put(portId, thread) + } + + "OutputManager.closeOutputStorageWriterIfNeeded" should + "re-throw the writer thread's captured failure" in { + val identifier = ActorVirtualIdentity("test-worker") + val outputManager = new OutputManager( + identifier, + new NetworkOutputGateway(identifier, (_: WorkflowFIFOMessage) => ()) + ) + val portId = PortIdentity() + val failingWriter = new StubWriter(throwOnClose = true) + val failingThread = new OutputPortResultWriterThread(failingWriter) + failingThread.start() + installWriterThread(outputManager, portId, failingThread) + val ex = intercept[RuntimeException] { + outputManager.closeOutputStorageWriterIfNeeded(portId) + } + assert(ex.getMessage.contains("test close failure")) + } + + it should "be a no-op when the port has no writer thread" in { + val identifier = ActorVirtualIdentity("test-worker") + val outputManager = new OutputManager( + identifier, + new NetworkOutputGateway(identifier, (_: WorkflowFIFOMessage) => ()) + ) + // No installWriterThread call — the port has never had a writer. 
+ outputManager.closeOutputStorageWriterIfNeeded(PortIdentity()) + } +} From b215402d9e78c1b37915e97893784b316f34360d Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Tue, 5 May 2026 23:42:21 -0700 Subject: [PATCH 3/3] fix(amber): close() writer in finally + cover putOne failure paths Address review: a putOne failure mid-loop bypassed close() and leaked the underlying writer's file handles. Move close() into a finally clause; if both legs fail, attach close()'s exception as suppressed on the original. Tests: rework StubWriter to take onPutOne/onClose thunks, add a putOne-failure test (asserts close() still ran) and a both-fail test (asserts the original is captured with the close() failure as suppressed). --- .../OutputPortResultWriterThread.scala | 13 +++++- .../OutputPortResultWriterThreadSpec.scala | 46 +++++++++++++++++-- 2 files changed, 53 insertions(+), 6 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala index 2a376cb3da3..4223d920da5 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala @@ -54,9 +54,20 @@ class OutputPortResultWriterThread( case Right(_) => internalStop = true } } - bufferedItemWriter.close() } catch { case NonFatal(e) => failure = Some(e) + } finally { + // close() runs even when the loop threw, so a putOne failure does + // not leak the underlying writer's file handles. If both legs fail, + // attach close()'s exception as suppressed on the original. 
+ try bufferedItemWriter.close() + catch { + case NonFatal(e) => + failure match { + case Some(orig) => orig.addSuppressed(e) + case None => failure = Some(e) + } + } } } } diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThreadSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThreadSpec.scala index 3fb459ac15f..31d8c41611d 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThreadSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThreadSpec.scala @@ -34,18 +34,23 @@ import scala.collection.mutable class OutputPortResultWriterThreadSpec extends AnyFlatSpec { - private class StubWriter(throwOnClose: Boolean = false) extends BufferedItemWriter[Tuple] { + private class StubWriter( + onPutOne: () => Unit = () => (), + onClose: () => Unit = () => () + ) extends BufferedItemWriter[Tuple] { val bufferSize: Int = 1024 var closeCalled = false def open(): Unit = () - def putOne(item: Tuple): Unit = () + def putOne(item: Tuple): Unit = onPutOne() def removeOne(item: Tuple): Unit = () def close(): Unit = { closeCalled = true - if (throwOnClose) throw new RuntimeException("test close failure") + onClose() } } + private def throwing(msg: String): () => Unit = () => throw new RuntimeException(msg) + "OutputPortResultWriterThread" should "leave getFailure empty on a clean run" in { val writer = new StubWriter() val thread = new OutputPortResultWriterThread(writer) @@ -57,7 +62,7 @@ class OutputPortResultWriterThreadSpec extends AnyFlatSpec { } it should "capture a close() exception in getFailure so the worker can re-throw" in { - val writer = new StubWriter(throwOnClose = true) + val writer = new StubWriter(onClose = throwing("test close failure")) val thread = new OutputPortResultWriterThread(writer) thread.start() 
thread.queue.put(Right(PortStorageWriterTerminateSignal)) @@ -66,6 +71,37 @@ class OutputPortResultWriterThreadSpec extends AnyFlatSpec { assert(writer.closeCalled) } + it should "capture a putOne exception and still call close()" in { + val writer = new StubWriter(onPutOne = throwing("test putOne failure")) + val thread = new OutputPortResultWriterThread(writer) + thread.start() + thread.queue.put(Left(null.asInstanceOf[Tuple])) + thread.queue.put(Right(PortStorageWriterTerminateSignal)) + thread.join() + assert(thread.getFailure.exists(_.getMessage.contains("test putOne failure"))) + // The finally clause must run close() even after putOne threw, or + // the underlying writer leaks file handles. + assert(writer.closeCalled) + } + + it should "preserve both errors when putOne and close() fail in the same run" in { + val writer = new StubWriter( + onPutOne = throwing("test putOne failure"), + onClose = throwing("test close failure") + ) + val thread = new OutputPortResultWriterThread(writer) + thread.start() + thread.queue.put(Left(null.asInstanceOf[Tuple])) + thread.queue.put(Right(PortStorageWriterTerminateSignal)) + thread.join() + val captured = thread.getFailure.getOrElse(fail("expected putOne failure")) + assert(captured.getMessage.contains("test putOne failure")) + assert( + captured.getSuppressed.exists(_.getMessage.contains("test close failure")), + "close() failure should be attached as suppressed on the original putOne failure" + ) + } + // Reach into OutputManager's private outputPortResultWriterThreads map to // install a writer thread whose close() has already failed. 
This pins the // contract that closeOutputStorageWriterIfNeeded re-throws the captured @@ -93,7 +129,7 @@ class OutputPortResultWriterThreadSpec extends AnyFlatSpec { new NetworkOutputGateway(identifier, (_: WorkflowFIFOMessage) => ()) ) val portId = PortIdentity() - val failingWriter = new StubWriter(throwOnClose = true) + val failingWriter = new StubWriter(onClose = throwing("test close failure")) val failingThread = new OutputPortResultWriterThread(failingWriter) failingThread.start() installWriterThread(outputManager, portId, failingThread)