Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ message ControlReturn {
WorkerStateResponse workerStateResponse = 50;
WorkerMetricsResponse workerMetricsResponse = 51;
FinalizeCheckpointResponse finalizeCheckpointResponse = 52;
FlowControlUsageResponse flowControlUsageResponse = 53;

// common responses
ControlError controlError = 101;
Expand Down Expand Up @@ -138,4 +139,8 @@ message WorkerStateResponse {

message WorkerMetricsResponse {
worker.WorkerMetrics metrics = 1 [(scalapb.field).no_box = true];
}
}

// Response type returned by the QueryFlowControlUsage worker RPC.
message FlowControlUsageResponse {
// Usage in bytes, keyed by a string identifier.
// NOTE(review): presumably one entry per channel; confirm the key format against the producer.
map<string, int64> channel_usage_bytes = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ service WorkerService {
rpc OpenExecutor(EmptyRequest) returns (EmptyReturn);
rpc PauseWorker(EmptyRequest) returns (WorkerStateResponse);
rpc PrepareCheckpoint(PrepareCheckpointRequest) returns (EmptyReturn);
rpc QueryFlowControlUsage(EmptyRequest) returns (FlowControlUsageResponse);
rpc QueryStatistics(EmptyRequest) returns (WorkerMetricsResponse);
rpc ResumeWorker(EmptyRequest) returns (WorkerStateResponse);
rpc RetrieveState(EmptyRequest) returns (EmptyReturn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ message WorkerStatistics {
int64 data_processing_time = 3;
int64 control_processing_time = 4;
int64 idle_time = 5;
map<string, int64> channel_usage_bytes = 6;
}

message WorkerMetrics {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,20 @@ message OperatorMetrics{
OperatorStatistics operator_statistics = 2 [(scalapb.field).no_box = true];
}

// Usage figure for one edge, identified by its source (operator id, port id)
// and its destination (operator id, port id).
message EdgeStatistics{
// Source endpoint of the edge.
string from_op_id = 1;
int32 from_port_id = 2;
// Destination endpoint of the edge.
string to_op_id = 3;
int32 to_port_id = 4;
// Usage attributed to this edge, in bytes.
// NOTE(review): presumably derived from the per-channel usage reported by workers; confirm.
int64 usage_bytes = 5;
}

// Stored statistics for an execution: its time span, per-operator metrics,
// operator-to-worker mappings, and per-edge statistics.
message ExecutionStatsStore {
// NOTE(review): units of the two timestamps are not visible here; confirm.
int64 startTimeStamp = 1;
int64 endTimeStamp = 2;
// Operator metrics keyed by a string (presumably the operator id; confirm).
map<string, OperatorMetrics> operator_info = 3;
repeated OperatorWorkerMapping operator_worker_mapping = 4;
// One entry per edge for which usage statistics are recorded.
repeated EdgeStatistics edge_info = 5;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.

from collections import defaultdict
from typing import DefaultDict
from typing import DefaultDict, Optional

from proto.org.apache.texera.amber.core import PortIdentity
from proto.org.apache.texera.amber.engine.architecture.worker import (
Expand All @@ -40,7 +40,11 @@ def __init__(self) -> None:
self._total_execution_time: int = 0
self._worker_start_time: int = 0

def get_statistics(self) -> WorkerStatistics:
def get_statistics(
self, channel_usage_bytes: Optional[dict[str, int]] = None
) -> WorkerStatistics:
if channel_usage_bytes is None:
channel_usage_bytes = {}
# Compile and return worker statistics
return WorkerStatistics(
[
Expand All @@ -56,6 +60,7 @@ def get_statistics(self) -> WorkerStatistics:
self._total_execution_time
- self._data_processing_time
- self._control_processing_time,
channel_usage_bytes,
)

def increase_input_statistics(self, port_id: PortIdentity, size: int) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ import org.apache.texera.amber.core.tuple.Tuple
import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState
import org.apache.texera.amber.engine.common.ambermessage.WorkflowFIFOMessagePayload
import org.apache.texera.amber.engine.common.executionruntimestate.OperatorMetrics
import org.apache.texera.amber.engine.common.executionruntimestate.{EdgeStatistics, OperatorMetrics}

/** Base trait for event payloads delivered to the client; it is itself a FIFO message payload. */
trait ClientEvent extends WorkflowFIFOMessagePayload

/** Client event carrying the aggregated state of the workflow execution. */
case class ExecutionStateUpdate(state: WorkflowAggregatedState) extends ClientEvent

case class ExecutionStatsUpdate(operatorMetrics: Map[String, OperatorMetrics]) extends ClientEvent
case class ExecutionStatsUpdate(
operatorMetrics: Map[String, OperatorMetrics],
edgeStatistics: Seq[EdgeStatistics] = Seq.empty
) extends ClientEvent

/** Client event carrying operator metrics that should be persisted.
  *
  * @param operatorMetrics metrics keyed by a string (presumably the operator id; confirm)
  */
case class RuntimeStatisticsPersist(operatorMetrics: Map[String, OperatorMetrics])
extends ClientEvent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,8 @@ class Controller(
outputMessages.foreach(transferService.send)
cp.asyncRPCClient.sendToClient(
ExecutionStatsUpdate(
cp.workflowExecution.getAllRegionExecutionsStats
cp.workflowExecution.getAllRegionExecutionsStats,
cp.workflowExecution.getAllRegionEdgeStatistics
)
)
globalReplayManager.markRecoveryStatus(CONTROLLER, isRecovering = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ trait PauseHandler {
.map { _ =>
// update frontend workflow status and persist statistics
val stats = cp.workflowExecution.getAllRegionExecutionsStats
sendToClient(ExecutionStatsUpdate(stats))
val edgeStats = cp.workflowExecution.getAllRegionEdgeStatistics
sendToClient(ExecutionStatsUpdate(stats, edgeStats))
sendToClient(RuntimeStatisticsPersist(stats))
sendToClient(ExecutionStateUpdate(cp.workflowExecution.getState))
logger.info(s"workflow paused")
logger.info("workflow paused")
}
EmptyReturn()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ trait ResumeHandler {
.map { _ =>
// update frontend status and persist statistics
val stats = cp.workflowExecution.getAllRegionExecutionsStats
sendToClient(ExecutionStatsUpdate(stats))
val edgeStats = cp.workflowExecution.getAllRegionEdgeStatistics
sendToClient(ExecutionStatsUpdate(stats, edgeStats))
sendToClient(RuntimeStatisticsPersist(stats))
cp.controllerTimerService
.enableStatusUpdate() //re-enabled it since it is disabled in pause
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ trait WorkerStateUpdatedHandler {
operatorExecution.getWorkerExecution(ctx.sender).update(System.nanoTime(), msg.state)
)
val stats = cp.workflowExecution.getAllRegionExecutionsStats
sendToClient(ExecutionStatsUpdate(stats))
val edgeStats = cp.workflowExecution.getAllRegionEdgeStatistics
sendToClient(ExecutionStatsUpdate(stats, edgeStats))
sendToClient(RuntimeStatisticsPersist(stats))
EmptyReturn()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ case class WorkerExecution() extends Serializable {

private var state: WorkerState = UNINITIALIZED
private var stats: WorkerStatistics = {
WorkerStatistics(Seq.empty, Seq.empty, 0, 0, 0)
WorkerStatistics(Seq.empty, Seq.empty, 0, 0, 0, Map.empty)
}
private var lastUpdateTimeStamp = 0L

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,8 @@ class FlowControl {
/** Remaining credit: the byte allowance minus the inflight and the queued credit. */
def getCredit: Long = maxByteAllowed - inflightCredit - queuedCredit

/** Bytes currently accounted for: the sum of the inflight and the queued credit. */
def getUsedBytes: Long = inflightCredit + queuedCredit
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class DataProcessorRPCHandlerInitializer(val dp: DataProcessor)
with OpenExecutorHandler
with PauseHandler
with AddPartitioningHandler
with QueryFlowControlUsageHandler
with QueryStatisticsHandler
with ResumeHandler
with StartHandler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ class WorkflowWorker(
logger.info("output messages restored.")
dp = dpState // overwrite dp state
dp.outputHandler = logManager.sendCommitted
dp.setChannelUsageBytesProvider(() => transferService.getChannelUsageBytes)
dp.initTimerService(timerService)
logger.info("start re-initialize executor from checkpoint.")
val (executor, iter) = dp.serializationManager.restoreExecutorState(chkpt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ class StatisticsManager {
* @param operator the operator executor
* @return a WorkerStatistics object containing the statistics
*/
def getStatistics(operator: OperatorExecutor): WorkerStatistics = {
def getStatistics(
operator: OperatorExecutor,
channelUsageBytes: Map[String, Long]
): WorkerStatistics = {
WorkerStatistics(
inputStatistics.map {
case (portId, (tupleCount, tupleSize)) =>
Expand All @@ -59,7 +62,8 @@ class StatisticsManager {
}.toSeq,
dataProcessingTime,
controlProcessingTime,
totalExecutionTime - dataProcessingTime - controlProcessingTime
totalExecutionTime - dataProcessingTime - controlProcessingTime,
channelUsageBytes
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.texera.web.model.websocket.response.{HeartBeatResponse, Modify
new Type(value = classOf[WorkflowErrorEvent]),
new Type(value = classOf[WorkflowStateEvent]),
new Type(value = classOf[OperatorStatisticsUpdateEvent]),
new Type(value = classOf[EdgeStatisticsUpdateEvent]),
new Type(value = classOf[WebResultUpdateEvent]),
new Type(value = classOf[ConsoleUpdateEvent]),
new Type(value = classOf[CacheStatusUpdateEvent]),
Expand Down
14 changes: 14 additions & 0 deletions frontend/src/app/workspace/types/workflow-websocket.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,19 @@ export type RegionStateEvent = Readonly<{
state: string;
}>;

/**
 * Usage figure for one edge, identified by its source (operator id, port id)
 * and its destination (operator id, port id).
 */
export type EdgeStatistics = Readonly<{
fromOpId: string;
fromPortId: number;
toOpId: string;
toPortId: number;
// Usage attributed to this edge, in bytes.
usageBytes: number;
}>;

/**
 * Websocket event delivering the current statistics of the workflow's edges.
 */
export type EdgeStatisticsUpdateEvent = Readonly<{
edgeStatistics: ReadonlyArray<EdgeStatistics>;
// NOTE(review): presumably the per-channel credit ceiling in bytes, usable to
// normalise `usageBytes`; confirm against the backend event definition.
maxCreditAllowedInBytesPerChannel: number;
}>;

export type ModifyLogicResponse = Readonly<{
opId: string;
isValid: boolean;
Expand Down Expand Up @@ -243,6 +256,7 @@ export type TexeraWebsocketEventTypeMap = {
ClusterStatusUpdateEvent: ClusterStatusUpdateEvent;
RegionUpdateEvent: RegionUpdateEvent;
RegionStateEvent: RegionStateEvent;
EdgeStatisticsUpdateEvent: EdgeStatisticsUpdateEvent;
};

// helper type definitions to generate the request and event types
Expand Down
Loading