Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 46 additions & 1 deletion python/pyspark/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ def _do_init(
)
os.environ["SPARK_BUFFER_SIZE"] = str(self._jvm.PythonUtils.getSparkBufferSize(self._jsc))

self.pythonExec = os.environ.get("PYSPARK_PYTHON", "python3")
self.pythonExec = self._get_python_exec_from_conf()
self.pythonVer = "%d.%d" % sys.version_info[:2]

# Broadcast's __reduce__ method stores Broadcast instances here.
Expand Down Expand Up @@ -416,6 +416,51 @@ def signal_handler(signal: Any, frame: Any) -> NoReturn:
):
signal.signal(signal.SIGINT, signal_handler)

def _get_python_exec_from_conf(self) -> str:
"""
Determine the Python executable to use for the driver.

The priority order (highest to lowest):
1. PYSPARK_DRIVER_PYTHON environment variable
2. PYSPARK_PYTHON environment variable
3. spark.pyspark.driver.python configuration
4. spark.pyspark.python configuration
5. spark.executorEnv.PYSPARK_DRIVER_PYTHON configuration
6. spark.executorEnv.PYSPARK_PYTHON configuration
7. Default: 'python3'

This ensures driver and executor Python versions are consistent,
especially in client mode where launch_container.sh may not run.

Returns
-------
str
The path to the Python executable.
"""
# Check environment variables first (highest priority)
env_driver_python = os.environ.get("PYSPARK_DRIVER_PYTHON")
if env_driver_python and env_driver_python.strip():
return env_driver_python.strip()

env_pyspark_python = os.environ.get("PYSPARK_PYTHON")
if env_pyspark_python and env_pyspark_python.strip():
return env_pyspark_python.strip()

# Fall back to Spark configuration keys
config_keys = [
"spark.pyspark.driver.python",
"spark.pyspark.python",
"spark.executorEnv.PYSPARK_DRIVER_PYTHON",
"spark.executorEnv.PYSPARK_PYTHON",
]
for key in config_keys:
python_exec = self._conf.get(key, None)
if python_exec is not None and python_exec.strip() != "":
return python_exec.strip()

# Default fallback
return "python3"

def __repr__(self) -> str:
return "<SparkContext master={master} appName={appName}>".format(
master=self.master,
Expand Down
66 changes: 66 additions & 0 deletions python/pyspark/tests/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from pyspark import SparkConf, SparkFiles, SparkContext
from pyspark.testing.sqlutils import SPARK_HOME
from pyspark.testing.utils import ReusedPySparkTestCase, PySparkTestCase, QuietTest
from unittest.mock import MagicMock


class CheckpointTests(ReusedPySparkTestCase):
Expand Down Expand Up @@ -321,6 +322,71 @@ def create_spark_context():
with SparkContext("local-cluster[3, 1, 1024]") as sc:
sc.range(2).foreach(lambda _: create_spark_context())

def test_python_executable_from_spark_conf(self):
    # SPARK-52669: exercise SparkContext._get_python_exec_from_conf directly.
    # Resolution order (highest priority first):
    #   PYSPARK_DRIVER_PYTHON env > PYSPARK_PYTHON env >
    #   spark.pyspark.driver.python > spark.pyspark.python >
    #   spark.executorEnv.PYSPARK_DRIVER_PYTHON > spark.executorEnv.PYSPARK_PYTHON >
    #   'python3' (default)

    # Snapshot the environment so the test leaves it untouched.
    saved_environ = os.environ.copy()

    def resolve(conf_items):
        # Build a SparkConf from the given mapping and run the resolver
        # against a mocked SparkContext carrying that conf.
        conf = SparkConf()
        for key, value in conf_items.items():
            conf.set(key, value)
        fake_sc = MagicMock()
        fake_sc._conf = conf
        return SparkContext._get_python_exec_from_conf(fake_sc)

    try:
        # Remove python-related env vars so each case starts clean.
        for env_key in ("PYSPARK_DRIVER_PYTHON", "PYSPARK_PYTHON"):
            os.environ.pop(env_key, None)

        # Case 1: driver-specific conf beats the shared conf.
        self.assertEqual(
            resolve(
                {
                    "spark.pyspark.python": "python_fallback",
                    "spark.pyspark.driver.python": "python_driver",
                }
            ),
            "python_driver",
        )

        # Case 2: shared conf is used when the driver conf is absent.
        self.assertEqual(
            resolve({"spark.pyspark.python": "python_shared"}), "python_shared"
        )

        # Case 3: PYSPARK_DRIVER_PYTHON env var outranks everything.
        os.environ["PYSPARK_DRIVER_PYTHON"] = "python_env_driver"
        self.assertEqual(
            resolve({"spark.pyspark.python": "python_shared"}), "python_env_driver"
        )
        del os.environ["PYSPARK_DRIVER_PYTHON"]

        # Case 4: PYSPARK_PYTHON env var outranks Spark configuration.
        os.environ["PYSPARK_PYTHON"] = "python_env_shared"
        self.assertEqual(
            resolve({"spark.pyspark.driver.python": "python_driver_conf"}),
            "python_env_shared",
        )
        del os.environ["PYSPARK_PYTHON"]

        # Case 5: with nothing set anywhere, default to python3.
        self.assertEqual(resolve({}), "python3")

    finally:
        # Restore the environment exactly as it was.
        os.environ.clear()
        os.environ.update(saved_environ)


class ContextTestsWithResources(unittest.TestCase):
def setUp(self):
Expand Down