From 5c8965872146fddbf29ee733a63cbc47fd7f29ba Mon Sep 17 00:00:00 2001 From: Sarah Asad Date: Sun, 3 May 2026 23:13:10 -0700 Subject: [PATCH 01/13] Files added --- .../pythonvirtualenvironment/PveManager.scala | 132 ++++++++++++++-- .../PveResource.scala | 9 +- .../PveWebsocketResource.scala | 36 ++++- .../computing-unit-selection.component.html | 84 +++++++++- .../computing-unit-selection.component.ts | 148 ++++++++++++------ .../virtual-environment.service.ts | 9 +- frontend/src/styles.scss | 10 +- 7 files changed, 352 insertions(+), 76 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala index 0399e386ba7..3231565e0ff 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala @@ -32,7 +32,8 @@ import org.apache.texera.amber.config.PythonUtils * for each Computing Unit * * It supports: - * - Creating and initializing isolated Python environments + * - Creating and initializing isolated Python environments (with system packages) + * - installing user defined packages * - Streaming pip output logs back to the caller * * Each PVE is stored under: @@ -41,6 +42,11 @@ import org.apache.texera.amber.config.PythonUtils object PveManager { + case class PvePackageResponse( + pveName: String, + userPackages: Seq[String] + ) + private val VenvRoot: Path = Paths.get("/tmp/texera-pve/venvs") private def cuidDir(cuid: Int, pveName: String): Path = { @@ -121,18 +127,6 @@ object PveManager { return } - if (!Files.exists(requirementsPath)) { - queue.put(s"[PVE][ERR] requirements.txt not found at ${requirementsPath.toAbsolutePath}") - return - } - - if (!Files.exists(operatorRequirementsPath)) { - queue.put( - s"[PVE][ERR] operator-requirements.txt not found at ${operatorRequirementsPath.toAbsolutePath}" - ) - return - } - queue.put( s"[PVE] Installing requirements from ${requirementsPath.toAbsolutePath} and ${operatorRequirementsPath.toAbsolutePath}" ) @@ -170,7 +164,8 @@ object PveManager { queue.put(s"[PVE] Created new environment for cuid = $cuid") } - def getEnvironments(cuid: Int): List[String] = { + // returns list of PVE names and corresponding user packages for a given CU + def getEnvironments(cuid: Int): List[PvePackageResponse] = { val cuPath = VenvRoot.resolve(cuid.toString) @@ -185,7 +180,27 @@ object PveManager { .iterator() .asScala .filter(path => Files.isDirectory(path)) - .map(path => path.getFileName.toString) + .map { path => + val pveName = path.getFileName.toString + val metadataPath = path.resolve("user-packages.txt") + + val userPackages = + if (Files.exists(metadataPath)) { + Files + .readAllLines(metadataPath) + .asScala + .map(_.trim) + .filter(_.nonEmpty) + .toSeq + } else { + Seq() + } + + PvePackageResponse( + pveName = pveName, + userPackages = userPackages + ) + } .toList } finally { stream.close() @@ -212,4 +227,91 @@ object PveManager { stream.close() } } + + /** + * Installs user requested Python packages into the PVE. + * + * 1. Executes pip install for each package + * 2. Updates user metadata file + * 3. Streams logs back via queue + */ + def installUserPackages( + packages: List[String], + cuid: Int, + queue: BlockingQueue[String], + pveName: String + ): Unit = { + + val python = pythonBinPath(cuid, pveName).toAbsolutePath.toString + val envVars = pipEnv + + if (!Files.exists(Paths.get(python))) { + queue.put(s"[PVE][ERR] Python executable not found for PVE: $python") + return + } + + val metadataPath = cuidDir(cuid, pveName).resolve("user-packages.txt") + Files.createDirectories(metadataPath.getParent) + + var installedPackages = + if (Files.exists(metadataPath)) { + Files + .readAllLines(metadataPath) + .asScala + .map(_.trim) + .filter(_.nonEmpty) + .toSet + } else { + Set[String]() + } + + packages.foreach { pkg => + val trimmedPkg = pkg.trim + + if (trimmedPkg.nonEmpty) { + queue.put(s"[PVE] Installing package: $trimmedPkg") + + val code = Process( + Seq( + python, + "-u", + "-m", + "pip", + "install", + "--progress-bar", + "off", + "--no-input", + trimmedPkg + ), + None, + envVars.toSeq: _* + ).!( + ProcessLogger( + out => queue.put(s"[pip] $out"), + err => queue.put(s"[pip][ERR] $err") + ) + ) + + queue.put(s"[pip] install($trimmedPkg) finished with exit code $code") + + if (code != 0) { + queue.put(s"[PVE][ERR] Failed to install package: $trimmedPkg") + return + } + + installedPackages = installedPackages + trimmedPkg + + Files.write( + metadataPath, + installedPackages.toSeq.sorted.asJava + ) + } + } + + queue.put("[PVE] Final user package list:") + + installedPackages.toSeq.sorted.foreach { pkg => + queue.put(s"[user-package] $pkg") + } + } } diff --git a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResource.scala index 1040fd64ea4..0a058ed6f5c 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResource.scala @@ -28,7 +28,7 @@ import java.util @Consumes(Array(MediaType.APPLICATION_JSON)) class PveResource { // -------------------------------------------------- - // Get installed packages + // Get system packages // -------------------------------------------------- @GET @Path("/system") @@ -45,7 +45,7 @@ class PveResource { } // -------------------------------------------------- - // Fetch PVEs + // Fetch PVEs and Installed User Packages // -------------------------------------------------- @GET @Path("/pves") @@ -54,9 +54,10 @@ class PveResource { try { PveManager .getEnvironments(cuid) - .map { pveName => + .map { pve => Map( - "pveName" -> pveName.asInstanceOf[Object] + "pveName" -> pve.pveName.asInstanceOf[Object], + "userPackages" -> pve.userPackages.asJava.asInstanceOf[Object] ).asJava } .asJava diff --git a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveWebsocketResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveWebsocketResource.scala index b93d1bfde03..48931bf37c3 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveWebsocketResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveWebsocketResource.scala @@ -26,9 +26,9 @@ import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global /** - * WebSocket endpoint for PVE creation that streams pip installation logs - * to the frontend in real time. The environment setup runs asynchronously, - * and output is pushed to the client until completion. + * WebSocket endpoint for PVE creation and user pacakge installation that streams + * pip installation logs to the frontend in real time. The environment setup runs + * asynchronously, and output is pushed to the client until completion. */ @ServerEndpoint("/wsapi/pve") @@ -42,12 +42,33 @@ class PveWebsocketResource { val cuid = params.get("cuid").get(0).toInt val pveName = params.get("pveName").get(0) val isLocal = params.get("isLocal").get(0).toBoolean + val action = params.getOrDefault("action", java.util.List.of("create")).get(0) val queue = new LinkedBlockingQueue[String]() Future { try { - PveManager.createNewPve(cuid, queue, pveName, isLocal) + action match { + case "create" => + PveManager.createNewPve(cuid, queue, pveName, isLocal) + + case "install" => + val packages = + params + .getOrDefault("packages", java.util.List.of("[]")) + .get(0) + .stripPrefix("[") + .stripSuffix("]") + .split(",") + .toList + .map(_.replace("\"", "").trim) + .filter(_.nonEmpty) + + PveManager.installUserPackages(packages, cuid, queue, pveName) + + case _ => + queue.put(s"[ERR] Unknown action: $action") + } } catch { case e: Exception => queue.put(s"[ERR] ${e.getMessage}") @@ -60,11 +81,10 @@ class PveWebsocketResource { var done = false while (!done && session.isOpen) { - val line = queue.take() - - session.getBasicRemote.sendText(line) + val msg = queue.take() + session.getBasicRemote.sendText(msg) - if (line == "__DONE__") { + if (msg == "__DONE__") { done = true session.close() } diff --git a/frontend/src/app/workspace/component/power-button/computing-unit-selection.component.html b/frontend/src/app/workspace/component/power-button/computing-unit-selection.component.html index b742c71581c..1a0dc7ef339 100644 --- a/frontend/src/app/workspace/component/power-button/computing-unit-selection.component.html +++ b/frontend/src/app/workspace/component/power-button/computing-unit-selection.component.html @@ -480,7 +480,7 @@
-
+
+ + +
+
+
+ +
+ +
+ +
+ +
+ +
+
+
+ + +
+
+
+ + +
+ +
+ + + + + + +
+ +
+ + +
+
+
+ +
+ +
+