From 7adc78779963daff1dac2cf5af314896e2559e70 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 28 Jan 2026 18:40:58 +0000 Subject: [PATCH 1/3] Initial plan From 79141404863e02bbe13df19aa50216686856b4c0 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 28 Jan 2026 19:13:50 +0000 Subject: [PATCH 2/3] Add dataframe conversion support Co-authored-by: AzulGarza <10517170+AzulGarza@users.noreply.github.com> --- README.md | 2 +- docs/examples/dask-dataframe.ipynb | 71 +++++++++++++++++++ docs/examples/ray-dataframe.ipynb | 90 +++++++++++++++++++++++++ docs/examples/spark-dataframe.ipynb | 89 ++++++++++++++++++++++++ docs/getting-started/installation.md | 9 ++- mkdocs.yml | 27 ++++---- pyproject.toml | 5 ++ tests/utils/test_dataframes.py | 56 +++++++++++++++ timecopilot/agent.py | 20 +++--- timecopilot/forecaster.py | 8 +++ timecopilot/utils/dataframes.py | 56 +++++++++++++++ timecopilot/utils/experiment_handler.py | 3 + 12 files changed, 414 insertions(+), 22 deletions(-) create mode 100644 docs/examples/dask-dataframe.ipynb create mode 100644 docs/examples/ray-dataframe.ipynb create mode 100644 docs/examples/spark-dataframe.ipynb create mode 100644 tests/utils/test_dataframes.py create mode 100644 timecopilot/utils/dataframes.py diff --git a/README.md b/README.md index d393dccb..8a709bfc 100644 --- a/README.md +++ b/README.md @@ -141,6 +141,7 @@ from timecopilot import TimeCopilot # - unique_id: Unique identifier for each time series (string) # - ds: Date column (datetime format) # - y: Target variable for forecasting (float format) +# Spark, Ray, and Dask dataframes are accepted and converted to pandas at entry. # The pandas frequency will be inferred from the ds column, if not provided. # If the seasonality is not provided, it will be inferred based on the frequency. # If the horizon is not set, it will default to 2 times the inferred seasonality. @@ -323,4 +324,3 @@ Our pre-print paper is [available in arxiv](https://arxiv.org/abs/2509.00616). } ``` - diff --git a/docs/examples/dask-dataframe.ipynb b/docs/examples/dask-dataframe.ipynb new file mode 100644 index 00000000..61c12908 --- /dev/null +++ b/docs/examples/dask-dataframe.ipynb @@ -0,0 +1,71 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Using Dask DataFrames\n", + "\n", + "TimeCopilot converts Dask DataFrames to pandas at entry so you can use them directly.\n" + ] + }, + { + "cell_type": "code", + "metadata": {}, + "execution_count": null, + "outputs": [], + "source": [ + "import pandas as pd\n", + "import dask.dataframe as dd\n", + "\n", + "from timecopilot import TimeCopilotForecaster\n", + "from timecopilot.models.stats import SeasonalNaive\n" + ] + }, + { + "cell_type": "code", + "metadata": {}, + "execution_count": null, + "outputs": [], + "source": [ + "df = pd.read_csv(\n", + " \"https://timecopilot.s3.amazonaws.com/public/data/air_passengers.csv\",\n", + " parse_dates=[\"ds\"],\n", + ")\n", + "dask_df = dd.from_pandas(df, npartitions=1)\n" + ] + }, + { + "cell_type": "code", + "metadata": {}, + "execution_count": null, + "outputs": [], + "source": [ + "tcf = TimeCopilotForecaster(models=[SeasonalNaive()])\n", + "fcst_df = tcf.forecast(df=dask_df, h=12, freq=\"MS\")\n", + "fcst_df.head()\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.12" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/docs/examples/ray-dataframe.ipynb b/docs/examples/ray-dataframe.ipynb new file mode 100644 index 00000000..6b544390 --- /dev/null +++ b/docs/examples/ray-dataframe.ipynb @@ -0,0 +1,90 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Using Ray Datasets\n", + "\n", + "TimeCopilot converts Ray datasets to pandas at entry so you can use them directly.\n" + ] + }, + { + "cell_type": "code", + "metadata": {}, + "execution_count": null, + "outputs": [], + "source": [ + "import pandas as pd\n", + "import ray\n", + "import ray.data as ray_data\n", + "\n", + "from timecopilot import TimeCopilotForecaster\n", + "from timecopilot.models.stats import SeasonalNaive\n" + ] + }, + { + "cell_type": "code", + "metadata": {}, + "execution_count": null, + "outputs": [], + "source": [ + "ray.init(ignore_reinit_error=True)\n" + ] + }, + { + "cell_type": "code", + "metadata": {}, + "execution_count": null, + "outputs": [], + "source": [ + "df = pd.read_csv(\n", + " \"https://timecopilot.s3.amazonaws.com/public/data/air_passengers.csv\",\n", + " parse_dates=[\"ds\"],\n", + ")\n", + "ray_df = ray_data.from_pandas(df)\n" + ] + }, + { + "cell_type": "code", + "metadata": {}, + "execution_count": null, + "outputs": [], + "source": [ + "tcf = TimeCopilotForecaster(models=[SeasonalNaive()])\n", + "fcst_df = tcf.forecast(df=ray_df, h=12, freq=\"MS\")\n", + "fcst_df.head()\n" + ] + }, + { + "cell_type": "code", + "metadata": {}, + "execution_count": null, + "outputs": [], + "source": [ + "ray.shutdown()\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.12" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/docs/examples/spark-dataframe.ipynb b/docs/examples/spark-dataframe.ipynb new file mode 100644 index 00000000..6af9157d --- /dev/null +++ b/docs/examples/spark-dataframe.ipynb @@ -0,0 +1,89 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Using Spark DataFrames\n", + "\n", + "TimeCopilot converts Spark DataFrames to pandas at entry so you can use them directly.\n" + ] + }, + { + "cell_type": "code", + "metadata": {}, + "execution_count": null, + "outputs": [], + "source": [ + "import pandas as pd\n", + "from pyspark.sql import SparkSession\n", + "\n", + "from timecopilot import TimeCopilotForecaster\n", + "from timecopilot.models.stats import SeasonalNaive\n" + ] + }, + { + "cell_type": "code", + "metadata": {}, + "execution_count": null, + "outputs": [], + "source": [ + "spark = SparkSession.builder.master(\"local[1]\").appName(\"timecopilot\").getOrCreate()\n" + ] + }, + { + "cell_type": "code", + "metadata": {}, + "execution_count": null, + "outputs": [], + "source": [ + "df = pd.read_csv(\n", + " \"https://timecopilot.s3.amazonaws.com/public/data/air_passengers.csv\",\n", + " parse_dates=[\"ds\"],\n", + ")\n", + "spark_df = spark.createDataFrame(df)\n" + ] + }, + { + "cell_type": "code", + "metadata": {}, + "execution_count": null, + "outputs": [], + "source": [ + "tcf = TimeCopilotForecaster(models=[SeasonalNaive()])\n", + "fcst_df = tcf.forecast(df=spark_df, h=12, freq=\"MS\")\n", + "fcst_df.head()\n" + ] + }, + { + "cell_type": "code", + "metadata": {}, + "execution_count": null, + "outputs": [], + "source": [ + "spark.stop()\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.12" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/docs/getting-started/installation.md b/docs/getting-started/installation.md index d2468548..2579f384 100644 --- a/docs/getting-started/installation.md +++ b/docs/getting-started/installation.md @@ -14,10 +14,17 @@ TimeCopilot is available on PyPI as [timecopilot](https://pypi.org/project/timec uv add timecopilot ``` +!!! tip + + Optional dataframe dependencies (Spark, Ray, Dask) can be installed with: + + ```bash + uv sync --group dataframes + ``` + Requires Python 3.10 or later. !!! tip If you don't have a prior experience with `uv`, go to [uv getting started](https://docs.astral.sh/uv/getting-started/) section. - diff --git a/mkdocs.yml b/mkdocs.yml index 385147d4..ed7b2c10 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -18,18 +18,21 @@ nav: - Introduction: getting-started/introduction.md - Quickstart: getting-started/quickstart.md - Installation: getting-started/installation.md - - Examples: - - examples/agent-quickstart.ipynb - - examples/llm-providers.ipynb - - examples/aws-bedrock.ipynb - - examples/google-llms.ipynb - - examples/forecaster-quickstart.ipynb - - examples/anomaly-detection-forecaster-quickstart.ipynb - - examples/ts-foundation-models-comparison-quickstart.ipynb - - examples/gift-eval.ipynb - - examples/chronos-family.ipynb - - examples/cryptocurrency-quickstart.ipynb - - examples/sktime.ipynb + - Examples: + - examples/agent-quickstart.ipynb + - examples/llm-providers.ipynb + - examples/aws-bedrock.ipynb + - examples/google-llms.ipynb + - examples/forecaster-quickstart.ipynb + - examples/anomaly-detection-forecaster-quickstart.ipynb + - examples/ts-foundation-models-comparison-quickstart.ipynb + - examples/gift-eval.ipynb + - examples/chronos-family.ipynb + - examples/cryptocurrency-quickstart.ipynb + - examples/sktime.ipynb + - examples/spark-dataframe.ipynb + - examples/ray-dataframe.ipynb + - examples/dask-dataframe.ipynb - Experiments: - experiments/gift-eval.md - experiments/fev.md diff --git a/pyproject.toml b/pyproject.toml index e995d9db..af0af3af 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,11 @@ docs = [ "modal>=1.0.4", "ruff>=0.12.1", ] +dataframes = [ + "dask[dataframe]>=2024.9.1", + "pyspark>=3.5.1", + "ray[data]>=2.52.1", +] [project] authors = [ diff --git a/tests/utils/test_dataframes.py b/tests/utils/test_dataframes.py new file mode 100644 index 00000000..e407d40d --- /dev/null +++ b/tests/utils/test_dataframes.py @@ -0,0 +1,56 @@ +import pandas as pd +import pytest + +from timecopilot.utils.dataframes import to_pandas + + +def _make_frame() -> pd.DataFrame: + dates = pd.date_range("2024-01-01", periods=3, freq="D") + return pd.DataFrame( + { + "unique_id": ["series_1"] * len(dates), + "ds": dates, + "y": [1.0, 2.0, 3.0], + } + ) + + +def test_to_pandas_with_pandas(): + df = _make_frame() + assert to_pandas(df) is df + + +def test_to_pandas_with_dask(): + dd = pytest.importorskip("dask.dataframe") + df = _make_frame() + dask_df = dd.from_pandas(df, npartitions=1) + out = to_pandas(dask_df) + pd.testing.assert_frame_equal(out, df) + + +def test_to_pandas_with_ray(): + ray = pytest.importorskip("ray") + ray_data = pytest.importorskip("ray.data") + if not ray.is_initialized(): + ray.init(ignore_reinit_error=True) + try: + df = _make_frame() + ray_df = ray_data.from_pandas(df) + out = to_pandas(ray_df) + pd.testing.assert_frame_equal(out.sort_values("ds").reset_index(drop=True), df) + finally: + ray.shutdown() + + +def test_to_pandas_with_spark(): + pyspark_sql = pytest.importorskip("pyspark.sql") + spark = pyspark_sql.SparkSession.builder.master("local[1]").appName( + "timecopilot-tests" + ).getOrCreate() + try: + df = _make_frame() + spark_df = spark.createDataFrame(df) + out = to_pandas(spark_df) + pd.testing.assert_frame_equal(out.sort_values("ds").reset_index(drop=True), df) + finally: + spark.stop() diff --git a/timecopilot/agent.py b/timecopilot/agent.py index 059cf832..231f435e 100644 --- a/timecopilot/agent.py +++ b/timecopilot/agent.py @@ -1195,8 +1195,9 @@ def analyze( Args: df: The time-series data. Can be one of: - - a *pandas* `DataFrame` with at least the columns - `["unique_id", "ds", "y"]`. + - a *pandas*, Spark, Ray, or Dask dataframe with at least the + columns `["unique_id", "ds", "y"]`. Non-pandas dataframes are + converted to pandas at entry. - a file path or URL pointing to a CSV / Parquet file with the same columns (it will be read automatically). h: Forecast horizon. Number of future periods to predict. If @@ -1258,8 +1259,9 @@ def forecast( Args: df: The time-series data. Can be one of: - - a *pandas* `DataFrame` with at least the columns - `["unique_id", "ds", "y"]`. + - a *pandas*, Spark, Ray, or Dask dataframe with at least the + columns `["unique_id", "ds", "y"]`. Non-pandas dataframes are + converted to pandas at entry. - a file path or URL pointing to a CSV / Parquet file with the same columns (it will be read automatically). h: Forecast horizon. Number of future periods to predict. If @@ -1414,8 +1416,9 @@ async def analyze( Args: df: The time-series data. Can be one of: - - a *pandas* `DataFrame` with at least the columns - `["unique_id", "ds", "y"]`. + - a *pandas*, Spark, Ray, or Dask dataframe with at least the + columns `["unique_id", "ds", "y"]`. Non-pandas dataframes are + converted to pandas at entry. - You must always work with time series data with the columns ds (date) and y (target value), if these are missing, attempt to infer them from similar column names or, if unsure, request @@ -1488,8 +1491,9 @@ async def forecast( Args: df: The time-series data. Can be one of: - - a *pandas* `DataFrame` with at least the columns - `["unique_id", "ds", "y"]`. + - a *pandas*, Spark, Ray, or Dask dataframe with at least the + columns `["unique_id", "ds", "y"]`. Non-pandas dataframes are + converted to pandas at entry. - You must always work with time series data with the columns ds (date) and y (target value), if these are missing, attempt to infer them from similar column names or, if unsure, request diff --git a/timecopilot/forecaster.py b/timecopilot/forecaster.py index a27ac5f4..33b22d50 100644 --- a/timecopilot/forecaster.py +++ b/timecopilot/forecaster.py @@ -1,6 +1,7 @@ import pandas as pd from .models.utils.forecaster import Forecaster +from .utils.dataframes import to_pandas class TimeCopilotForecaster(Forecaster): @@ -73,6 +74,7 @@ def _call_models( quantiles: list[float] | None, **kwargs, ) -> pd.DataFrame: + df = to_pandas(df) # infer just once to avoid multiple calls to _maybe_infer_freq freq = self._maybe_infer_freq(df, freq) res_df: pd.DataFrame | None = None @@ -141,6 +143,8 @@ def forecast( - "unique_id": an ID column to distinguish multiple series. - "ds": a time column indicating timestamps or periods. - "y": a target column with the observed values. + Spark, Ray, and Dask dataframes are accepted and converted to + pandas at entry. h (int): Forecast horizon specifying how many future steps to predict. @@ -207,6 +211,8 @@ def cross_validation( - "unique_id": an ID column to distinguish multiple series. - "ds": a time column indicating timestamps or periods. - "y": a target column with the observed values. + Spark, Ray, and Dask dataframes are accepted and converted to + pandas at entry. h (int): Forecast horizon specifying how many future steps to predict in @@ -279,6 +285,8 @@ def detect_anomalies( Args: df (pd.DataFrame): DataFrame containing the time series to detect anomalies. + Spark, Ray, and Dask dataframes are accepted and converted to + pandas at entry. h (int, optional): Forecast horizon specifying how many future steps to predict. In each cross validation window. If not provided, the seasonality diff --git a/timecopilot/utils/dataframes.py b/timecopilot/utils/dataframes.py new file mode 100644 index 00000000..b48fcd3c --- /dev/null +++ b/timecopilot/utils/dataframes.py @@ -0,0 +1,56 @@ +from typing import Any + +import pandas as pd + + +def to_pandas(df: Any) -> pd.DataFrame: + """Convert supported dataframe inputs to pandas.""" + if isinstance(df, pd.DataFrame): + return df + + spark_df = _maybe_spark_to_pandas(df) + if spark_df is not None: + return spark_df + + ray_df = _maybe_ray_to_pandas(df) + if ray_df is not None: + return ray_df + + dask_df = _maybe_dask_to_pandas(df) + if dask_df is not None: + return dask_df + + raise TypeError( + "Unsupported dataframe type. Provide a pandas DataFrame or a compatible " + "Spark, Ray, or Dask dataframe." + ) + + +def _maybe_spark_to_pandas(df: Any) -> pd.DataFrame | None: + try: + from pyspark.sql import DataFrame as SparkDataFrame + except ImportError: + return None + if isinstance(df, SparkDataFrame): + return df.toPandas() + return None + + +def _maybe_ray_to_pandas(df: Any) -> pd.DataFrame | None: + try: + from ray.data import Dataset + except ImportError: + return None + if isinstance(df, Dataset): + return df.to_pandas() + return None + + +def _maybe_dask_to_pandas(df: Any) -> pd.DataFrame | None: + try: + import dask.dataframe as dd + except ImportError: + return None + if isinstance(df, dd.DataFrame): + return df.compute() + return None diff --git a/timecopilot/utils/experiment_handler.py b/timecopilot/utils/experiment_handler.py index 5e9bc4cb..9e869e9b 100644 --- a/timecopilot/utils/experiment_handler.py +++ b/timecopilot/utils/experiment_handler.py @@ -17,6 +17,7 @@ maybe_convert_col_to_datetime, maybe_infer_freq, ) +from .dataframes import to_pandas warnings.simplefilter( action="ignore", @@ -103,6 +104,8 @@ def read_df(path: str | Path) -> pd.DataFrame: def _validate_df(df: pd.DataFrame | str | Path) -> pd.DataFrame: if isinstance(df, str | Path): df = ExperimentDatasetParser.read_df(df) + else: + df = to_pandas(df) if "unique_id" not in df.columns: df["unique_id"] = "series_0" return maybe_convert_col_to_datetime(df, "ds") From e3024586cde90c331c143d14165a8b4cab5fc337 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 28 Jan 2026 19:18:22 +0000 Subject: [PATCH 3/3] Refine dataframe support docs/tests Co-authored-by: AzulGarza <10517170+AzulGarza@users.noreply.github.com> --- docs/getting-started/installation.md | 6 ++++ mkdocs.yml | 30 +++++++++---------- pyproject.toml | 5 ++++ tests/utils/test_dataframes.py | 10 +++++-- timecopilot/utils/dataframes.py | 45 ++++++++++++++++++++-------- 5 files changed, 66 insertions(+), 30 deletions(-) diff --git a/docs/getting-started/installation.md b/docs/getting-started/installation.md index 2579f384..69caaa2b 100644 --- a/docs/getting-started/installation.md +++ b/docs/getting-started/installation.md @@ -18,6 +18,12 @@ TimeCopilot is available on PyPI as [timecopilot](https://pypi.org/project/timec Optional dataframe dependencies (Spark, Ray, Dask) can be installed with: + ```bash + pip install "timecopilot[dataframes]" + ``` + + or + ```bash uv sync --group dataframes ``` diff --git a/mkdocs.yml b/mkdocs.yml index ed7b2c10..8989a63f 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -18,21 +18,21 @@ nav: - Introduction: getting-started/introduction.md - Quickstart: getting-started/quickstart.md - Installation: getting-started/installation.md - - Examples: - - examples/agent-quickstart.ipynb - - examples/llm-providers.ipynb - - examples/aws-bedrock.ipynb - - examples/google-llms.ipynb - - examples/forecaster-quickstart.ipynb - - examples/anomaly-detection-forecaster-quickstart.ipynb - - examples/ts-foundation-models-comparison-quickstart.ipynb - - examples/gift-eval.ipynb - - examples/chronos-family.ipynb - - examples/cryptocurrency-quickstart.ipynb - - examples/sktime.ipynb - - examples/spark-dataframe.ipynb - - examples/ray-dataframe.ipynb - - examples/dask-dataframe.ipynb + - Examples: + - examples/agent-quickstart.ipynb + - examples/llm-providers.ipynb + - examples/aws-bedrock.ipynb + - examples/google-llms.ipynb + - examples/forecaster-quickstart.ipynb + - examples/anomaly-detection-forecaster-quickstart.ipynb + - examples/ts-foundation-models-comparison-quickstart.ipynb + - examples/gift-eval.ipynb + - examples/chronos-family.ipynb + - examples/cryptocurrency-quickstart.ipynb + - examples/sktime.ipynb + - examples/spark-dataframe.ipynb + - examples/ray-dataframe.ipynb + - examples/dask-dataframe.ipynb - Experiments: - experiments/gift-eval.md - experiments/fev.md diff --git a/pyproject.toml b/pyproject.toml index af0af3af..18028663 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -79,6 +79,11 @@ dependencies = [ "tsfeatures", "utilsforecast[plotting]>=0.2.15", ] +optional-dependencies = { dataframes = [ + "dask[dataframe]>=2024.9.1", + "pyspark>=3.5.1", + "ray[data]>=2.52.1", +] } description = "The GenAI Forecasting Agent · LLMs × Time Series Foundation Models" license = "MIT" name = "timecopilot" diff --git a/tests/utils/test_dataframes.py b/tests/utils/test_dataframes.py index e407d40d..1cb733e9 100644 --- a/tests/utils/test_dataframes.py +++ b/tests/utils/test_dataframes.py @@ -31,8 +31,9 @@ def test_to_pandas_with_dask(): def test_to_pandas_with_ray(): ray = pytest.importorskip("ray") ray_data = pytest.importorskip("ray.data") - if not ray.is_initialized(): - ray.init(ignore_reinit_error=True) + if ray.is_initialized(): + ray.shutdown() + ray.init(ignore_reinit_error=True) try: df = _make_frame() ray_df = ray_data.from_pandas(df) @@ -54,3 +55,8 @@ def test_to_pandas_with_spark(): pd.testing.assert_frame_equal(out.sort_values("ds").reset_index(drop=True), df) finally: spark.stop() + + +def test_to_pandas_unsupported_type(): + with pytest.raises(TypeError, match="Unsupported dataframe type"): + to_pandas(["not", "a", "dataframe"]) diff --git a/timecopilot/utils/dataframes.py b/timecopilot/utils/dataframes.py index b48fcd3c..36f870cd 100644 --- a/timecopilot/utils/dataframes.py +++ b/timecopilot/utils/dataframes.py @@ -1,28 +1,47 @@ -from typing import Any +from typing import TYPE_CHECKING, TypeAlias import pandas as pd +if TYPE_CHECKING: + import dask.dataframe as dd + from pyspark.sql import DataFrame as SparkDataFrame + from ray.data import Dataset -def to_pandas(df: Any) -> pd.DataFrame: - """Convert supported dataframe inputs to pandas.""" + DataFrameLike: TypeAlias = pd.DataFrame | SparkDataFrame | Dataset | dd.DataFrame +else: + DataFrameLike: TypeAlias = pd.DataFrame + + +def to_pandas(df: DataFrameLike) -> pd.DataFrame: + """Convert supported dataframe inputs (Spark, Ray, Dask) to pandas. + + Returns the input unchanged if it is already a pandas DataFrame. Conversion + collects the data into memory and may be expensive. Raises a TypeError for + unsupported dataframe types. + """ if isinstance(df, pd.DataFrame): return df - spark_df = _maybe_spark_to_pandas(df) - if spark_df is not None: - return spark_df + module = type(df).__module__ + if module.startswith("pyspark"): + spark_df = _maybe_spark_to_pandas(df) + if spark_df is not None: + return spark_df - ray_df = _maybe_ray_to_pandas(df) - if ray_df is not None: - return ray_df + if module.startswith("ray"): + ray_df = _maybe_ray_to_pandas(df) + if ray_df is not None: + return ray_df - dask_df = _maybe_dask_to_pandas(df) - if dask_df is not None: - return dask_df + if module.startswith("dask"): + dask_df = _maybe_dask_to_pandas(df) + if dask_df is not None: + return dask_df raise TypeError( "Unsupported dataframe type. Provide a pandas DataFrame or a compatible " - "Spark, Ray, or Dask dataframe." + "Spark, Ray, or Dask dataframe. Install optional dependencies with " + "`pip install \"timecopilot[dataframes]\"` or `uv sync --group dataframes`." )