Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ object ExecutorDeployment {

def createWorkers(
op: PhysicalOp,
controllerActorService: AkkaActorService,
controllerActorService: PekkoActorService,
operatorExecution: OperatorExecution,
operatorConfig: OperatorConfig,
stateRestoreConfig: Option[StateRestoreConfig],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.texera.amber.util.VirtualIdentityUtils

import scala.collection.mutable

class AkkaActorRefMappingService(actorService: AkkaActorService) extends AmberLogging {
class PekkoActorRefMappingService(actorService: PekkoActorService) extends AmberLogging {

override def actorId: ActorVirtualIdentity = actorService.id

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.texera.amber.engine.common.FutureBijection._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.{DurationInt, FiniteDuration}

class AkkaActorService(val id: ActorVirtualIdentity, actorContext: ActorContext) {
class PekkoActorService(val id: ActorVirtualIdentity, actorContext: ActorContext) {

implicit def ec: ExecutionContext = actorContext.dispatcher

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ import org.apache.texera.amber.engine.common.ambermessage.WorkflowFIFOMessage
import scala.collection.mutable
import scala.concurrent.duration.DurationInt

class AkkaMessageTransferService(
actorService: AkkaActorService,
refService: AkkaActorRefMappingService,
class PekkoMessageTransferService(
actorService: PekkoActorService,
refService: PekkoActorRefMappingService,
handleBackpressure: Boolean => Unit
) extends AmberLogging {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ abstract class WorkflowActor(
//
// Akka related components:
//
val actorService: AkkaActorService = new AkkaActorService(actorId, this.context)
val actorService: PekkoActorService = new PekkoActorService(actorId, this.context)
actorService.getAvailableNodeAddressesFunc = () => {
implicit val timeout: Timeout = 5.seconds
Await
Expand All @@ -95,12 +95,12 @@ abstract class WorkflowActor(
)
.asInstanceOf[Array[Address]]
}
val actorRefMappingService: AkkaActorRefMappingService = new AkkaActorRefMappingService(
val actorRefMappingService: PekkoActorRefMappingService = new PekkoActorRefMappingService(
actorService
)
actorRefMappingService.registerActorRef(actorId, self)
val transferService: AkkaMessageTransferService =
new AkkaMessageTransferService(actorService, actorRefMappingService, handleBackpressure)
val transferService: PekkoMessageTransferService =
new PekkoMessageTransferService(actorService, actorRefMappingService, handleBackpressure)

logger.info(s"worker replay log writing conf: $replayLogConfOpt")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ package org.apache.texera.amber.engine.architecture.controller
import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
import org.apache.texera.amber.core.workflow.WorkflowContext
import org.apache.texera.amber.engine.architecture.common.{
AkkaActorRefMappingService,
AkkaActorService,
AkkaMessageTransferService,
PekkoActorRefMappingService,
PekkoActorService,
PekkoMessageTransferService,
AmberProcessor
}
import org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
Expand Down Expand Up @@ -57,21 +57,21 @@ class ControllerProcessor(
this.controllerTimerService = controllerTimerService
}

@transient var transferService: AkkaMessageTransferService = _
@transient var transferService: PekkoMessageTransferService = _

def setupTransferService(transferService: AkkaMessageTransferService): Unit = {
def setupTransferService(transferService: PekkoMessageTransferService): Unit = {
this.transferService = transferService
}

@transient var actorService: AkkaActorService = _
@transient var actorService: PekkoActorService = _

def setupActorService(akkaActorService: AkkaActorService): Unit = {
this.actorService = akkaActorService
def setupActorService(pekkoActorService: PekkoActorService): Unit = {
this.actorService = pekkoActorService
}

@transient var actorRefService: AkkaActorRefMappingService = _
@transient var actorRefService: PekkoActorRefMappingService = _

def setupActorRefService(actorRefService: AkkaActorRefMappingService): Unit = {
def setupActorRefService(actorRefService: PekkoActorRefMappingService): Unit = {
this.actorRefService = actorRefService
this.workflowExecutionCoordinator.setupActorRefService(this.actorRefService)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package org.apache.texera.amber.engine.architecture.controller

import org.apache.pekko.actor.Cancellable
import org.apache.texera.amber.engine.architecture.common.AkkaActorService
import org.apache.texera.amber.engine.architecture.common.PekkoActorService
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
AsyncRPCContext,
QueryStatisticsRequest,
Expand All @@ -34,7 +34,7 @@ import scala.concurrent.duration.{DurationInt, FiniteDuration, MILLISECONDS}

class ControllerTimerService(
controllerConfig: ControllerConfig,
akkaActorService: AkkaActorService
pekkoActorService: PekkoActorService
) {
var statusUpdateAskHandle: Option[Cancellable] = None
var runtimeStatisticsAskHandle: Option[Cancellable] = None
Expand All @@ -46,7 +46,7 @@ class ControllerTimerService(
): Option[Cancellable] = {
if (intervalMs.nonEmpty && handleOpt.isEmpty) {
Option(
akkaActorService.sendToSelfWithFixedDelay(
pekkoActorService.sendToSelfWithFixedDelay(
0.milliseconds,
FiniteDuration.apply(intervalMs.get, MILLISECONDS),
ControlInvocation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package org.apache.texera.amber.engine.architecture.messaginglayer

import org.apache.pekko.actor.Cancellable
import org.apache.texera.amber.config.ApplicationConfig
import org.apache.texera.amber.engine.architecture.common.AkkaActorService
import org.apache.texera.amber.engine.architecture.common.PekkoActorService
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
AsyncRPCContext,
EmptyRequest
Expand All @@ -33,7 +33,7 @@ import org.apache.texera.amber.engine.common.virtualidentity.util.SELF

import scala.concurrent.duration.{DurationInt, FiniteDuration, MILLISECONDS}

class WorkerTimerService(actorService: AkkaActorService) {
class WorkerTimerService(actorService: PekkoActorService) {

private val enabledAdaptiveBatching = ApplicationConfig.enableAdaptiveNetworkBuffering
private val adaptiveBatchInterval = ApplicationConfig.adaptiveBufferingTimeoutMs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import org.apache.texera.amber.core.storage.VFSURIFactory.decodeURI
import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PhysicalLink, PhysicalOp}
import org.apache.texera.amber.engine.architecture.common.{
AkkaActorRefMappingService,
AkkaActorService,
PekkoActorRefMappingService,
PekkoActorService,
ExecutorDeployment
}
import org.apache.texera.amber.engine.architecture.controller.execution.{
Expand Down Expand Up @@ -95,8 +95,8 @@ class RegionExecutionCoordinator(
workflowExecution: WorkflowExecution,
asyncRPCClient: AsyncRPCClient,
controllerConfig: ControllerConfig,
actorService: AkkaActorService,
actorRefService: AkkaActorRefMappingService
actorService: PekkoActorService,
actorRefService: PekkoActorRefMappingService
) extends AmberLogging {

initRegionExecution()
Expand Down Expand Up @@ -374,7 +374,7 @@ class RegionExecutionCoordinator(
}

private def buildOperator(
actorService: AkkaActorService,
actorService: PekkoActorService,
physicalOp: PhysicalOp,
operatorConfig: OperatorConfig,
operatorExecution: OperatorExecution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import com.twitter.util.Future
import com.typesafe.scalalogging.LazyLogging
import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PhysicalLink}
import org.apache.texera.amber.engine.architecture.common.{
AkkaActorRefMappingService,
AkkaActorService
PekkoActorRefMappingService,
PekkoActorService
}
import org.apache.texera.amber.engine.architecture.controller.ControllerConfig
import org.apache.texera.amber.engine.architecture.controller.ExecutionStateUpdate
Expand All @@ -49,9 +49,9 @@ class WorkflowExecutionCoordinator(
mutable.HashMap()
private val completionNotified: AtomicBoolean = new AtomicBoolean(false)

@transient var actorRefService: AkkaActorRefMappingService = _
@transient var actorRefService: PekkoActorRefMappingService = _

def setupActorRefService(actorRefService: AkkaActorRefMappingService): Unit = {
def setupActorRefService(actorRefService: PekkoActorRefMappingService): Unit = {
this.actorRefService = actorRefService
}

Expand All @@ -62,7 +62,7 @@ class WorkflowExecutionCoordinator(
*
* After the syncs, if there are no running region(s), it will start new regions (if available).
*/
def coordinateRegionExecutors(actorService: AkkaActorService): Future[Unit] = {
def coordinateRegionExecutors(actorService: PekkoActorService): Future[Unit] = {
val unfinishedRegionCoordinators =
regionExecutionCoordinators.values.filter(!_.isCompleted).toSeq

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.pekko.actor.{ActorSystem, Address, Cancellable, DeadLetter, Pr
import org.apache.pekko.serialization.{Serialization, SerializationExtension}
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.texera.amber.clustering.ClusterListener
import org.apache.texera.amber.config.AkkaConfig
import org.apache.texera.amber.config.PekkoConfig
import org.apache.texera.amber.engine.architecture.messaginglayer.DeadLetterMonitorActor

import java.io.{BufferedReader, InputStreamReader}
Expand All @@ -39,7 +39,7 @@ object AmberRuntime {
def serde: Serialization = {
if (_serde == null) {
if (_actorSystem == null) {
_serde = SerializationExtension(ActorSystem("Amber", akkaConfig))
_serde = SerializationExtension(ActorSystem("Amber", pekkoConfig))
} else {
_serde = SerializationExtension(_actorSystem)
}
Expand Down Expand Up @@ -83,13 +83,13 @@ object AmberRuntime {
pekko.remote.artery.canonical.hostname = $localIpAddress
pekko.cluster.seed-nodes = [ "pekko://Amber@$localIpAddress:2552" ]
""")
.withFallback(akkaConfig)
.withFallback(pekkoConfig)
.resolve()
AmberConfig.masterNodeAddr = createMasterAddress(localIpAddress)
createAmberSystem(masterConfig)
}

def akkaConfig: Config = AkkaConfig.akkaConfig
def pekkoConfig: Config = PekkoConfig.pekkoConfig

private def createMasterAddress(addr: String): Address = Address("pekko", "Amber", addr, 2552)

Expand All @@ -105,7 +105,7 @@ object AmberRuntime {
pekko.remote.artery.canonical.port = 0
pekko.cluster.seed-nodes = [ "pekko://Amber@$addr:2552" ]
""")
.withFallback(akkaConfig)
.withFallback(pekkoConfig)
.resolve()
AmberConfig.masterNodeAddr = createMasterAddress(addr)
createAmberSystem(workerConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ import scala.concurrent.duration._
*/
@IntegrationTest
class ReconfigurationIntegrationSpec
extends TestKit(ActorSystem("ReconfigurationIntegrationSpec", AmberRuntime.akkaConfig))
extends TestKit(ActorSystem("ReconfigurationIntegrationSpec", AmberRuntime.pekkoConfig))
with ImplicitSender
with AnyFlatSpecLike
with BeforeAndAfterAll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class LogreplayPrimitivesSpec extends AnyFlatSpec with BeforeAndAfterAll {
// so no Pekko threads outlive the suite. (Same pattern as
// CheckpointSubsystemSpec.)
private val testSystem: ActorSystem =
ActorSystem("LogreplayPrimitivesSpec-test", AmberRuntime.akkaConfig)
ActorSystem("LogreplayPrimitivesSpec-test", AmberRuntime.pekkoConfig)
private val testSerde: Serialization = SerializationExtension(testSystem)

private def setAmberRuntimeField(name: String, value: AnyRef): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ class CongestionControlSpec extends AnyFlatSpec {
}

it should "return only the messages whose sentTime is older than resendTimeLimit" in {
// Cover the AkkaMessageTransferService.checkResend() retransmission path:
// Cover the PekkoMessageTransferService.checkResend() retransmission path:
// the in-transit message that has been sitting past the 60s
// resendTimeLimit must surface; the freshly-sent one must not.
val cc = new CongestionControl()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ import org.apache.texera.amber.core.workflow.WorkflowContext.{
DEFAULT_WORKFLOW_ID
}
import org.apache.texera.amber.engine.architecture.common.{
AkkaActorRefMappingService,
AkkaActorService,
PekkoActorRefMappingService,
PekkoActorService,
WorkflowActor
}
import org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
Expand Down Expand Up @@ -78,8 +78,8 @@ object RegionCoordinatorTestSupport {
)

case class ControllerHarnessFixture(
actorService: AkkaActorService,
actorRefService: AkkaActorRefMappingService
actorService: PekkoActorService,
actorRefService: PekkoActorRefMappingService
)

/**
Expand Down Expand Up @@ -231,7 +231,7 @@ trait RegionCoordinatorTestSupport { self: TestKit =>
}

protected def registerLiveWorker(
actorRefService: AkkaActorRefMappingService,
actorRefService: PekkoActorRefMappingService,
workerId: ActorVirtualIdentity
): ActorRef = {
val workerRef = system.actorOf(Props(new IdleActor), s"worker-${System.nanoTime()}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.testkit.TestKit
import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity}
import org.apache.texera.amber.core.workflow.PhysicalOp
import org.apache.texera.amber.engine.architecture.common.AkkaActorRefMappingService
import org.apache.texera.amber.engine.architecture.common.PekkoActorRefMappingService
import org.apache.texera.amber.engine.architecture.controller.ControllerConfig
import org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
import org.apache.texera.amber.engine.architecture.rpc.controlreturns._
Expand All @@ -51,7 +51,7 @@ import java.util.concurrent.atomic
* workers terminated, and allow the next region to start.
*/
class RegionExecutionCoordinatorSpec
extends TestKit(ActorSystem("RegionExecutionCoordinatorSpec", AmberRuntime.akkaConfig))
extends TestKit(ActorSystem("RegionExecutionCoordinatorSpec", AmberRuntime.pekkoConfig))
with AnyFlatSpecLike
with BeforeAndAfterAll
with RegionCoordinatorTestSupport {
Expand Down Expand Up @@ -117,7 +117,7 @@ class RegionExecutionCoordinatorSpec
region: Region,
physicalOp: PhysicalOp,
workerId: ActorVirtualIdentity,
actorRefService: AkkaActorRefMappingService
actorRefService: PekkoActorRefMappingService
)

private def createSingleRegionFixture(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpecLike

class WorkflowExecutionCoordinatorSpec
extends TestKit(ActorSystem("WorkflowExecutionCoordinatorSpec", AmberRuntime.akkaConfig))
extends TestKit(ActorSystem("WorkflowExecutionCoordinatorSpec", AmberRuntime.pekkoConfig))
with AnyFlatSpecLike
with BeforeAndAfterAll
with RegionCoordinatorTestSupport {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class DummyOperatorExecutor extends OperatorExecutor {
}

class WorkerSpec
extends TestKit(ActorSystem("WorkerSpec", AmberRuntime.akkaConfig))
extends TestKit(ActorSystem("WorkerSpec", AmberRuntime.pekkoConfig))
with ImplicitSender
with AnyFlatSpecLike
with BeforeAndAfterAll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class CheckpointSubsystemSpec extends AnyFlatSpec with BeforeAndAfterAll {
// and AmberRuntime's reference are torn down in afterAll, so no Pekko
// threads outlive the test (matching ControllerSpec/WorkerSpec hygiene).
private val testSystem: ActorSystem =
ActorSystem("CheckpointSubsystemSpec-test", AmberRuntime.akkaConfig)
ActorSystem("CheckpointSubsystemSpec-test", AmberRuntime.pekkoConfig)
private val testSerde: Serialization = SerializationExtension(testSystem)

private def setAmberRuntimeField(name: String, value: AnyRef): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Outcome, Retries}
import scala.concurrent.duration.DurationInt

class DataProcessingSpec
extends TestKit(ActorSystem("DataProcessingSpec", AmberRuntime.akkaConfig))
extends TestKit(ActorSystem("DataProcessingSpec", AmberRuntime.pekkoConfig))
with ImplicitSender
with AnyFlatSpecLike
with BeforeAndAfterAll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Outcome, Retries}
import scala.concurrent.duration._

class PauseSpec
extends TestKit(ActorSystem("PauseSpec", AmberRuntime.akkaConfig))
extends TestKit(ActorSystem("PauseSpec", AmberRuntime.pekkoConfig))
with ImplicitSender
with AnyFlatSpecLike
with BeforeAndAfterAll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import org.scalatest.flatspec.AnyFlatSpecLike
import scala.concurrent.duration._

class ReconfigurationSpec
extends TestKit(ActorSystem("ReconfigurationSpec", AmberRuntime.akkaConfig))
extends TestKit(ActorSystem("ReconfigurationSpec", AmberRuntime.pekkoConfig))
with ImplicitSender
with AnyFlatSpecLike
with BeforeAndAfterAll
Expand Down
Loading
Loading