def _get_python_exec_from_conf(self) -> str:
    """
    Resolve the Python executable to be used by the driver.

    Candidate sources are consulted in priority order (highest first):

    1. ``PYSPARK_DRIVER_PYTHON`` environment variable
    2. ``PYSPARK_PYTHON`` environment variable
    3. ``spark.pyspark.driver.python`` configuration
    4. ``spark.pyspark.python`` configuration
    5. ``spark.executorEnv.PYSPARK_DRIVER_PYTHON`` configuration
    6. ``spark.executorEnv.PYSPARK_PYTHON`` configuration
    7. Default: ``'python3'``

    This keeps driver and executor Python versions consistent,
    especially in client mode where launch_container.sh may not run.

    Returns
    -------
    str
        The path to the Python executable.
    """
    # Environment variables outrank any Spark configuration key.
    for env_name in ("PYSPARK_DRIVER_PYTHON", "PYSPARK_PYTHON"):
        candidate = os.environ.get(env_name)
        if candidate is not None:
            candidate = candidate.strip()
            # Treat empty / whitespace-only values as unset.
            if candidate:
                return candidate

    # Fall back to Spark configuration, most specific key first.
    for conf_key in (
        "spark.pyspark.driver.python",
        "spark.pyspark.python",
        "spark.executorEnv.PYSPARK_DRIVER_PYTHON",
        "spark.executorEnv.PYSPARK_PYTHON",
    ):
        candidate = self._conf.get(conf_key, None)
        if candidate is not None:
            candidate = candidate.strip()
            if candidate:
                return candidate

    # Nothing configured anywhere: use the default interpreter.
    return "python3"
def test_python_executable_from_spark_conf(self):
    # SPARK-52669: exercise SparkContext._get_python_exec_from_conf directly.
    # Resolution order (highest priority first):
    #   PYSPARK_DRIVER_PYTHON env > PYSPARK_PYTHON env >
    #   spark.pyspark.driver.python > spark.pyspark.python >
    #   spark.executorEnv.PYSPARK_DRIVER_PYTHON >
    #   spark.executorEnv.PYSPARK_PYTHON > 'python3' (default)
    saved_environ = os.environ.copy()

    def resolve(conf):
        # Stand-in SparkContext carrying only the `_conf` attribute
        # that the method under test reads.
        stub = MagicMock()
        stub._conf = conf
        return SparkContext._get_python_exec_from_conf(stub)

    try:
        # Isolate the test from python-related env vars set by the harness.
        for name in ("PYSPARK_DRIVER_PYTHON", "PYSPARK_PYTHON"):
            os.environ.pop(name, None)

        # Test 1: spark.pyspark.driver.python takes precedence over
        # spark.pyspark.python.
        conf = SparkConf()
        conf.set("spark.pyspark.python", "python_fallback")
        conf.set("spark.pyspark.driver.python", "python_driver")
        self.assertEqual(resolve(conf), "python_driver")

        # Test 2: spark.pyspark.python is used when driver.python is absent.
        conf = SparkConf()
        conf.set("spark.pyspark.python", "python_shared")
        self.assertEqual(resolve(conf), "python_shared")

        # Test 3: PYSPARK_DRIVER_PYTHON env var has the highest priority.
        os.environ["PYSPARK_DRIVER_PYTHON"] = "python_env_driver"
        conf = SparkConf()
        conf.set("spark.pyspark.python", "python_shared")
        self.assertEqual(resolve(conf), "python_env_driver")
        del os.environ["PYSPARK_DRIVER_PYTHON"]

        # Test 4: PYSPARK_PYTHON env var overrides any spark config key.
        os.environ["PYSPARK_PYTHON"] = "python_env_shared"
        conf = SparkConf()
        conf.set("spark.pyspark.driver.python", "python_driver_conf")
        self.assertEqual(resolve(conf), "python_env_shared")
        del os.environ["PYSPARK_PYTHON"]

        # Test 5: nothing set anywhere falls back to python3.
        self.assertEqual(resolve(SparkConf()), "python3")
    finally:
        # Restore the original environment regardless of outcome.
        os.environ.clear()
        os.environ.update(saved_environ)