SNOW-2203826: Loosen flattening rules for sort and filter#4026
SNOW-2203826: Loosen flattening rules for sort and filter#4026sfc-gh-yixie wants to merge 10 commits intomainfrom
Conversation
| def _retrieve_aggregation_function_list(self) -> None: | ||
| """Retrieve the list of aggregation functions which will later be used in sql simplifier.""" | ||
| if ( | ||
| not context._is_snowpark_connect_compatible_mode | ||
| or context._aggregation_function_set | ||
| ): | ||
| return | ||
|
|
||
| retrieved_set = set() | ||
|
|
||
| for sql in [ | ||
| """select function_name from information_schema.functions where is_aggregate = 'YES'""", | ||
| """show functions ->> select "name" from $1 where "is_aggregate" = 'Y'""", | ||
| ]: | ||
| try: | ||
| retrieved_set.update({r[0].lower() for r in self.sql(sql).collect()}) | ||
| except BaseException as e: | ||
| _logger.debug( | ||
| "Unable to get aggregation functions from the database: %s", | ||
| e, | ||
| ) | ||
| # we raise error here as a pessimistic tactics | ||
| # the reason is that if we fail to retrieve the aggregation function list, we have empty set | ||
| # the simplifier will flatten the query which contains aggregation functions leading to incorrect results | ||
| raise | ||
|
|
||
| with context._aggregation_function_set_lock: | ||
| context._aggregation_function_set.update(retrieved_set) |
There was a problem hiding this comment.
Race condition in _retrieve_aggregation_function_list(). The method checks if context._aggregation_function_set is populated at line 4935 without holding the lock, but only acquires the lock at line 4957 before updating. This creates a check-then-act race condition.
Issue: Multiple threads calling filter() concurrently could all see an empty set at line 4935, bypass the early return, and all execute the database queries simultaneously, leading to:
- Multiple redundant database queries
- Potential thread contention when updating the set
- Wasted resources
Fix:
def _retrieve_aggregation_function_list(self) -> None:
if not context._is_snowpark_connect_compatible_mode:
return
with context._aggregation_function_set_lock:
# Re-check inside the lock
if context._aggregation_function_set:
return
retrieved_set = set()
for sql in [...]:
# ... query logic ...
context._aggregation_function_set.update(retrieved_set)| def _retrieve_aggregation_function_list(self) -> None: | |
| """Retrieve the list of aggregation functions which will later be used in sql simplifier.""" | |
| if ( | |
| not context._is_snowpark_connect_compatible_mode | |
| or context._aggregation_function_set | |
| ): | |
| return | |
| retrieved_set = set() | |
| for sql in [ | |
| """select function_name from information_schema.functions where is_aggregate = 'YES'""", | |
| """show functions ->> select "name" from $1 where "is_aggregate" = 'Y'""", | |
| ]: | |
| try: | |
| retrieved_set.update({r[0].lower() for r in self.sql(sql).collect()}) | |
| except BaseException as e: | |
| _logger.debug( | |
| "Unable to get aggregation functions from the database: %s", | |
| e, | |
| ) | |
| # we raise error here as a pessimistic tactics | |
| # the reason is that if we fail to retrieve the aggregation function list, we have empty set | |
| # the simplifier will flatten the query which contains aggregation functions leading to incorrect results | |
| raise | |
| with context._aggregation_function_set_lock: | |
| context._aggregation_function_set.update(retrieved_set) | |
| def _retrieve_aggregation_function_list(self) -> None: | |
| """Retrieve the list of aggregation functions which will later be used in sql simplifier.""" | |
| if not context._is_snowpark_connect_compatible_mode: | |
| return | |
| # First check without lock for performance | |
| if context._aggregation_function_set: | |
| return | |
| with context._aggregation_function_set_lock: | |
| # Re-check inside the lock | |
| if context._aggregation_function_set: | |
| return | |
| retrieved_set = set() | |
| for sql in [ | |
| """select function_name from information_schema.functions where is_aggregate = 'YES'""", | |
| """show functions ->> select "name" from $1 where "is_aggregate" = 'Y'""", | |
| ]: | |
| try: | |
| retrieved_set.update({r[0].lower() for r in self.sql(sql).collect()}) | |
| except BaseException as e: | |
| _logger.debug( | |
| "Unable to get aggregation functions from the database: %s", | |
| e, | |
| ) | |
| # we raise error here as a pessimistic tactics | |
| # the reason is that if we fail to retrieve the aggregation function list, we have empty set | |
| # the simplifier will flatten the query which contains aggregation functions leading to incorrect results | |
| raise | |
| context._aggregation_function_set.update(retrieved_set) | |
Spotted by Graphite Agent
Is this helpful? React 👍 or 👎 to let us know.
…atten-filter-sort-new
sfc-gh-pwojcik
left a comment
There was a problem hiding this comment.
LGTM, just left some minor comments
| # We can inspect whether the referenced new column uses window function. Here we are being | ||
| # conservative for now to not flatten the SQL. | ||
| return False | ||
| return context._is_snowpark_connect_compatible_mode |
There was a problem hiding this comment.
Just to confirm, are all of these changes are behing the _is_snowpark_connect_compatible_mode flag?
| # In non-connect mode, windows are treated as data generators | ||
| if not context._is_snowpark_connect_compatible_mode and isinstance( | ||
| exp, WindowExpression | ||
| ): | ||
| return True |
There was a problem hiding this comment.
This is confusing imo. It would be better to remove this and just do the following in has_data_generator_or_window_function_exp, since it seems like we always check for window expressions. Am I missing something here?
return _check_expressions_for_types(
expressions, check_data_gen=True, check_window=True
)
| """select function_name from information_schema.functions where is_aggregate = 'YES'""", | ||
| """show functions ->> select "name" from $1 where "is_aggregate" = 'Y'""", |
There was a problem hiding this comment.
Why do we need both of these? For udfs?
Some old commits in the main branch were changed so #3941 can't be merged. This PR is a recreation of #3941 with the same code change.
Which Jira issue is this PR addressing? Make sure that there is an accompanying issue to your PR.
Fixes SNOW-2203826
Fill out the following pre-review checklist:
Please describe how your code solves the related issue.
Please write a short description of how your code change solves the related issue.