Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
<module>zeppelin-jupyter-interpreter-shaded</module>
<module>groovy</module>
<module>spark</module>
<module>spark-connect</module>
<module>spark-submit</module>
<module>submarine</module>
<module>markdown</module>
Expand Down
44 changes: 35 additions & 9 deletions scripts/docker/zeppelin-interpreter/env_python_3_with_R.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,55 @@ channels:
- defaults
dependencies:
- python >=3.9,<3.10
- pyspark=3.3.2
- pyspark=3.5
- pycodestyle
- scipy
# --- Core data libraries ---
- pandas
- numpy
- scipy
- pyarrow
# --- Spark Connect protocol ---
- grpcio
- protobuf
# --- HTTP / networking ---
- requests
- urllib3
# --- File format support ---
- openpyxl
- xlrd
- pyyaml
- tabulate
# --- GCP access ---
- google-cloud-storage
- google-auth
- gcsfs
# --- Visualization ---
- matplotlib
- seaborn
- plotly
- plotnine
- altair
- vega_datasets
- hvplot
# --- SQL on pandas ---
- pandasql
# --- ML ---
- scikit-learn
- xgboost
# --- IPython / kernel ---
- ipython
- ipykernel
- jupyter_client
- hvplot
- plotnine
- seaborn
# --- Data connectors ---
- intake
- intake-parquet
- intake-xarray
- altair
- vega_datasets
- plotly
# --- pip-only packages ---
- pip
- pip:
# works for regular pip packages
- bkzep==0.6.1
- delta-spark==3.2.1
# --- R support ---
- r-base=3
- r-data.table
- r-evaluate
Expand Down
130 changes: 130 additions & 0 deletions spark-connect/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>zeppelin-interpreter-parent</artifactId>
<groupId>org.apache.zeppelin</groupId>
<version>0.11.2</version>
<relativePath>../zeppelin-interpreter-parent/pom.xml</relativePath>
</parent>

<artifactId>spark-connect-interpreter</artifactId>
<packaging>jar</packaging>
<name>Zeppelin: Spark Connect Interpreter</name>
<description>Zeppelin Spark Connect support via gRPC client</description>

<!-- interpreter.name determines the directory the interpreter is installed under. -->
<properties>
<interpreter.name>spark-connect</interpreter.name>
<spark.connect.version>3.5.3</spark.connect.version>
<spark.scala.binary.version>2.12</spark.scala.binary.version>
</properties>

<dependencies>
<!-- gRPC-based Spark Connect JVM client; no full Spark runtime needed on the interpreter side. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-connect-client-jvm_${spark.scala.binary.version}</artifactId>
<version>${spark.connect.version}</version>
</dependency>

<!-- Provides the IPython interpreter infrastructure reused by IPySparkConnectInterpreter. -->
<dependency>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-python</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
</plugin>
<!-- Shade and relocate netty/grpc/protobuf so the client's copies cannot clash
with the versions shipped by Zeppelin or a co-located Spark distribution. -->
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<!-- Strip jar signatures; they become invalid after shading. -->
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
<artifactSet>
<excludes>
<exclude>org.apache.zeppelin:zeppelin-interpreter-shaded</exclude>
</excludes>
</artifactSet>
<relocations>
<relocation>
<pattern>io.netty</pattern>
<shadedPattern>org.apache.zeppelin.spark.connect.io.netty</shadedPattern>
</relocation>
<relocation>
<pattern>com.google</pattern>
<shadedPattern>org.apache.zeppelin.spark.connect.com.google</shadedPattern>
</relocation>
<relocation>
<pattern>io.grpc</pattern>
<shadedPattern>org.apache.zeppelin.spark.connect.io.grpc</shadedPattern>
</relocation>
</relocations>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
</plugin>
</plugins>
</build>

<!-- Default profile pins the Spark Connect client line; additional profiles can
override spark.connect.version for other Spark releases. -->
<profiles>
<profile>
<id>spark-connect-3.5</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<spark.connect.version>3.5.3</spark.connect.version>
</properties>
</profile>
</profiles>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zeppelin.spark;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.python.IPythonInterpreter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

/**
* PySpark Connect Interpreter which uses IPython underlying.
* Reuses the Java SparkSession from SparkConnectInterpreter via Py4j.
*/
public class IPySparkConnectInterpreter extends IPythonInterpreter {

private static final Logger LOGGER = LoggerFactory.getLogger(IPySparkConnectInterpreter.class);

// Sibling interpreters in the same session; assigned in open(), cleared in close().
private SparkConnectInterpreter sparkConnectInterpreter;
private PySparkConnectInterpreter pySparkConnectInterpreter;
// Guards against double-initialization; open() is a no-op once this is true.
private boolean opened = false;
// Context of the paragraph currently being interpreted; restored on the JVM
// side when Python calls back via setInterpreterContextInPython().
private InterpreterContext curIntpContext;

public IPySparkConnectInterpreter(Properties property) {
super(property);
}

@Override
public synchronized void open() throws InterpreterException {
if (opened) {
return;
}

this.sparkConnectInterpreter =
getInterpreterInTheSameSessionByClassName(SparkConnectInterpreter.class);
this.pySparkConnectInterpreter =
getInterpreterInTheSameSessionByClassName(PySparkConnectInterpreter.class, false);

sparkConnectInterpreter.open();

setProperty("zeppelin.python", pySparkConnectInterpreter.getPythonExec());
setUseBuiltinPy4j(true);
setAdditionalPythonInitFile("python/zeppelin_isparkconnect.py");
super.open();
opened = true;
Comment on lines +54 to +65
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Rollback the Spark Connect session if IPython startup fails.

After Line 59 succeeds, any exception from Python setup or super.open() leaves sparkConnectInterpreter running even though this wrapper never finished opening.

Suggested fix
   `@Override`
   public synchronized void open() throws InterpreterException {
     if (opened) {
       return;
     }
 
-    this.sparkConnectInterpreter =
-        getInterpreterInTheSameSessionByClassName(SparkConnectInterpreter.class);
-    this.pySparkConnectInterpreter =
-        getInterpreterInTheSameSessionByClassName(PySparkConnectInterpreter.class, false);
-
-    sparkConnectInterpreter.open();
-
-    setProperty("zeppelin.python", pySparkConnectInterpreter.getPythonExec());
-    setUseBuiltinPy4j(true);
-    setAdditionalPythonInitFile("python/zeppelin_isparkconnect.py");
-    super.open();
-    opened = true;
+    try {
+      this.sparkConnectInterpreter =
+          getInterpreterInTheSameSessionByClassName(SparkConnectInterpreter.class);
+      this.pySparkConnectInterpreter =
+          getInterpreterInTheSameSessionByClassName(PySparkConnectInterpreter.class, false);
+
+      sparkConnectInterpreter.open();
+
+      setProperty("zeppelin.python", pySparkConnectInterpreter.getPythonExec());
+      setUseBuiltinPy4j(true);
+      setAdditionalPythonInitFile("python/zeppelin_isparkconnect.py");
+      super.open();
+      opened = true;
+    } catch (InterpreterException | RuntimeException e) {
+      try {
+        if (sparkConnectInterpreter != null) {
+          sparkConnectInterpreter.close();
+        }
+      } catch (InterpreterException closeError) {
+        LOGGER.warn("Error rolling back Spark Connect interpreter after open failure",
+            closeError);
+      }
+      sparkConnectInterpreter = null;
+      pySparkConnectInterpreter = null;
+      throw e;
+    }
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/IPySparkConnectInterpreter.java`
around lines 54 - 65, After opening sparkConnectInterpreter, ensure you roll
back/close it if any following Python setup or super.open() fails: wrap the
calls that use pySparkConnectInterpreter (setProperty("zeppelin.python", ...),
setUseBuiltinPy4j(true), setAdditionalPythonInitFile(...)) and super.open() in a
try/catch (or try/finally) and on any exception call
sparkConnectInterpreter.close() (and set opened=false if you set it before)
before rethrowing the exception; reference the existing symbols
sparkConnectInterpreter, pySparkConnectInterpreter, setProperty,
setUseBuiltinPy4j, setAdditionalPythonInitFile, super.open, and opened to locate
where to add the rollback.

}

/**
 * Interprets PySpark code through IPython, first pushing the current
 * InterpreterContext into the Python process so callbacks from Python see
 * the right paragraph context.
 */
@Override
public org.apache.zeppelin.interpreter.InterpreterResult interpret(String st,
    InterpreterContext context) throws InterpreterException {
  this.curIntpContext = context;
  InterpreterContext.set(context);

  // Tell the Python side to pull curIntpContext before running user code.
  org.apache.zeppelin.interpreter.InterpreterResult contextResult =
      super.interpret("intp.setInterpreterContextInPython()", context);
  if (contextResult.code() == org.apache.zeppelin.interpreter.InterpreterResult.Code.ERROR) {
    return new org.apache.zeppelin.interpreter.InterpreterResult(
        org.apache.zeppelin.interpreter.InterpreterResult.Code.ERROR,
        "Fail to setCurIntpContext");
  }

  return super.interpret(st, context);
}

/**
 * Restores the current paragraph's InterpreterContext on the JVM thread.
 * Invoked from the Python side (presumably via the Py4j bridge as
 * {@code intp.setInterpreterContextInPython()} — see interpret()).
 */
public void setInterpreterContextInPython() {
InterpreterContext.set(curIntpContext);
}

/**
 * Returns the SparkSession held by the sibling SparkConnectInterpreter,
 * or null when that interpreter has not been opened yet.
 */
public SparkSession getSparkSession() {
  return sparkConnectInterpreter == null ? null : sparkConnectInterpreter.getSparkSession();
}

/**
 * Cancels the running paragraph: first interrupts the IPython kernel via
 * super.cancel(), then delegates to the Spark Connect interpreter so any
 * in-flight Spark work is cancelled as well.
 */
@Override
public void cancel(InterpreterContext context) throws InterpreterException {
super.cancel(context);
if (sparkConnectInterpreter != null) {
sparkConnectInterpreter.cancel(context);
}
}

@Override
public void close() throws InterpreterException {
LOGGER.info("Close IPySparkConnectInterpreter (opened={})", opened);
try {
super.close();
} finally {
opened = false;
sparkConnectInterpreter = null;
pySparkConnectInterpreter = null;
LOGGER.info("IPySparkConnectInterpreter closed and state reset — ready for re-open");
Comment on lines +105 to +113
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Close the underlying SparkConnectInterpreter here too.

open() explicitly opens sparkConnectInterpreter, but close() only tears down the IPython side. Reopening this interpreter can therefore reuse a stale Spark Connect session and keep the per-user session slot occupied.

Suggested fix
   `@Override`
   public void close() throws InterpreterException {
     LOGGER.info("Close IPySparkConnectInterpreter (opened={})", opened);
     try {
       super.close();
     } finally {
+      if (sparkConnectInterpreter != null) {
+        try {
+          sparkConnectInterpreter.close();
+        } catch (InterpreterException e) {
+          LOGGER.warn("Error closing SparkConnectInterpreter", e);
+        }
+      }
       opened = false;
+      curIntpContext = null;
       sparkConnectInterpreter = null;
       pySparkConnectInterpreter = null;
       LOGGER.info("IPySparkConnectInterpreter closed and state reset — ready for re-open");
     }
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
public void close() throws InterpreterException {
LOGGER.info("Close IPySparkConnectInterpreter (opened={})", opened);
try {
super.close();
} finally {
opened = false;
sparkConnectInterpreter = null;
pySparkConnectInterpreter = null;
LOGGER.info("IPySparkConnectInterpreter closed and state reset — ready for re-open");
public void close() throws InterpreterException {
LOGGER.info("Close IPySparkConnectInterpreter (opened={})", opened);
try {
super.close();
} finally {
if (sparkConnectInterpreter != null) {
try {
sparkConnectInterpreter.close();
} catch (InterpreterException e) {
LOGGER.warn("Error closing SparkConnectInterpreter", e);
}
}
opened = false;
curIntpContext = null;
sparkConnectInterpreter = null;
pySparkConnectInterpreter = null;
LOGGER.info("IPySparkConnectInterpreter closed and state reset — ready for re-open");
}
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@spark-connect/src/main/java/org/apache/zeppelin/spark/IPySparkConnectInterpreter.java`
around lines 105 - 113, The close() method currently resets IPySpark state but
does not close the underlying SparkConnectInterpreter, causing stale sessions;
update IPySparkConnectInterpreter.close() to check if sparkConnectInterpreter is
non-null and call its close() (handling/propagating InterpreterException as
appropriate) before nullifying it, ensuring you call super.close() and then
close sparkConnectInterpreter (or close it inside the try/finally) so the
underlying SparkConnectInterpreter is properly closed and its session slot
released.

}
}

/**
 * Progress is not tracked for Spark Connect executions; always reports 0.
 */
@Override
public int getProgress(InterpreterContext context) throws InterpreterException {
return 0;
}

/**
 * Maximum number of rows to display, delegated to the Spark Connect
 * interpreter; falls back to 1000 when it is not available.
 */
public int getMaxResult() {
  return sparkConnectInterpreter == null ? 1000 : sparkConnectInterpreter.getMaxResult();
}

/**
 * Renders a DataFrame as a Zeppelin table string, truncated to maxResult
 * rows. The parameter is typed Object (presumably because it is invoked
 * from the Python side over the bridge — confirm against callers); the
 * argument must actually be a {@code Dataset<Row>} or the cast will fail
 * at runtime with a ClassCastException.
 */
@SuppressWarnings("unchecked")
public String formatDataFrame(Object df, int maxResult) {
return SparkConnectUtils.showDataFrame((Dataset<Row>) df, maxResult);
}
}
Loading
Loading