diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala index 4ab3d18056f..affbd786f9b 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala @@ -235,6 +235,11 @@ class OutputManager( /** * Singal the port storage writer to flush the remaining buffer and wait for commits to finish so that * the output port is properly completed. If the output port does not need storage, no action will be done. + * + * If the writer thread captured a failure (e.g., iceberg commit retries + * exhausted), re-throw it here so the DP thread surfaces a FatalError + * to the controller via pekko's supervisor strategy. Otherwise the worker + * would announce port completion as if the result was durably written. */ def closeOutputStorageWriterIfNeeded(outputPortId: PortIdentity): Unit = { this.outputPortResultWriterThreads.get(outputPortId) match { @@ -243,6 +248,7 @@ class OutputManager( writerThread.queue.put(Right(PortStorageWriterTerminateSignal)) // Blocking call writerThread.join() + writerThread.getFailure.foreach(throw _) case None => } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala index 28e5d2af667..4223d920da5 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala @@ -24,6 +24,7 @@ import org.apache.texera.amber.core.storage.model.BufferedItemWriter import org.apache.texera.amber.core.tuple.Tuple import java.util.concurrent.LinkedBlockingQueue +import scala.util.control.NonFatal sealed trait TerminateSignal case object PortStorageWriterTerminateSignal extends TerminateSignal @@ -35,15 +36,38 @@ class OutputPortResultWriterThread( val queue: LinkedBlockingQueue[Either[Tuple, TerminateSignal]] = Queues.newLinkedBlockingQueue[Either[Tuple, TerminateSignal]]() + // Captured failure from put-one or close() so the worker DP thread can + // re-throw and let the controller's pekko supervisor surface a FatalError + // to the client. Without this, the writer thread dies silently and the + // worker keeps reporting normal port completion to the controller while + // results are missing or stale, leading to e2e timeouts that hide the + // real cause. + @volatile private var failure: Option[Throwable] = None + def getFailure: Option[Throwable] = failure + override def run(): Unit = { - var internalStop = false - while (!internalStop) { - val queueContent = queue.take() - queueContent match { - case Left(tuple) => bufferedItemWriter.putOne(tuple) - case Right(_) => internalStop = true + try { + var internalStop = false + while (!internalStop) { + queue.take() match { + case Left(tuple) => bufferedItemWriter.putOne(tuple) + case Right(_) => internalStop = true + } + } + } catch { + case NonFatal(e) => failure = Some(e) + } finally { + // close() runs even when the loop threw, so a putOne failure does + // not leak the underlying writer's file handles. If both legs fail, + // attach close()'s exception as suppressed on the original. + try bufferedItemWriter.close() + catch { + case NonFatal(e) => + failure match { + case Some(orig) => orig.addSuppressed(e) + case None => failure = Some(e) + } } } - bufferedItemWriter.close() } } diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThreadSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThreadSpec.scala new file mode 100644 index 00000000000..31d8c41611d --- /dev/null +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThreadSpec.scala @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.engine.architecture.worker.managers + +import org.apache.texera.amber.core.storage.model.BufferedItemWriter +import org.apache.texera.amber.core.tuple.Tuple +import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity +import org.apache.texera.amber.core.workflow.PortIdentity +import org.apache.texera.amber.engine.architecture.messaginglayer.{ + NetworkOutputGateway, + OutputManager +} +import org.apache.texera.amber.engine.common.ambermessage.WorkflowFIFOMessage +import org.scalatest.flatspec.AnyFlatSpec + +import scala.collection.mutable + +class OutputPortResultWriterThreadSpec extends AnyFlatSpec { + + private class StubWriter( + onPutOne: () => Unit = () => (), + onClose: () => Unit = () => () + ) extends BufferedItemWriter[Tuple] { + val bufferSize: Int = 1024 + var closeCalled = false + def open(): Unit = () + def putOne(item: Tuple): Unit = onPutOne() + def removeOne(item: Tuple): Unit = () + def close(): Unit = { + closeCalled = true + onClose() + } + } + + private def throwing(msg: String): () => Unit = () => throw new RuntimeException(msg) + + "OutputPortResultWriterThread" should "leave getFailure empty on a clean run" in { + val writer = new StubWriter() + val thread = new OutputPortResultWriterThread(writer) + thread.start() + thread.queue.put(Right(PortStorageWriterTerminateSignal)) + thread.join() + assert(thread.getFailure.isEmpty) + assert(writer.closeCalled) + } + + it should "capture a close() exception in getFailure so the worker can re-throw" in { + val writer = new StubWriter(onClose = throwing("test close failure")) + val thread = new OutputPortResultWriterThread(writer) + thread.start() + thread.queue.put(Right(PortStorageWriterTerminateSignal)) + thread.join() + assert(thread.getFailure.exists(_.getMessage.contains("test close failure"))) + assert(writer.closeCalled) + } + + it should "capture a putOne exception and still call close()" in { + val writer = new StubWriter(onPutOne = throwing("test putOne failure")) + val thread = new OutputPortResultWriterThread(writer) + thread.start() + thread.queue.put(Left(null.asInstanceOf[Tuple])) + thread.queue.put(Right(PortStorageWriterTerminateSignal)) + thread.join() + assert(thread.getFailure.exists(_.getMessage.contains("test putOne failure"))) + // The finally clause must run close() even after putOne threw, or + // the underlying writer leaks file handles. + assert(writer.closeCalled) + } + + it should "preserve both errors when putOne and close() fail in the same run" in { + val writer = new StubWriter( + onPutOne = throwing("test putOne failure"), + onClose = throwing("test close failure") + ) + val thread = new OutputPortResultWriterThread(writer) + thread.start() + thread.queue.put(Left(null.asInstanceOf[Tuple])) + thread.queue.put(Right(PortStorageWriterTerminateSignal)) + thread.join() + val captured = thread.getFailure.getOrElse(fail("expected putOne failure")) + assert(captured.getMessage.contains("test putOne failure")) + assert( + captured.getSuppressed.exists(_.getMessage.contains("test close failure")), + "close() failure should be attached as suppressed on the original putOne failure" + ) + } + + // Reach into OutputManager's private outputPortResultWriterThreads map to + // install a writer thread whose close() has already failed. This pins the + // contract that closeOutputStorageWriterIfNeeded re-throws the captured + // failure, which is the bridge from the writer thread to the DP thread → + // worker actor → controller supervisor → FatalError to client. + private def installWriterThread( + manager: OutputManager, + portId: PortIdentity, + thread: OutputPortResultWriterThread + ): Unit = { + val field = classOf[OutputManager] + .getDeclaredField("outputPortResultWriterThreads") + field.setAccessible(true) + field + .get(manager) + .asInstanceOf[mutable.HashMap[PortIdentity, OutputPortResultWriterThread]] + .put(portId, thread) + } + + "OutputManager.closeOutputStorageWriterIfNeeded" should + "re-throw the writer thread's captured failure" in { + val identifier = ActorVirtualIdentity("test-worker") + val outputManager = new OutputManager( + identifier, + new NetworkOutputGateway(identifier, (_: WorkflowFIFOMessage) => ()) + ) + val portId = PortIdentity() + val failingWriter = new StubWriter(onClose = throwing("test close failure")) + val failingThread = new OutputPortResultWriterThread(failingWriter) + failingThread.start() + installWriterThread(outputManager, portId, failingThread) + val ex = intercept[RuntimeException] { + outputManager.closeOutputStorageWriterIfNeeded(portId) + } + assert(ex.getMessage.contains("test close failure")) + } + + it should "be a no-op when the port has no writer thread" in { + val identifier = ActorVirtualIdentity("test-worker") + val outputManager = new OutputManager( + identifier, + new NetworkOutputGateway(identifier, (_: WorkflowFIFOMessage) => ()) + ) + // No installWriterThread call — the port has never had a writer. + outputManager.closeOutputStorageWriterIfNeeded(PortIdentity()) + } +}