From 8355bad98c38df4e9837995e040d1f25555fae58 Mon Sep 17 00:00:00 2001 From: yunyad Date: Sat, 7 Mar 2026 13:00:44 -0800 Subject: [PATCH 1/7] proto --- .../amber/engine/architecture/rpc/controlreturns.proto | 7 ++++++- .../amber/engine/architecture/rpc/workerservice.proto | 1 + .../amber/engine/architecture/worker/statistics.proto | 1 + 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto index 43613b5cfdc..05c7906bf8b 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto @@ -43,6 +43,7 @@ message ControlReturn { WorkerStateResponse workerStateResponse = 50; WorkerMetricsResponse workerMetricsResponse = 51; FinalizeCheckpointResponse finalizeCheckpointResponse = 52; + FlowControlUsageResponse flowControlUsageResponse = 53; // common responses ControlError controlError = 101; @@ -138,4 +139,8 @@ message WorkerStateResponse { message WorkerMetricsResponse { worker.WorkerMetrics metrics = 1 [(scalapb.field).no_box = true]; -} \ No newline at end of file +} + +message FlowControlUsageResponse { + map channel_usage_bytes = 1; +} diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto index dbcd6d8a5e0..6eaf7e27c95 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto @@ -39,6 +39,7 @@ service WorkerService { rpc OpenExecutor(EmptyRequest) returns (EmptyReturn); rpc PauseWorker(EmptyRequest) returns (WorkerStateResponse); rpc PrepareCheckpoint(PrepareCheckpointRequest) returns (EmptyReturn); + rpc QueryFlowControlUsage(EmptyRequest) returns (FlowControlUsageResponse); rpc QueryStatistics(EmptyRequest) returns (WorkerMetricsResponse); rpc ResumeWorker(EmptyRequest) returns (WorkerStateResponse); rpc RetrieveState(EmptyRequest) returns (EmptyReturn); diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/worker/statistics.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/worker/statistics.proto index 85d1fcf4aaa..679ca62b2de 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/worker/statistics.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/worker/statistics.proto @@ -55,6 +55,7 @@ message WorkerStatistics { int64 data_processing_time = 3; int64 control_processing_time = 4; int64 idle_time = 5; + map channel_usage_bytes = 6; } message WorkerMetrics { From 470b486391afdc1de3d3fdf4ae932b9852b9a241 Mon Sep 17 00:00:00 2001 From: yunyad Date: Sat, 7 Mar 2026 13:01:53 -0800 Subject: [PATCH 2/7] proto --- .../amber/engine/common/executionruntimestate.proto | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/common/executionruntimestate.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/common/executionruntimestate.proto index e712b3adc8a..310eefb5bfe 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/common/executionruntimestate.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/common/executionruntimestate.proto @@ -84,11 +84,20 @@ message OperatorMetrics{ OperatorStatistics operator_statistics = 2 [(scalapb.field).no_box = true]; } +message EdgeStatistics{ + string from_op_id = 1; + int32 from_port_id = 2; + string to_op_id = 3; + int32 to_port_id = 4; + int64 usage_bytes = 5; +} + message ExecutionStatsStore { int64 startTimeStamp = 1; int64 endTimeStamp = 2; map operator_info = 3; repeated OperatorWorkerMapping operator_worker_mapping = 4; + repeated EdgeStatistics edge_info = 5; } From 113d81894869fd2634030e23281a579db6c6deeb Mon Sep 17 00:00:00 2001 From: yunyad Date: Sat, 7 Mar 2026 13:05:55 -0800 Subject: [PATCH 3/7] basic --- .../engine/architecture/controller/ClientEvent.scala | 7 +++++-- .../engine/architecture/controller/Controller.scala | 3 ++- .../controller/promisehandlers/PauseHandler.scala | 5 +++-- .../controller/promisehandlers/ResumeHandler.scala | 9 ++++----- .../promisehandlers/WorkerStateUpdatedHandler.scala | 3 ++- .../deploysemantics/layer/WorkerExecution.scala | 2 +- .../worker/DataProcessorRPCHandlerInitializer.scala | 1 + .../web/model/websocket/event/TexeraWebSocketEvent.scala | 1 + 8 files changed, 19 insertions(+), 12 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala index 1092af15e77..2c8ff1d28b3 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala @@ -23,13 +23,16 @@ import org.apache.texera.amber.core.tuple.Tuple import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState import org.apache.texera.amber.engine.common.ambermessage.WorkflowFIFOMessagePayload -import org.apache.texera.amber.engine.common.executionruntimestate.OperatorMetrics +import org.apache.texera.amber.engine.common.executionruntimestate.{EdgeStatistics, OperatorMetrics} trait ClientEvent extends WorkflowFIFOMessagePayload case class ExecutionStateUpdate(state: WorkflowAggregatedState) extends ClientEvent -case class ExecutionStatsUpdate(operatorMetrics: Map[String, OperatorMetrics]) extends ClientEvent +case class ExecutionStatsUpdate( + operatorMetrics: Map[String, OperatorMetrics], + edgeStatistics: Seq[EdgeStatistics] = Seq.empty +) extends ClientEvent case class RuntimeStatisticsPersist(operatorMetrics: Map[String, OperatorMetrics]) extends ClientEvent diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala index b0e4f3fdc32..cbe88c132f6 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala @@ -244,7 +244,8 @@ class Controller( outputMessages.foreach(transferService.send) cp.asyncRPCClient.sendToClient( ExecutionStatsUpdate( - cp.workflowExecution.getAllRegionExecutionsStats + cp.workflowExecution.getAllRegionExecutionsStats, + cp.workflowExecution.getAllRegionEdgeStatistics ) ) globalReplayManager.markRecoveryStatus(CONTROLLER, isRecovering = false) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PauseHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PauseHandler.scala index 35a85f56ae9..3516fdb9a06 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PauseHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PauseHandler.scala @@ -85,10 +85,11 @@ trait PauseHandler { .map { _ => // update frontend workflow status and persist statistics val stats = cp.workflowExecution.getAllRegionExecutionsStats - sendToClient(ExecutionStatsUpdate(stats)) + val edgeStats = cp.workflowExecution.getAllRegionEdgeStatistics + sendToClient(ExecutionStatsUpdate(stats, edgeStats)) sendToClient(RuntimeStatisticsPersist(stats)) sendToClient(ExecutionStateUpdate(cp.workflowExecution.getState)) - logger.info(s"workflow paused") + logger.info("workflow paused") } EmptyReturn() } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ResumeHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ResumeHandler.scala index c94ba91c205..850f804be31 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ResumeHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ResumeHandler.scala @@ -60,12 +60,11 @@ trait ResumeHandler { .map { _ => // update frontend status and persist statistics val stats = cp.workflowExecution.getAllRegionExecutionsStats - sendToClient(ExecutionStatsUpdate(stats)) + val edgeStats = cp.workflowExecution.getAllRegionEdgeStatistics + sendToClient(ExecutionStatsUpdate(stats, edgeStats)) sendToClient(RuntimeStatisticsPersist(stats)) - cp.controllerTimerService - .enableStatusUpdate() //re-enabled it since it is disabled in pause - cp.controllerTimerService - .enableRuntimeStatisticsCollection() //re-enabled it since it is disabled in pause + cp.controllerTimerService.enableStatusUpdate() + cp.controllerTimerService.enableRuntimeStatisticsCollection() EmptyReturn() } } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerStateUpdatedHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerStateUpdatedHandler.scala index 5ee98a4918d..addcdf30a0b 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerStateUpdatedHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerStateUpdatedHandler.scala @@ -52,7 +52,8 @@ trait WorkerStateUpdatedHandler { operatorExecution.getWorkerExecution(ctx.sender).update(System.nanoTime(), msg.state) ) val stats = cp.workflowExecution.getAllRegionExecutionsStats - sendToClient(ExecutionStatsUpdate(stats)) + val edgeStats = cp.workflowExecution.getAllRegionEdgeStatistics + sendToClient(ExecutionStatsUpdate(stats, edgeStats)) sendToClient(RuntimeStatisticsPersist(stats)) EmptyReturn() } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/deploysemantics/layer/WorkerExecution.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/deploysemantics/layer/WorkerExecution.scala index 55e1e309181..a78bdb948bd 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/deploysemantics/layer/WorkerExecution.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/deploysemantics/layer/WorkerExecution.scala @@ -35,7 +35,7 @@ case class WorkerExecution() extends Serializable { private var state: WorkerState = UNINITIALIZED private var stats: WorkerStatistics = { - WorkerStatistics(Seq.empty, Seq.empty, 0, 0, 0) + WorkerStatistics(Seq.empty, Seq.empty, 0, 0, 0, Map.empty) } private var lastUpdateTimeStamp = 0L diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala index 2abcdf66975..2fd4b2e228e 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala @@ -41,6 +41,7 @@ class DataProcessorRPCHandlerInitializer(val dp: DataProcessor) with OpenExecutorHandler with PauseHandler with AddPartitioningHandler + with QueryFlowControlUsageHandler with QueryStatisticsHandler with ResumeHandler with StartHandler diff --git a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala index da072c80ea5..f408a5f00bb 100644 --- a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala +++ b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala @@ -32,6 +32,7 @@ import org.apache.texera.web.model.websocket.response.{HeartBeatResponse, Modify new Type(value = classOf[WorkflowErrorEvent]), new Type(value = classOf[WorkflowStateEvent]), new Type(value = classOf[OperatorStatisticsUpdateEvent]), + new Type(value = classOf[EdgeStatisticsUpdateEvent]), new Type(value = classOf[WebResultUpdateEvent]), new Type(value = classOf[ConsoleUpdateEvent]), new Type(value = classOf[CacheStatusUpdateEvent]), From 244c32ef7962d51bc31ab07c3f7281edcc110964 Mon Sep 17 00:00:00 2001 From: yunyad Date: Sat, 7 Mar 2026 13:18:49 -0800 Subject: [PATCH 4/7] revert --- .../controller/promisehandlers/ResumeHandler.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ResumeHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ResumeHandler.scala index 850f804be31..87f0d2ccc25 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ResumeHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ResumeHandler.scala @@ -63,8 +63,10 @@ trait ResumeHandler { val edgeStats = cp.workflowExecution.getAllRegionEdgeStatistics sendToClient(ExecutionStatsUpdate(stats, edgeStats)) sendToClient(RuntimeStatisticsPersist(stats)) - cp.controllerTimerService.enableStatusUpdate() - cp.controllerTimerService.enableRuntimeStatisticsCollection() + cp.controllerTimerService + .enableStatusUpdate() //re-enabled it since it is disabled in pause + cp.controllerTimerService + .enableRuntimeStatisticsCollection() //re-enabled it since it is disabled in pause EmptyReturn() } } From 3649f6f684ad11e7c09926d78cb6be7bbae3008e Mon Sep 17 00:00:00 2001 From: yunyad Date: Sat, 7 Mar 2026 13:20:43 -0800 Subject: [PATCH 5/7] frontend --- .../types/workflow-websocket.interface.ts | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/frontend/src/app/workspace/types/workflow-websocket.interface.ts b/frontend/src/app/workspace/types/workflow-websocket.interface.ts index afd5ea6f04a..9bcb01344e1 100644 --- a/frontend/src/app/workspace/types/workflow-websocket.interface.ts +++ b/frontend/src/app/workspace/types/workflow-websocket.interface.ts @@ -185,6 +185,19 @@ export type RegionStateEvent = Readonly<{ state: string; }>; +export type EdgeStatistics = Readonly<{ + fromOpId: string; + fromPortId: number; + toOpId: string; + toPortId: number; + usageBytes: number; +}>; + +export type EdgeStatisticsUpdateEvent = Readonly<{ + edgeStatistics: ReadonlyArray; + maxCreditAllowedInBytesPerChannel: number; +}>; + export type ModifyLogicResponse = Readonly<{ opId: string; isValid: boolean; @@ -243,6 +256,7 @@ export type TexeraWebsocketEventTypeMap = { ClusterStatusUpdateEvent: ClusterStatusUpdateEvent; RegionUpdateEvent: RegionUpdateEvent; RegionStateEvent: RegionStateEvent; + EdgeStatisticsUpdateEvent: EdgeStatisticsUpdateEvent; }; // helper type definitions to generate the request and event types From dfefb48e06263a19d197179f81b4f79de8d0043f Mon Sep 17 00:00:00 2001 From: yunyad Date: Sat, 7 Mar 2026 13:23:35 -0800 Subject: [PATCH 6/7] stats manager --- .../core/architecture/managers/statistics_manager.py | 9 +++++++-- .../architecture/worker/managers/StatisticsManager.scala | 8 ++++++-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/amber/src/main/python/core/architecture/managers/statistics_manager.py b/amber/src/main/python/core/architecture/managers/statistics_manager.py index 6b36b78e577..b59b6cf4fda 100644 --- a/amber/src/main/python/core/architecture/managers/statistics_manager.py +++ b/amber/src/main/python/core/architecture/managers/statistics_manager.py @@ -16,7 +16,7 @@ # under the License. from collections import defaultdict -from typing import DefaultDict +from typing import DefaultDict, Optional from proto.org.apache.texera.amber.core import PortIdentity from proto.org.apache.texera.amber.engine.architecture.worker import ( @@ -40,7 +40,11 @@ def __init__(self) -> None: self._total_execution_time: int = 0 self._worker_start_time: int = 0 - def get_statistics(self) -> WorkerStatistics: + def get_statistics( + self, channel_usage_bytes: Optional[dict[str, int]] = None + ) -> WorkerStatistics: + if channel_usage_bytes is None: + channel_usage_bytes = {} # Compile and return worker statistics return WorkerStatistics( [ @@ -56,6 +60,7 @@ def get_statistics(self) -> WorkerStatistics: self._total_execution_time - self._data_processing_time - self._control_processing_time, + channel_usage_bytes, ) def increase_input_statistics(self, port_id: PortIdentity, size: int) -> None: diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala index 8ae0419f0a3..16f173d33df 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala @@ -47,7 +47,10 @@ class StatisticsManager { * @param operator the operator executor * @return a WorkerStatistics object containing the statistics */ - def getStatistics(operator: OperatorExecutor): WorkerStatistics = { + def getStatistics( + operator: OperatorExecutor, + channelUsageBytes: Map[String, Long] + ): WorkerStatistics = { WorkerStatistics( inputStatistics.map { case (portId, (tupleCount, tupleSize)) => @@ -59,7 +62,8 @@ class StatisticsManager { }.toSeq, dataProcessingTime, controlProcessingTime, - totalExecutionTime - dataProcessingTime - controlProcessingTime + totalExecutionTime - dataProcessingTime - controlProcessingTime, + channelUsageBytes ) } From 469a1b3e777455bc334debc67723858659296cf4 Mon Sep 17 00:00:00 2001 From: yunyad Date: Sat, 7 Mar 2026 13:30:05 -0800 Subject: [PATCH 7/7] update details --- .../engine/architecture/messaginglayer/FlowControl.scala | 4 ++++ .../amber/engine/architecture/worker/WorkflowWorker.scala | 1 + 2 files changed, 5 insertions(+) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/FlowControl.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/FlowControl.scala index d4b24dad1d8..9a4912a9c8a 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/FlowControl.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/FlowControl.scala @@ -119,4 +119,8 @@ class FlowControl { def getCredit: Long = { maxByteAllowed - inflightCredit - queuedCredit } + + def getUsedBytes: Long = { + inflightCredit + queuedCredit + } } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/WorkflowWorker.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/WorkflowWorker.scala index d1a0a300d93..bc4ceb4cf3e 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/WorkflowWorker.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/WorkflowWorker.scala @@ -172,6 +172,7 @@ class WorkflowWorker( logger.info("output messages restored.") dp = dpState // overwrite dp state dp.outputHandler = logManager.sendCommitted + dp.setChannelUsageBytesProvider(() => transferService.getChannelUsageBytes) dp.initTimerService(timerService) logger.info("start re-initialize executor from checkpoint.") val (executor, iter) = dp.serializationManager.restoreExecutorState(chkpt)