Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion src/snowflake/snowpark/mock/_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@
from snowflake.snowpark._internal.analyzer.unary_plan_node import (
Aggregate,
CreateViewCommand,
Filter,
Pivot,
Sample,
Project,
Expand Down Expand Up @@ -1082,6 +1083,18 @@ def execute_mock_plan(
return table
if isinstance(source_plan, MockSelectExecutionPlan):
return execute_mock_plan(source_plan.execution_plan, expr_to_alias)
if isinstance(source_plan, Filter):
child_df = execute_mock_plan(source_plan.child, expr_to_alias)
if child_df is None:
return TableEmulator()
condition = calculate_expression(
source_plan.condition, child_df, analyzer, expr_to_alias
)
filtered = child_df[condition.fillna(value=False)]
# Fix issue #4076: same pandas empty-df indexing edge case as MockSelectStatement where.
if len(filtered.columns) == 0 and len(child_df.columns) > 0:
filtered = child_df.iloc[0:0].copy()
return filtered
if isinstance(source_plan, MockSelectStatement):
projection: Optional[List[Expression]] = source_plan.projection or []
from_: Optional[MockSelectable] = source_plan.from_
Expand Down Expand Up @@ -1161,7 +1174,16 @@ def execute_mock_plan(

if where:
condition = calculate_expression(where, result_df, analyzer, expr_to_alias)
result_df = result_df[condition.fillna(value=False)]
filtered_result = result_df[condition.fillna(value=False)]
# Fix issue #4076: pandas df[boolean_mask] on empty DataFrame can return
# 0 columns (schemaless) due to index-alignment edge case. Row filtering
# should always preserve schema. We use iloc[0:0] when this happens.
if (
len(filtered_result.columns) == 0
and len(result_df.columns) > 0
):
filtered_result = result_df.iloc[0:0].copy()
result_df = filtered_result

if order_by:
result_df = handle_order_by_clause(
Expand Down
81 changes: 81 additions & 0 deletions tests/mock/test_empty_dataframe_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#
# Copyright (c) 2012-2025 Snowflake Computing Inc. All rights reserved.
#
"""Tests for issue #4076: Filter on empty dataframe results in schemaless dataframe."""

from snowflake.snowpark.functions import col, lit
from snowflake.snowpark.types import IntegerType, StringType, StructField, StructType


def test_filter_empty_dataframe_preserves_schema(session):
    """Check that filtering a zero-row dataframe keeps its columns (issue #4076).

    Builds an empty two-column string dataframe, applies an ``is_null`` filter
    and verifies that the column count and column names are the same before
    and after the filter, and that both dataframes contain no rows.
    """
    expected_names = ["ENTRY", "FILE_PATH"]
    two_string_columns = StructType(
        [
            StructField(column_name, StringType(), True)
            for column_name in ("entry", "file_path")
        ]
    )
    unfiltered = session.create_dataframe([], schema=two_string_columns)

    # Baseline: the declared schema is visible on the unfiltered dataframe.
    assert len(unfiltered.schema.fields) == 2
    assert [field.name for field in unfiltered.schema.fields] == expected_names

    # The filter removes rows only; the schema must be unchanged.
    only_nulls = unfiltered.filter(col("entry").is_null())
    assert (
        len(only_nulls.schema.fields) == 2
    ), "Filtered empty dataframe should retain schema (ENTRY, FILE_PATH)"
    assert [field.name for field in only_nulls.schema.fields] == expected_names

    # Neither dataframe has any rows.
    assert unfiltered.count() == 0
    assert only_nulls.count() == 0


def test_filter_empty_dataframe_various_conditions(session):
    """Check that several predicate kinds all keep the schema of an empty dataframe.

    Covers ``is_null``, ``is_not_null``, equality and inequality against a
    literal, and finally two filters chained one after the other.
    """
    string_pair_schema = StructType(
        [
            StructField(column_name, StringType(), True)
            for column_name in ("entry", "file_path")
        ]
    )
    df = session.create_dataframe([], schema=string_pair_schema)

    # Same order as the individual checks: is_null, is_not_null, ==, !=.
    single_predicates = (
        col("entry").is_null(),
        col("entry").is_not_null(),
        col("entry") == lit("x"),
        col("entry") != lit("x"),
    )
    for predicate in single_predicates:
        assert len(df.filter(predicate).schema.fields) == 2

    # Two filters applied back to back.
    first_pass = df.filter(col("entry").is_null())
    chained = first_pass.filter(col("file_path").is_null())
    assert len(chained.schema.fields) == 2


def test_select_lit_filter_preserves_projection_not_source(session):
    """Check that a filter after a projection reports the projected columns only.

    The source dataframe has columns ``a`` and ``b``; after selecting a single
    literal aliased ``x`` and filtering on it, the result must expose exactly
    one field named ``X`` rather than the source schema.
    """
    source_schema = StructType(
        [
            StructField("a", IntegerType()),
            StructField("b", StringType()),
        ]
    )
    empty_source = session.create_dataframe([], schema=source_schema)

    projected = empty_source.select(lit(1).alias("x"))
    result = projected.filter(col("x") > 0)

    assert len(result.schema.fields) == 1
    assert result.schema.fields[0].name == "X"


def test_filter_empty_dataframe_with_three_columns(session):
    """Check that a three-column empty dataframe keeps all columns after a filter.

    Uses mixed integer and string columns and a ``>`` comparison as the
    predicate, then verifies both the number and the names of the fields.
    """
    column_specs = (
        ("a", IntegerType()),
        ("b", StringType()),
        ("c", IntegerType()),
    )
    schema = StructType(
        [StructField(column_name, data_type) for column_name, data_type in column_specs]
    )
    df = session.create_dataframe([], schema=schema)

    filtered = df.filter(col("a") > 0)

    assert len(filtered.schema.fields) == 3
    assert [field.name for field in filtered.schema.fields] == ["A", "B", "C"]
Loading