From 31bf1e0c090396f2c1c91c4d21da8971c7ce3244 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Tue, 5 May 2026 08:15:59 -0700 Subject: [PATCH] refactor(amber): rename remaining Akka* identifiers to Pekko* MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The project moved off Akka onto Apache Pekko, but several internal Scala identifiers still carried the `Akka` prefix even though they wrap Pekko APIs — confusing when grepping the codebase for the actor runtime. Pure rename: - `AkkaConfig` → `PekkoConfig` (object + file) - `AkkaActorService` → `PekkoActorService` (class + file) - `AkkaActorRefMappingService` → `PekkoActorRefMappingService` - `AkkaMessageTransferService` → `PekkoMessageTransferService` - `akkaConfig`, `akkaActorService` (methods/params) → pekko-prefixed. No string literals, config keys, or serialization registrations change — `cluster.conf` already uses `pekko.*` keys, kryo registry doesn't reference these classes by name. The intentional `"akka"` comment in `DeployStrategiesSpec.scala` (which contrasts pekko vs akka address strings) stays. Closes #4948. --- .../common/ExecutorDeployment.scala | 2 +- ...cala => PekkoActorRefMappingService.scala} | 2 +- ...rService.scala => PekkoActorService.scala} | 2 +- ...cala => PekkoMessageTransferService.scala} | 6 +++--- .../architecture/common/WorkflowActor.scala | 8 ++++---- .../controller/ControllerProcessor.scala | 20 +++++++++---------- .../controller/ControllerTimerService.scala | 6 +++--- .../messaginglayer/WorkerTimerService.scala | 4 ++-- .../RegionExecutionCoordinator.scala | 10 +++++----- .../WorkflowExecutionCoordinator.scala | 10 +++++----- .../amber/engine/common/AmberRuntime.scala | 10 +++++----- .../e2e/ReconfigurationIntegrationSpec.scala | 2 +- .../logreplay/LogreplayPrimitivesSpec.scala | 2 +- .../CongestionControlSpec.scala | 2 +- .../RegionCoordinatorTestSupport.scala | 10 +++++----- .../RegionExecutionCoordinatorSpec.scala | 6 +++--- .../WorkflowExecutionCoordinatorSpec.scala | 2 +- .../architecture/worker/WorkerSpec.scala | 2 +- .../common/CheckpointSubsystemSpec.scala | 2 +- .../amber/engine/e2e/DataProcessingSpec.scala | 2 +- .../texera/amber/engine/e2e/PauseSpec.scala | 2 +- .../engine/e2e/ReconfigurationSpec.scala | 2 +- .../faulttolerance/CheckpointSpec.scala | 2 +- .../engine/faulttolerance/LoggingSpec.scala | 2 +- .../{AkkaConfig.scala => PekkoConfig.scala} | 4 ++-- 25 files changed, 61 insertions(+), 61 deletions(-) rename amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/{AkkaActorRefMappingService.scala => PekkoActorRefMappingService.scala} (98%) rename amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/{AkkaActorService.scala => PekkoActorService.scala} (96%) rename amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/{AkkaMessageTransferService.scala => PekkoMessageTransferService.scala} (98%) rename common/config/src/main/scala/org/apache/texera/amber/config/{AkkaConfig.scala => PekkoConfig.scala} (90%) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala index cf41297c981..fbb5b99ce6b 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala @@ -38,7 +38,7 @@ object ExecutorDeployment { def createWorkers( op: PhysicalOp, - controllerActorService: AkkaActorService, + controllerActorService: PekkoActorService, operatorExecution: OperatorExecution, operatorConfig: OperatorConfig, stateRestoreConfig: Option[StateRestoreConfig], diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaActorRefMappingService.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoActorRefMappingService.scala similarity index 98% rename from amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaActorRefMappingService.scala rename to amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoActorRefMappingService.scala index 6cad3147031..323435891ee 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaActorRefMappingService.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoActorRefMappingService.scala @@ -33,7 +33,7 @@ import org.apache.texera.amber.util.VirtualIdentityUtils import scala.collection.mutable -class AkkaActorRefMappingService(actorService: AkkaActorService) extends AmberLogging { +class PekkoActorRefMappingService(actorService: PekkoActorService) extends AmberLogging { override def actorId: ActorVirtualIdentity = actorService.id diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaActorService.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoActorService.scala similarity index 96% rename from amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaActorService.scala rename to amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoActorService.scala index 10a6d7a38c6..f5bbd0619c9 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaActorService.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoActorService.scala @@ -28,7 +28,7 @@ import org.apache.texera.amber.engine.common.FutureBijection._ import scala.concurrent.ExecutionContext import scala.concurrent.duration.{DurationInt, FiniteDuration} -class AkkaActorService(val id: ActorVirtualIdentity, actorContext: ActorContext) { +class PekkoActorService(val id: ActorVirtualIdentity, actorContext: ActorContext) { implicit def ec: ExecutionContext = actorContext.dispatcher diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaMessageTransferService.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoMessageTransferService.scala similarity index 98% rename from amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaMessageTransferService.scala rename to amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoMessageTransferService.scala index 3401e3ff639..cba9b0b2ee4 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaMessageTransferService.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoMessageTransferService.scala @@ -30,9 +30,9 @@ import org.apache.texera.amber.engine.common.ambermessage.WorkflowFIFOMessage import scala.collection.mutable import scala.concurrent.duration.DurationInt -class AkkaMessageTransferService( - actorService: AkkaActorService, - refService: AkkaActorRefMappingService, +class PekkoMessageTransferService( + actorService: PekkoActorService, + refService: PekkoActorRefMappingService, handleBackpressure: Boolean => Unit ) extends AmberLogging { diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/WorkflowActor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/WorkflowActor.scala index 5ce64a0a3e0..3253554bdf2 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/WorkflowActor.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/WorkflowActor.scala @@ -85,7 +85,7 @@ abstract class WorkflowActor( // // Akka related components: // - val actorService: AkkaActorService = new AkkaActorService(actorId, this.context) + val actorService: PekkoActorService = new PekkoActorService(actorId, this.context) actorService.getAvailableNodeAddressesFunc = () => { implicit val timeout: Timeout = 5.seconds Await @@ -95,12 +95,12 @@ abstract class WorkflowActor( ) .asInstanceOf[Array[Address]] } - val actorRefMappingService: AkkaActorRefMappingService = new AkkaActorRefMappingService( + val actorRefMappingService: PekkoActorRefMappingService = new PekkoActorRefMappingService( actorService ) actorRefMappingService.registerActorRef(actorId, self) - val transferService: AkkaMessageTransferService = - new AkkaMessageTransferService(actorService, actorRefMappingService, handleBackpressure) + val transferService: PekkoMessageTransferService = + new PekkoMessageTransferService(actorService, actorRefMappingService, handleBackpressure) logger.info(s"worker replay log writing conf: $replayLogConfOpt") diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala index 3ff8e7d978a..ef33174b6b0 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala @@ -22,9 +22,9 @@ package org.apache.texera.amber.engine.architecture.controller import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity import org.apache.texera.amber.core.workflow.WorkflowContext import org.apache.texera.amber.engine.architecture.common.{ - AkkaActorRefMappingService, - AkkaActorService, - AkkaMessageTransferService, + PekkoActorRefMappingService, + PekkoActorService, + PekkoMessageTransferService, AmberProcessor } import org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution @@ -57,21 +57,21 @@ class ControllerProcessor( this.controllerTimerService = controllerTimerService } - @transient var transferService: AkkaMessageTransferService = _ + @transient var transferService: PekkoMessageTransferService = _ - def setupTransferService(transferService: AkkaMessageTransferService): Unit = { + def setupTransferService(transferService: PekkoMessageTransferService): Unit = { this.transferService = transferService } - @transient var actorService: AkkaActorService = _ + @transient var actorService: PekkoActorService = _ - def setupActorService(akkaActorService: AkkaActorService): Unit = { - this.actorService = akkaActorService + def setupActorService(pekkoActorService: PekkoActorService): Unit = { + this.actorService = pekkoActorService } - @transient var actorRefService: AkkaActorRefMappingService = _ + @transient var actorRefService: PekkoActorRefMappingService = _ - def setupActorRefService(actorRefService: AkkaActorRefMappingService): Unit = { + def setupActorRefService(actorRefService: PekkoActorRefMappingService): Unit = { this.actorRefService = actorRefService this.workflowExecutionCoordinator.setupActorRefService(this.actorRefService) } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerTimerService.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerTimerService.scala index a778a27c46c..a4ad0898c9b 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerTimerService.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerTimerService.scala @@ -20,7 +20,7 @@ package org.apache.texera.amber.engine.architecture.controller import org.apache.pekko.actor.Cancellable -import org.apache.texera.amber.engine.architecture.common.AkkaActorService +import org.apache.texera.amber.engine.architecture.common.PekkoActorService import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ AsyncRPCContext, QueryStatisticsRequest, @@ -34,7 +34,7 @@ import scala.concurrent.duration.{DurationInt, FiniteDuration, MILLISECONDS} class ControllerTimerService( controllerConfig: ControllerConfig, - akkaActorService: AkkaActorService + pekkoActorService: PekkoActorService ) { var statusUpdateAskHandle: Option[Cancellable] = None var runtimeStatisticsAskHandle: Option[Cancellable] = None @@ -46,7 +46,7 @@ class ControllerTimerService( ): Option[Cancellable] = { if (intervalMs.nonEmpty && handleOpt.isEmpty) { Option( - akkaActorService.sendToSelfWithFixedDelay( + pekkoActorService.sendToSelfWithFixedDelay( 0.milliseconds, FiniteDuration.apply(intervalMs.get, MILLISECONDS), ControlInvocation( diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/WorkerTimerService.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/WorkerTimerService.scala index 3bb87febd93..006c9614fd7 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/WorkerTimerService.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/WorkerTimerService.scala @@ -21,7 +21,7 @@ package org.apache.texera.amber.engine.architecture.messaginglayer import org.apache.pekko.actor.Cancellable import org.apache.texera.amber.config.ApplicationConfig -import org.apache.texera.amber.engine.architecture.common.AkkaActorService +import org.apache.texera.amber.engine.architecture.common.PekkoActorService import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ AsyncRPCContext, EmptyRequest @@ -33,7 +33,7 @@ import org.apache.texera.amber.engine.common.virtualidentity.util.SELF import scala.concurrent.duration.{DurationInt, FiniteDuration, MILLISECONDS} -class WorkerTimerService(actorService: AkkaActorService) { +class WorkerTimerService(actorService: PekkoActorService) { private val enabledAdaptiveBatching = ApplicationConfig.enableAdaptiveNetworkBuffering private val adaptiveBatchInterval = ApplicationConfig.adaptiveBufferingTimeoutMs diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index 254c16bf34b..2971e4c4f4e 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -26,8 +26,8 @@ import org.apache.texera.amber.core.storage.VFSURIFactory.decodeURI import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PhysicalLink, PhysicalOp} import org.apache.texera.amber.engine.architecture.common.{ - AkkaActorRefMappingService, - AkkaActorService, + PekkoActorRefMappingService, + PekkoActorService, ExecutorDeployment } import org.apache.texera.amber.engine.architecture.controller.execution.{ @@ -95,8 +95,8 @@ class RegionExecutionCoordinator( workflowExecution: WorkflowExecution, asyncRPCClient: AsyncRPCClient, controllerConfig: ControllerConfig, - actorService: AkkaActorService, - actorRefService: AkkaActorRefMappingService + actorService: PekkoActorService, + actorRefService: PekkoActorRefMappingService ) extends AmberLogging { initRegionExecution() @@ -374,7 +374,7 @@ class RegionExecutionCoordinator( } private def buildOperator( - actorService: AkkaActorService, + actorService: PekkoActorService, physicalOp: PhysicalOp, operatorConfig: OperatorConfig, operatorExecution: OperatorExecution diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala index df504bf92d2..deb753beb37 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala @@ -23,8 +23,8 @@ import com.twitter.util.Future import com.typesafe.scalalogging.LazyLogging import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PhysicalLink} import org.apache.texera.amber.engine.architecture.common.{ - AkkaActorRefMappingService, - AkkaActorService + PekkoActorRefMappingService, + PekkoActorService } import org.apache.texera.amber.engine.architecture.controller.ControllerConfig import org.apache.texera.amber.engine.architecture.controller.ExecutionStateUpdate @@ -49,9 +49,9 @@ class WorkflowExecutionCoordinator( mutable.HashMap() private val completionNotified: AtomicBoolean = new AtomicBoolean(false) - @transient var actorRefService: AkkaActorRefMappingService = _ + @transient var actorRefService: PekkoActorRefMappingService = _ - def setupActorRefService(actorRefService: AkkaActorRefMappingService): Unit = { + def setupActorRefService(actorRefService: PekkoActorRefMappingService): Unit = { this.actorRefService = actorRefService } @@ -62,7 +62,7 @@ class WorkflowExecutionCoordinator( * * After the syncs, if there are no running region(s), it will start new regions (if available). */ - def coordinateRegionExecutors(actorService: AkkaActorService): Future[Unit] = { + def coordinateRegionExecutors(actorService: PekkoActorService): Future[Unit] = { val unfinishedRegionCoordinators = regionExecutionCoordinators.values.filter(!_.isCompleted).toSeq diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/common/AmberRuntime.scala b/amber/src/main/scala/org/apache/texera/amber/engine/common/AmberRuntime.scala index 7078f766a63..03234a277e4 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/common/AmberRuntime.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/common/AmberRuntime.scala @@ -23,7 +23,7 @@ import org.apache.pekko.actor.{ActorSystem, Address, Cancellable, DeadLetter, Pr import org.apache.pekko.serialization.{Serialization, SerializationExtension} import com.typesafe.config.{Config, ConfigFactory} import org.apache.texera.amber.clustering.ClusterListener -import org.apache.texera.amber.config.AkkaConfig +import org.apache.texera.amber.config.PekkoConfig import org.apache.texera.amber.engine.architecture.messaginglayer.DeadLetterMonitorActor import java.io.{BufferedReader, InputStreamReader} @@ -39,7 +39,7 @@ object AmberRuntime { def serde: Serialization = { if (_serde == null) { if (_actorSystem == null) { - _serde = SerializationExtension(ActorSystem("Amber", akkaConfig)) + _serde = SerializationExtension(ActorSystem("Amber", pekkoConfig)) } else { _serde = SerializationExtension(_actorSystem) } @@ -83,13 +83,13 @@ object AmberRuntime { pekko.remote.artery.canonical.hostname = $localIpAddress pekko.cluster.seed-nodes = [ "pekko://Amber@$localIpAddress:2552" ] """) - .withFallback(akkaConfig) + .withFallback(pekkoConfig) .resolve() AmberConfig.masterNodeAddr = createMasterAddress(localIpAddress) createAmberSystem(masterConfig) } - def akkaConfig: Config = AkkaConfig.akkaConfig + def pekkoConfig: Config = PekkoConfig.pekkoConfig private def createMasterAddress(addr: String): Address = Address("pekko", "Amber", addr, 2552) @@ -105,7 +105,7 @@ object AmberRuntime { pekko.remote.artery.canonical.port = 0 pekko.cluster.seed-nodes = [ "pekko://Amber@$addr:2552" ] """) - .withFallback(akkaConfig) + .withFallback(pekkoConfig) .resolve() AmberConfig.masterNodeAddr = createMasterAddress(addr) createAmberSystem(workerConfig) diff --git a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala index 768caf079f4..850054ddeb0 100644 --- a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala +++ b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala @@ -52,7 +52,7 @@ import scala.concurrent.duration._ */ @IntegrationTest class ReconfigurationIntegrationSpec - extends TestKit(ActorSystem("ReconfigurationIntegrationSpec", AmberRuntime.akkaConfig)) + extends TestKit(ActorSystem("ReconfigurationIntegrationSpec", AmberRuntime.pekkoConfig)) with ImplicitSender with AnyFlatSpecLike with BeforeAndAfterAll diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/LogreplayPrimitivesSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/LogreplayPrimitivesSpec.scala index ddb6440c25c..6ec33a6988b 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/LogreplayPrimitivesSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/LogreplayPrimitivesSpec.scala @@ -62,7 +62,7 @@ class LogreplayPrimitivesSpec extends AnyFlatSpec with BeforeAndAfterAll { // so no Pekko threads outlive the suite. (Same pattern as // CheckpointSubsystemSpec.) private val testSystem: ActorSystem = - ActorSystem("LogreplayPrimitivesSpec-test", AmberRuntime.akkaConfig) + ActorSystem("LogreplayPrimitivesSpec-test", AmberRuntime.pekkoConfig) private val testSerde: Serialization = SerializationExtension(testSystem) private def setAmberRuntimeField(name: String, value: AnyRef): Unit = { diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/CongestionControlSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/CongestionControlSpec.scala index 322f96924d7..30fa8ec8b45 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/CongestionControlSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/CongestionControlSpec.scala @@ -214,7 +214,7 @@ class CongestionControlSpec extends AnyFlatSpec { } it should "return only the messages whose sentTime is older than resendTimeLimit" in { - // Cover the AkkaMessageTransferService.checkResend() retransmission path: + // Cover the PekkoMessageTransferService.checkResend() retransmission path: // the in-transit message that has been sitting past the 60s // resendTimeLimit must surface; the freshly-sent one must not. val cc = new CongestionControl() diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala index facba102415..5673c02691f 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala @@ -35,8 +35,8 @@ import org.apache.texera.amber.core.workflow.WorkflowContext.{ DEFAULT_WORKFLOW_ID } import org.apache.texera.amber.engine.architecture.common.{ - AkkaActorRefMappingService, - AkkaActorService, + PekkoActorRefMappingService, + PekkoActorService, WorkflowActor } import org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution @@ -78,8 +78,8 @@ object RegionCoordinatorTestSupport { ) case class ControllerHarnessFixture( - actorService: AkkaActorService, - actorRefService: AkkaActorRefMappingService + actorService: PekkoActorService, + actorRefService: PekkoActorRefMappingService ) /** @@ -231,7 +231,7 @@ trait RegionCoordinatorTestSupport { self: TestKit => } protected def registerLiveWorker( - actorRefService: AkkaActorRefMappingService, + actorRefService: PekkoActorRefMappingService, workerId: ActorVirtualIdentity ): ActorRef = { val workerRef = system.actorOf(Props(new IdleActor), s"worker-${System.nanoTime()}") diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala index 8fab3b67fca..6efbe5e4ca5 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala @@ -24,7 +24,7 @@ import org.apache.pekko.actor.ActorSystem import org.apache.pekko.testkit.TestKit import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity} import org.apache.texera.amber.core.workflow.PhysicalOp -import org.apache.texera.amber.engine.architecture.common.AkkaActorRefMappingService +import org.apache.texera.amber.engine.architecture.common.PekkoActorRefMappingService import org.apache.texera.amber.engine.architecture.controller.ControllerConfig import org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution import org.apache.texera.amber.engine.architecture.rpc.controlreturns._ @@ -51,7 +51,7 @@ import java.util.concurrent.atomic * workers terminated, and allow the next region to start. */ class RegionExecutionCoordinatorSpec - extends TestKit(ActorSystem("RegionExecutionCoordinatorSpec", AmberRuntime.akkaConfig)) + extends TestKit(ActorSystem("RegionExecutionCoordinatorSpec", AmberRuntime.pekkoConfig)) with AnyFlatSpecLike with BeforeAndAfterAll with RegionCoordinatorTestSupport { @@ -117,7 +117,7 @@ class RegionExecutionCoordinatorSpec region: Region, physicalOp: PhysicalOp, workerId: ActorVirtualIdentity, - actorRefService: AkkaActorRefMappingService + actorRefService: PekkoActorRefMappingService ) private def createSingleRegionFixture( diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala index f5fc17f8e01..1a1e4afa319 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala @@ -38,7 +38,7 @@ import org.scalatest.BeforeAndAfterAll import org.scalatest.flatspec.AnyFlatSpecLike class WorkflowExecutionCoordinatorSpec - extends TestKit(ActorSystem("WorkflowExecutionCoordinatorSpec", AmberRuntime.akkaConfig)) + extends TestKit(ActorSystem("WorkflowExecutionCoordinatorSpec", AmberRuntime.pekkoConfig)) with AnyFlatSpecLike with BeforeAndAfterAll with RegionCoordinatorTestSupport { diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala index 890fe97b852..f6f33ebbb05 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala @@ -64,7 +64,7 @@ class DummyOperatorExecutor extends OperatorExecutor { } class WorkerSpec - extends TestKit(ActorSystem("WorkerSpec", AmberRuntime.akkaConfig)) + extends TestKit(ActorSystem("WorkerSpec", AmberRuntime.pekkoConfig)) with ImplicitSender with AnyFlatSpecLike with BeforeAndAfterAll diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/common/CheckpointSubsystemSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/common/CheckpointSubsystemSpec.scala index 45b1727afc3..6b46030b6bc 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/common/CheckpointSubsystemSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/common/CheckpointSubsystemSpec.scala @@ -35,7 +35,7 @@ class CheckpointSubsystemSpec extends AnyFlatSpec with BeforeAndAfterAll { // and AmberRuntime's reference are torn down in afterAll, so no Pekko // threads outlive the test (matching ControllerSpec/WorkerSpec hygiene). private val testSystem: ActorSystem = - ActorSystem("CheckpointSubsystemSpec-test", AmberRuntime.akkaConfig) + ActorSystem("CheckpointSubsystemSpec-test", AmberRuntime.pekkoConfig) private val testSerde: Serialization = SerializationExtension(testSystem) private def setAmberRuntimeField(name: String, value: AnyRef): Unit = { diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala index f93909f53f5..d070fefb275 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala @@ -55,7 +55,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Outcome, Retries} import scala.concurrent.duration.DurationInt class DataProcessingSpec - extends TestKit(ActorSystem("DataProcessingSpec", AmberRuntime.akkaConfig)) + extends TestKit(ActorSystem("DataProcessingSpec", AmberRuntime.pekkoConfig)) with ImplicitSender with AnyFlatSpecLike with BeforeAndAfterAll diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala index b459533c573..2cc268608f1 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala @@ -51,7 +51,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Outcome, Retries} import scala.concurrent.duration._ class PauseSpec - extends TestKit(ActorSystem("PauseSpec", AmberRuntime.akkaConfig)) + extends TestKit(ActorSystem("PauseSpec", AmberRuntime.pekkoConfig)) with ImplicitSender with AnyFlatSpecLike with BeforeAndAfterAll diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala index 8cabf8684ac..2cd3559736e 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala @@ -42,7 +42,7 @@ import org.scalatest.flatspec.AnyFlatSpecLike import scala.concurrent.duration._ class ReconfigurationSpec - extends TestKit(ActorSystem("ReconfigurationSpec", AmberRuntime.akkaConfig)) + extends TestKit(ActorSystem("ReconfigurationSpec", AmberRuntime.pekkoConfig)) with ImplicitSender with AnyFlatSpecLike with BeforeAndAfterAll diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala index 73d4601a920..fbc7e8044df 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala @@ -59,7 +59,7 @@ class CheckpointSpec extends AnyFlatSpecLike with BeforeAndAfterAll { ) override def beforeAll(): Unit = { - system = ActorSystem("CheckpointSpec", AmberRuntime.akkaConfig) + system = ActorSystem("CheckpointSpec", AmberRuntime.pekkoConfig) system.actorOf(Props[SingleNodeListener](), "cluster-info") } diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/LoggingSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/LoggingSpec.scala index 87e3ca148ee..9a388119158 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/LoggingSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/LoggingSpec.scala @@ -61,7 +61,7 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime import java.net.URI class LoggingSpec - extends TestKit(ActorSystem("LoggingSpec", AmberRuntime.akkaConfig)) + extends TestKit(ActorSystem("LoggingSpec", AmberRuntime.pekkoConfig)) with ImplicitSender with AnyFlatSpecLike with BeforeAndAfterAll diff --git a/common/config/src/main/scala/org/apache/texera/amber/config/AkkaConfig.scala b/common/config/src/main/scala/org/apache/texera/amber/config/PekkoConfig.scala similarity index 90% rename from common/config/src/main/scala/org/apache/texera/amber/config/AkkaConfig.scala rename to common/config/src/main/scala/org/apache/texera/amber/config/PekkoConfig.scala index 7f2097063a9..33ba24d2476 100644 --- a/common/config/src/main/scala/org/apache/texera/amber/config/AkkaConfig.scala +++ b/common/config/src/main/scala/org/apache/texera/amber/config/PekkoConfig.scala @@ -20,11 +20,11 @@ package org.apache.texera.amber.config import com.typesafe.config.{Config, ConfigFactory} -object AkkaConfig { +object PekkoConfig { // Load configuration private val conf: Config = ConfigFactory.parseResources("cluster.conf").resolve() // Return the complete Pekko configuration with fallback to default application config - def akkaConfig: Config = conf.withFallback(ConfigFactory.defaultApplication()).resolve() + def pekkoConfig: Config = conf.withFallback(ConfigFactory.defaultApplication()).resolve() }