SNOW-2203826: Loosen flattening rules for sort and filter #4026

Open
sfc-gh-yixie wants to merge 10 commits into main from yixie-SNOW-2203826-flatten-filter-sort-new

Conversation

Collaborator

@sfc-gh-yixie commented Dec 9, 2025

Some old commits in the main branch were changed so #3941 can't be merged. This PR is a recreation of #3941 with the same code change.

  1. Which Jira issue is this PR addressing? Make sure that there is an accompanying issue to your PR.

    Fixes SNOW-2203826

  2. Fill out the following pre-review checklist:

    • I am adding a new automated test(s) to verify correctness of my new code
      • If this test skips Local Testing mode, I'm requesting review from @snowflakedb/local-testing
    • I am adding new logging messages
    • I am adding a new telemetry message
    • I am adding new credentials
    • I am adding a new dependency
    • If this is a new feature/behavior, I'm adding the Local Testing parity changes.
    • I acknowledge that I have ensured my changes to be thread-safe. Follow the link for more information: Thread-safe Developer Guidelines
    • If adding any arguments to public Snowpark APIs or creating new public Snowpark APIs, I acknowledge that I have ensured my changes include AST support. Follow the link for more information: AST Support Guidelines
  3. Please describe how your code solves the related issue.

    Please write a short description of how your code change solves the related issue.

@sfc-gh-yixie sfc-gh-yixie requested review from a team as code owners December 9, 2025 19:40
@github-actions github-actions bot added the local testing Local Testing issues/PRs label Dec 9, 2025
Comment on lines +4931 to +4958
def _retrieve_aggregation_function_list(self) -> None:
    """Retrieve the list of aggregation functions which will later be used in sql simplifier."""
    if (
        not context._is_snowpark_connect_compatible_mode
        or context._aggregation_function_set
    ):
        return

    retrieved_set = set()

    for sql in [
        """select function_name from information_schema.functions where is_aggregate = 'YES'""",
        """show functions ->> select "name" from $1 where "is_aggregate" = 'Y'""",
    ]:
        try:
            retrieved_set.update({r[0].lower() for r in self.sql(sql).collect()})
        except BaseException as e:
            _logger.debug(
                "Unable to get aggregation functions from the database: %s",
                e,
            )
            # we raise here as a pessimistic tactic: if we fail to retrieve the
            # aggregation function list, the set stays empty and the simplifier
            # would flatten queries that contain aggregation functions, leading
            # to incorrect results
            raise

    with context._aggregation_function_set_lock:
        context._aggregation_function_set.update(retrieved_set)
Contributor

Race condition in _retrieve_aggregation_function_list(). The method checks if context._aggregation_function_set is populated at line 4935 without holding the lock, but only acquires the lock at line 4957 before updating. This creates a check-then-act race condition.

Issue: Multiple threads calling filter() concurrently could all see an empty set at line 4935, bypass the early return, and all execute the database queries simultaneously, leading to:

  • Multiple redundant database queries
  • Potential thread contention when updating the set
  • Wasted resources

Fix:

def _retrieve_aggregation_function_list(self) -> None:
    if not context._is_snowpark_connect_compatible_mode:
        return
    
    with context._aggregation_function_set_lock:
        # Re-check inside the lock
        if context._aggregation_function_set:
            return
        
        retrieved_set = set()
        for sql in [...]:
            # ... query logic ...
        
        context._aggregation_function_set.update(retrieved_set)
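The double-checked locking pattern in the fix above can be sketched as a small standalone example (the names `_cache` and `_load` below are illustrative stand-ins for the Snowpark set and database queries, not real Snowpark APIs):

```python
import threading

_cache_lock = threading.Lock()
_cache: set = set()  # shared set, lazily populated once

def _load() -> set:
    # Stand-in for the expensive database queries.
    return {"sum", "avg", "count"}

def get_cached_set() -> set:
    # First check without the lock: cheap fast path once populated.
    if _cache:
        return _cache
    with _cache_lock:
        # Re-check inside the lock: another thread may have populated the
        # set between our unlocked check and acquiring the lock.
        if not _cache:
            _cache.update(_load())
    return _cache
```

The unlocked first check keeps the steady-state path lock-free, while the re-check inside the lock is what prevents the redundant queries described above.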
Suggested change

def _retrieve_aggregation_function_list(self) -> None:
    """Retrieve the list of aggregation functions which will later be used in sql simplifier."""
    if (
        not context._is_snowpark_connect_compatible_mode
        or context._aggregation_function_set
    ):
        return
    retrieved_set = set()
    for sql in [
        """select function_name from information_schema.functions where is_aggregate = 'YES'""",
        """show functions ->> select "name" from $1 where "is_aggregate" = 'Y'""",
    ]:
        try:
            retrieved_set.update({r[0].lower() for r in self.sql(sql).collect()})
        except BaseException as e:
            _logger.debug(
                "Unable to get aggregation functions from the database: %s",
                e,
            )
            # we raise here as a pessimistic tactic: if we fail to retrieve the
            # aggregation function list, the set stays empty and the simplifier
            # would flatten queries that contain aggregation functions, leading
            # to incorrect results
            raise
    with context._aggregation_function_set_lock:
        context._aggregation_function_set.update(retrieved_set)

def _retrieve_aggregation_function_list(self) -> None:
    """Retrieve the list of aggregation functions which will later be used in sql simplifier."""
    if not context._is_snowpark_connect_compatible_mode:
        return
    # First check without lock for performance
    if context._aggregation_function_set:
        return
    with context._aggregation_function_set_lock:
        # Re-check inside the lock
        if context._aggregation_function_set:
            return
        retrieved_set = set()
        for sql in [
            """select function_name from information_schema.functions where is_aggregate = 'YES'""",
            """show functions ->> select "name" from $1 where "is_aggregate" = 'Y'""",
        ]:
            try:
                retrieved_set.update({r[0].lower() for r in self.sql(sql).collect()})
            except BaseException as e:
                _logger.debug(
                    "Unable to get aggregation functions from the database: %s",
                    e,
                )
                # we raise here as a pessimistic tactic: if we fail to retrieve the
                # aggregation function list, the set stays empty and the simplifier
                # would flatten queries that contain aggregation functions, leading
                # to incorrect results
                raise
        context._aggregation_function_set.update(retrieved_set)

Spotted by Graphite Agent



@sfc-gh-pwojcik left a comment


LGTM, just left some minor comments

# We can inspect whether the referenced new column uses a window function. Here we are
# being conservative for now and do not flatten the SQL.
return False
return context._is_snowpark_connect_compatible_mode

Just to confirm, are all of these changes behind the _is_snowpark_connect_compatible_mode flag?

Comment on lines +2374 to +2378
# In non-connect mode, windows are treated as data generators
if not context._is_snowpark_connect_compatible_mode and isinstance(
    exp, WindowExpression
):
    return True

This is confusing imo. It would be better to remove this and just do the following in has_data_generator_or_window_function_exp, since it seems like we always check for window expressions. Am I missing something here?

return _check_expressions_for_types(
    expressions, check_data_gen=True, check_window=True
)

Comment on lines +5043 to +5044
"""select function_name from information_schema.functions where is_aggregate = 'YES'""",
"""show functions ->> select "name" from $1 where "is_aggregate" = 'Y'""",

Why do we need both of these? For udfs?
