From fe1fd5367bec6f1acc963b0c0894fa697344c0f1 Mon Sep 17 00:00:00 2001 From: pritishpai Date: Fri, 27 Feb 2026 14:03:06 -0500 Subject: [PATCH 1/8] Add common pagination utilities for raw API calls Introduce paginated_fetch_offset (SCIM startIndex/count) and paginated_fetch_cursor (token-based) helpers in framework/utils.py to eliminate duplicated pagination boilerplate across the codebase. --- src/databricks/labs/ucx/framework/utils.py | 56 +++++++++ tests/unit/framework/test_utils.py | 140 ++++++++++++++++++++- 2 files changed, 195 insertions(+), 1 deletion(-) diff --git a/src/databricks/labs/ucx/framework/utils.py b/src/databricks/labs/ucx/framework/utils.py index d428447911..6e6815e057 100644 --- a/src/databricks/labs/ucx/framework/utils.py +++ b/src/databricks/labs/ucx/framework/utils.py @@ -1,5 +1,7 @@ import logging import subprocess +from collections.abc import Callable, Iterator +from typing import Any logger = logging.getLogger(__name__) @@ -22,6 +24,60 @@ def escape_sql_identifier(path: str, *, maxsplit: int = 2) -> str: return ".".join(escaped) +def paginated_fetch_offset( + fetch_page: Callable[[dict[str, Any]], dict], + items_key: str, + page_size: int, + start_index: int = 1, +) -> Iterator[dict]: + """Paginate a SCIM-style offset API (startIndex / count). + + Args: + fetch_page: Callable that takes a query dict and returns a response dict. + items_key: Key in the response containing the list of items (e.g. "Resources"). + page_size: Number of items to request per page. + start_index: 1-based index to start from (SCIM default is 1). + + Yields: + Individual raw item dicts from each page. + """ + query: dict[str, Any] = {"startIndex": start_index, "count": page_size} + while True: + response = fetch_page(query) + items = response.get(items_key, []) + if not items: + break + yield from items + if len(items) < page_size: + break + query["startIndex"] = int(query["startIndex"]) + len(items) + + +def paginated_fetch_cursor( + fetch_page: Callable[[str | None], dict], + items_key: str, + next_token_key: str = "next_page_token", +) -> Iterator[dict]: + """Paginate a cursor/token-based API. + + Args: + fetch_page: Callable that takes an optional page token and returns a response dict. + items_key: Key in the response containing the list of items. + next_token_key: Key in the response containing the next page token. + + Yields: + Individual raw item dicts from each page. + """ + token: str | None = None + while True: + response = fetch_page(token) + items = response.get(items_key, []) + yield from items + token = response.get(next_token_key) + if not token: + break + + def run_command(command: str | list[str]) -> tuple[int, str, str]: args = command.split() if isinstance(command, str) else command logger.info(f"Invoking command: {args!r}") diff --git a/tests/unit/framework/test_utils.py b/tests/unit/framework/test_utils.py index 966a8c9945..8bf746c10a 100644 --- a/tests/unit/framework/test_utils.py +++ b/tests/unit/framework/test_utils.py @@ -1,6 +1,10 @@ import pytest -from databricks.labs.ucx.framework.utils import escape_sql_identifier +from databricks.labs.ucx.framework.utils import ( + escape_sql_identifier, + paginated_fetch_offset, + paginated_fetch_cursor, +) @pytest.mark.parametrize( @@ -34,3 +38,137 @@ def test_escaped_when_column_contains_period() -> None: expected = "`column.with.periods`" path = "column.with.periods" assert escape_sql_identifier(path, maxsplit=0) == expected + + +# --- paginated_fetch_offset tests --- + + +def test_offset_pagination_empty_first_page() -> None: + """No items returned on the first request.""" + pages = [{"Resources": []}] + + def fetch_page(query: dict) -> dict: + return pages.pop(0) + + results = list(paginated_fetch_offset(fetch_page, items_key="Resources", page_size=10)) + assert results == [] + + +def test_offset_pagination_single_page() -> None: + """All items fit in a single page (fewer items than page_size).""" + items = [{"id": "1"}, {"id": "2"}] + pages = [{"Resources": items}] + + def fetch_page(query: dict) -> dict: + return pages.pop(0) + + results = list(paginated_fetch_offset(fetch_page, items_key="Resources", page_size=10)) + assert results == items + + +def test_offset_pagination_multiple_pages() -> None: + """Items span multiple pages; pagination advances startIndex correctly.""" + page1 = [{"id": "1"}, {"id": "2"}] + page2 = [{"id": "3"}] + pages = [{"Resources": page1}, {"Resources": page2}] + captured_queries: list[dict] = [] + + def fetch_page(query: dict) -> dict: + captured_queries.append(dict(query)) + return pages.pop(0) + + results = list(paginated_fetch_offset(fetch_page, items_key="Resources", page_size=2)) + assert results == [{"id": "1"}, {"id": "2"}, {"id": "3"}] + assert captured_queries[0]["startIndex"] == 1 + assert captured_queries[1]["startIndex"] == 3 + + +def test_offset_pagination_terminates_on_missing_key() -> None: + """Stops when the items key is missing from the response entirely.""" + pages = [{"other_key": "value"}] + + def fetch_page(query: dict) -> dict: + return pages.pop(0) + + results = list(paginated_fetch_offset(fetch_page, items_key="Resources", page_size=10)) + assert results == [] + + +def test_offset_pagination_respects_start_index() -> None: + """Custom start_index is passed to the first request.""" + pages = [{"Resources": [{"id": "5"}]}] + captured_queries: list[dict] = [] + + def fetch_page(query: dict) -> dict: + captured_queries.append(dict(query)) + return pages.pop(0) + + results = list(paginated_fetch_offset(fetch_page, items_key="Resources", page_size=10, start_index=5)) + assert results == [{"id": "5"}] + assert captured_queries[0]["startIndex"] == 5 + + +# --- paginated_fetch_cursor tests --- + + +def test_cursor_pagination_empty_first_page() -> None: + """No items returned on the first request.""" + pages = [{"feature_tables": []}] + + def fetch_page(token: str | None) -> dict: + return pages.pop(0) + + results = list(paginated_fetch_cursor(fetch_page, items_key="feature_tables")) + assert results == [] + + +def test_cursor_pagination_single_page_no_token() -> None: + """All items in one page, no next_page_token in response.""" + items = [{"id": "t1"}, {"id": "t2"}] + pages = [{"feature_tables": items}] + + def fetch_page(token: str | None) -> dict: + return pages.pop(0) + + results = list(paginated_fetch_cursor(fetch_page, items_key="feature_tables")) + assert results == items + + +def test_cursor_pagination_multiple_pages() -> None: + """Items span multiple pages with cursor tokens.""" + page1 = {"feature_tables": [{"id": "t1"}], "next_page_token": "token_abc"} + page2 = {"feature_tables": [{"id": "t2"}, {"id": "t3"}]} + pages = [page1, page2] + captured_tokens: list[str | None] = [] + + def fetch_page(token: str | None) -> dict: + captured_tokens.append(token) + return pages.pop(0) + + results = list(paginated_fetch_cursor(fetch_page, items_key="feature_tables")) + assert results == [{"id": "t1"}, {"id": "t2"}, {"id": "t3"}] + assert captured_tokens == [None, "token_abc"] + + +def test_cursor_pagination_custom_token_key() -> None: + """Supports a custom key name for the next page token.""" + page1 = {"items": [{"id": "1"}], "continuation": "xyz"} + page2 = {"items": [{"id": "2"}]} + pages = [page1, page2] + + def fetch_page(token: str | None) -> dict: + return pages.pop(0) + + results = list(paginated_fetch_cursor(fetch_page, items_key="items", next_token_key="continuation")) + assert results == [{"id": "1"}, {"id": "2"}] + + +def test_cursor_pagination_terminates_on_missing_items_key() -> None: + """Stops when the items key is missing from the response.""" + pages = [{"other": "data"}] + + def fetch_page(token: str | None) -> dict: + return pages.pop(0) + + results = list(paginated_fetch_cursor(fetch_page, items_key="feature_tables")) + assert results == [] From effefe8254f1180d221d4269959ab28040ece9d4 Mon Sep 17 00:00:00 2001 From: pritishpai Date: Fri, 27 Feb 2026 14:11:54 -0500 Subject: [PATCH 2/8] Refactor account groups listing to use paginated_fetch_offset Replace inline pagination loop in AccountGroupLookup._list_account_groups with the shared paginated_fetch_offset utility, adding proper SCIM offset pagination and deduplication to the previously unpaginated call. --- .../labs/ucx/workspace_access/groups.py | 20 +++++-- tests/unit/workspace_access/test_groups.py | 53 +++++++++++++++++++ 2 files changed, 69 insertions(+), 4 deletions(-) diff --git a/src/databricks/labs/ucx/workspace_access/groups.py b/src/databricks/labs/ucx/workspace_access/groups.py index 814043e1c6..bfbfefddb1 100644 --- a/src/databricks/labs/ucx/workspace_access/groups.py +++ b/src/databricks/labs/ucx/workspace_access/groups.py @@ -24,7 +24,7 @@ from databricks.sdk.service.iam import Group, User from databricks.labs.ucx.framework.crawlers import CrawlerBase -from databricks.labs.ucx.framework.utils import escape_sql_identifier +from databricks.labs.ucx.framework.utils import escape_sql_identifier, paginated_fetch_offset logger = logging.getLogger(__name__) @@ -913,6 +913,8 @@ def _get_strategy( class AccountGroupLookup: + PAGE_SIZE = 10000 + def __init__(self, ws: WorkspaceClient): self._ws = ws @@ -964,13 +966,23 @@ def _list_account_groups(self, scim_attributes: str) -> list[iam.Group]: # TODO: we should avoid using this method, as it's not documented # get account-level groups even if they're not (yet) assigned to a workspace logger.info(f"Listing account groups with {scim_attributes}...") - account_groups = [] - raw = self._ws.api_client.do("GET", "/api/2.0/account/scim/v2/Groups", query={"attributes": scim_attributes}) - for resource in raw.get("Resources", []): # type: ignore[union-attr] + + def fetch_page(query: dict) -> dict: + query["attributes"] = scim_attributes + return self._ws.api_client.do("GET", "/api/2.0/account/scim/v2/Groups", query=query) # type: ignore[return-value] + + account_groups: list[iam.Group] = [] + seen: set[str] = set() + for resource in paginated_fetch_offset(fetch_page, items_key="Resources", page_size=self.PAGE_SIZE): group = iam.Group.from_dict(resource) + if group.id and group.id in seen: + continue + if group.id: + seen.add(group.id) if group.display_name in SYSTEM_GROUPS: continue account_groups.append(group) + logger.info(f"Found {len(account_groups)} account groups") sorted_groups: list[iam.Group] = sorted( account_groups, key=lambda _: _.display_name if _.display_name else "" diff --git a/tests/unit/workspace_access/test_groups.py b/tests/unit/workspace_access/test_groups.py index 1adc823fc2..83e8bc3bc9 100644 --- a/tests/unit/workspace_access/test_groups.py +++ b/tests/unit/workspace_access/test_groups.py @@ -14,6 +14,7 @@ from databricks.sdk.service.iam import ComplexValue, Group, ResourceMeta from databricks.labs.ucx.workspace_access.groups import ( + AccountGroupLookup, ConfigureGroups, GroupManager, MigratedGroup, @@ -568,6 +569,58 @@ def reflect_account_side_effect(method, *_, **__): ) +def test_list_account_groups_paginates_through_multiple_pages(): + """Verify _list_account_groups uses paginated_fetch_offset to retrieve all pages.""" + wsclient = create_autospec(WorkspaceClient) + + page1 = [ + Group(id="1", display_name="alpha").as_dict(), + Group(id="2", display_name="beta").as_dict(), + ] + page2 = [ + Group(id="3", display_name="gamma").as_dict(), + ] + responses = iter([{"Resources": page1}, {"Resources": page2}]) + + def do_side_effect(_method, *_args, **_kwargs): + return next(responses) + + wsclient.api_client.do.side_effect = do_side_effect + + with patch.object(AccountGroupLookup, "PAGE_SIZE", 2): + lookup = AccountGroupLookup(wsclient) + groups = lookup.get_mapping() + + assert len(groups) == 3 + assert "alpha" in groups + assert "beta" in groups + assert "gamma" in groups + assert wsclient.api_client.do.call_count == 2 + + +def test_list_account_groups_filters_system_groups_across_pages(): + """System groups should be filtered even when they appear on later pages.""" + wsclient = create_autospec(WorkspaceClient) + + page1 = [ + Group(id="1", display_name="real_group").as_dict(), + Group(id="2", display_name="users").as_dict(), + ] + responses = iter([{"Resources": page1}]) + + def do_side_effect(_method, *_args, **_kwargs): + return next(responses) + + wsclient.api_client.do.side_effect = do_side_effect + + lookup = AccountGroupLookup(wsclient) + groups = lookup.get_mapping() + + assert len(groups) == 1 + assert "real_group" in groups + assert "users" not in groups + + def test_delete_original_workspace_groups_should_delete_reflected_acc_groups_in_workspace(fake_sleep: Mock) -> None: account_id = "11" ws_id = "1" From 7acdd735fe8167ca119f48f325852c9b002ebf50 Mon Sep 17 00:00:00 2001 From: pritishpai Date: Fri, 27 Feb 2026 14:18:49 -0500 Subject: [PATCH 3/8] Refactor feature store listing to use paginated_fetch_cursor Replace inline cursor-based pagination loop in feature_store_listing with the shared paginated_fetch_cursor utility. --- .../labs/ucx/workspace_access/generic.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/databricks/labs/ucx/workspace_access/generic.py b/src/databricks/labs/ucx/workspace_access/generic.py index a646317ece..c28bfe3e33 100644 --- a/src/databricks/labs/ucx/workspace_access/generic.py +++ b/src/databricks/labs/ucx/workspace_access/generic.py @@ -23,6 +23,8 @@ from databricks.sdk.service import iam, ml from databricks.sdk.service.iam import PermissionLevel +from databricks.labs.ucx.framework.utils import paginated_fetch_cursor + from databricks.labs.ucx.framework.crawlers import CrawlerBase from databricks.labs.ucx.framework.utils import escape_sql_identifier from databricks.labs.ucx.workspace_access.base import AclSupport, Permissions, StaticListing @@ -439,21 +441,17 @@ def inner() -> Iterator[ml.Experiment]: def feature_store_listing(ws: WorkspaceClient): def inner() -> list[GenericPermissionsInfo]: - feature_tables = [] - token = None - while True: + def fetch_page(token: str | None) -> dict: result = ws.api_client.do( "GET", "/api/2.0/feature-store/feature-tables/search", query={"page_token": token, "max_results": 200} ) assert isinstance(result, dict) - for table in result.get("feature_tables", []): - feature_tables.append(GenericPermissionsInfo(table["id"], "feature-tables")) - - if "next_page_token" not in result: - break - token = result["next_page_token"] # type: ignore[index] + return result - return feature_tables + return [ + GenericPermissionsInfo(table["id"], "feature-tables") + for table in paginated_fetch_cursor(fetch_page, items_key="feature_tables") + ] return inner From ee9e45584e4d0be9bcdc2ade4dc1098611e70385 Mon Sep 17 00:00:00 2001 From: pritishpai Date: Fri, 27 Feb 2026 14:22:23 -0500 Subject: [PATCH 4/8] Fix mypy type annotations in pagination utility tests --- tests/unit/framework/test_utils.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/tests/unit/framework/test_utils.py b/tests/unit/framework/test_utils.py index 8bf746c10a..49edc71b68 100644 --- a/tests/unit/framework/test_utils.py +++ b/tests/unit/framework/test_utils.py @@ -45,7 +45,7 @@ def test_escaped_when_column_contains_period() -> None: def test_offset_pagination_empty_first_page() -> None: """No items returned on the first request.""" - pages = [{"Resources": []}] + pages: list[dict] = [{"Resources": []}] def fetch_page(query: dict) -> dict: return pages.pop(0) @@ -113,7 +113,7 @@ def fetch_page(query: dict) -> dict: def test_cursor_pagination_empty_first_page() -> None: """No items returned on the first request.""" - pages = [{"feature_tables": []}] + pages: list[dict] = [{"feature_tables": []}] def fetch_page(token: str | None) -> dict: return pages.pop(0) @@ -136,9 +136,10 @@ def fetch_page(token: str | None) -> dict: def test_cursor_pagination_multiple_pages() -> None: """Items span multiple pages with cursor tokens.""" - page1 = {"feature_tables": [{"id": "t1"}], "next_page_token": "token_abc"} - page2 = {"feature_tables": [{"id": "t2"}, {"id": "t3"}]} - pages = [page1, page2] + pages: list[dict] = [ + {"feature_tables": [{"id": "t1"}], "next_page_token": "token_abc"}, + {"feature_tables": [{"id": "t2"}, {"id": "t3"}]}, + ] captured_tokens: list[str | None] = [] def fetch_page(token: str | None) -> dict: @@ -152,9 +153,10 @@ def fetch_page(token: str | None) -> dict: def test_cursor_pagination_custom_token_key() -> None: """Supports a custom key name for the next page token.""" - page1 = {"items": [{"id": "1"}], "continuation": "xyz"} - page2 = {"items": [{"id": "2"}]} - pages = [page1, page2] + pages: list[dict] = [ + {"items": [{"id": "1"}], "continuation": "xyz"}, + {"items": [{"id": "2"}]}, + ] def fetch_page(token: str | None) -> dict: return pages.pop(0) From 9d86776a79a934bf5bb3c8886fc2bb70f32e6b4b Mon Sep 17 00:00:00 2001 From: pritishpai Date: Fri, 27 Feb 2026 15:00:06 -0500 Subject: [PATCH 5/8] Fix pylint warnings in pagination utility code Rewrite multi-line list comprehension as for loop (R8923), prefix unused arguments with underscore (W0613), use implicit booleaness for empty list checks (C1803). --- .../labs/ucx/workspace_access/generic.py | 8 +++---- tests/unit/framework/test_utils.py | 22 +++++++++---------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/databricks/labs/ucx/workspace_access/generic.py b/src/databricks/labs/ucx/workspace_access/generic.py index c28bfe3e33..4fbafbad09 100644 --- a/src/databricks/labs/ucx/workspace_access/generic.py +++ b/src/databricks/labs/ucx/workspace_access/generic.py @@ -448,10 +448,10 @@ def fetch_page(token: str | None) -> dict: assert isinstance(result, dict) return result - return [ - GenericPermissionsInfo(table["id"], "feature-tables") - for table in paginated_fetch_cursor(fetch_page, items_key="feature_tables") - ] + feature_tables = [] + for table in paginated_fetch_cursor(fetch_page, items_key="feature_tables"): + feature_tables.append(GenericPermissionsInfo(table["id"], "feature-tables")) + return feature_tables return inner diff --git a/tests/unit/framework/test_utils.py b/tests/unit/framework/test_utils.py index 49edc71b68..63faa4d4a6 100644 --- a/tests/unit/framework/test_utils.py +++ b/tests/unit/framework/test_utils.py @@ -47,11 +47,11 @@ def test_offset_pagination_empty_first_page() -> None: """No items returned on the first request.""" pages: list[dict] = [{"Resources": []}] - def fetch_page(query: dict) -> dict: + def fetch_page(_query: dict) -> dict: return pages.pop(0) results = list(paginated_fetch_offset(fetch_page, items_key="Resources", page_size=10)) - assert results == [] + assert not results def test_offset_pagination_single_page() -> None: @@ -59,7 +59,7 @@ def test_offset_pagination_single_page() -> None: items = [{"id": "1"}, {"id": "2"}] pages = [{"Resources": items}] - def fetch_page(query: dict) -> dict: + def fetch_page(_query: dict) -> dict: return pages.pop(0) results = list(paginated_fetch_offset(fetch_page, items_key="Resources", page_size=10)) @@ -87,11 +87,11 @@ def test_offset_pagination_terminates_on_missing_key() -> None: """Stops when the items key is missing from the response entirely.""" pages = [{"other_key": "value"}] - def fetch_page(query: dict) -> dict: + def fetch_page(_query: dict) -> dict: return pages.pop(0) results = list(paginated_fetch_offset(fetch_page, items_key="Resources", page_size=10)) - assert results == [] + assert not results def test_offset_pagination_respects_start_index() -> None: @@ -115,11 +115,11 @@ def test_cursor_pagination_empty_first_page() -> None: """No items returned on the first request.""" pages: list[dict] = [{"feature_tables": []}] - def fetch_page(token: str | None) -> dict: + def fetch_page(_token: str | None) -> dict: return pages.pop(0) results = list(paginated_fetch_cursor(fetch_page, items_key="feature_tables")) - assert results == [] + assert not results def test_cursor_pagination_single_page_no_token() -> None: @@ -127,7 +127,7 @@ def test_cursor_pagination_single_page_no_token() -> None: items = [{"id": "t1"}, {"id": "t2"}] pages = [{"feature_tables": items}] - def fetch_page(token: str | None) -> dict: + def fetch_page(_token: str | None) -> dict: return pages.pop(0) results = list(paginated_fetch_cursor(fetch_page, items_key="feature_tables")) @@ -158,7 +158,7 @@ def test_cursor_pagination_custom_token_key() -> None: {"items": [{"id": "2"}]}, ] - def fetch_page(token: str | None) -> dict: + def fetch_page(_token: str | None) -> dict: return pages.pop(0) results = list(paginated_fetch_cursor(fetch_page, items_key="items", next_token_key="continuation")) @@ -169,8 +169,8 @@ def test_cursor_pagination_terminates_on_missing_items_key() -> None: """Stops when the items key is missing from the response.""" pages = [{"other": "data"}] - def fetch_page(token: str | None) -> dict: + def fetch_page(_token: str | None) -> dict: return pages.pop(0) results = list(paginated_fetch_cursor(fetch_page, items_key="feature_tables")) - assert results == [] + assert not results From dfe3ed1adf70783f26e0c91260542bb6bdb42edc Mon Sep 17 00:00:00 2001 From: pritishpai Date: Thu, 4 Jun 2026 11:30:55 -0400 Subject: [PATCH 6/8] Improve pagination utility design and test coverage Rename inner fetch functions to be more descriptive (fetch_page -> fetch_feature_tables, fetch_account_groups). Redesign paginated_fetch_offset callable signature to accept (start_index, count) as ints instead of a mutable dict, so callers own query construction. Add unit tests for feature_store_listing multi-page collection and _list_account_groups duplicate-ID deduplication to close coverage gap. --- src/databricks/labs/ucx/framework/utils.py | 10 ++++---- .../labs/ucx/workspace_access/generic.py | 4 ++-- .../labs/ucx/workspace_access/groups.py | 6 ++--- tests/unit/framework/test_utils.py | 24 +++++++++---------- tests/unit/workspace_access/test_generic.py | 20 ++++++++++++++++ tests/unit/workspace_access/test_groups.py | 24 +++++++++++++++++++ 6 files changed, 65 insertions(+), 23 deletions(-) diff --git a/src/databricks/labs/ucx/framework/utils.py b/src/databricks/labs/ucx/framework/utils.py index 6e6815e057..9bec95bd80 100644 --- a/src/databricks/labs/ucx/framework/utils.py +++ b/src/databricks/labs/ucx/framework/utils.py @@ -1,7 +1,6 @@ import logging import subprocess from collections.abc import Callable, Iterator -from typing import Any logger = logging.getLogger(__name__) @@ -25,7 +24,7 @@ def escape_sql_identifier(path: str, *, maxsplit: int = 2) -> str: def paginated_fetch_offset( - fetch_page: Callable[[dict[str, Any]], dict], + fetch_page: Callable[[int, int], dict], items_key: str, page_size: int, start_index: int = 1, @@ -33,7 +32,7 @@ def paginated_fetch_offset( """Paginate a SCIM-style offset API (startIndex / count). Args: - fetch_page: Callable that takes a query dict and returns a response dict. + fetch_page: Callable that takes (start_index, count) and returns a response dict. items_key: Key in the response containing the list of items (e.g. "Resources"). page_size: Number of items to request per page. start_index: 1-based index to start from (SCIM default is 1). @@ -41,16 +40,15 @@ def paginated_fetch_offset( Yields: Individual raw item dicts from each page. """ - query: dict[str, Any] = {"startIndex": start_index, "count": page_size} while True: - response = fetch_page(query) + response = fetch_page(start_index, page_size) items = response.get(items_key, []) if not items: break yield from items if len(items) < page_size: break - query["startIndex"] = int(query["startIndex"]) + len(items) + start_index += len(items) def paginated_fetch_cursor( diff --git a/src/databricks/labs/ucx/workspace_access/generic.py b/src/databricks/labs/ucx/workspace_access/generic.py index 4fbafbad09..a9568ef92f 100644 --- a/src/databricks/labs/ucx/workspace_access/generic.py +++ b/src/databricks/labs/ucx/workspace_access/generic.py @@ -441,7 +441,7 @@ def inner() -> Iterator[ml.Experiment]: def feature_store_listing(ws: WorkspaceClient): def inner() -> list[GenericPermissionsInfo]: - def fetch_page(token: str | None) -> dict: + def fetch_feature_tables(token: str | None) -> dict: result = ws.api_client.do( "GET", "/api/2.0/feature-store/feature-tables/search", query={"page_token": token, "max_results": 200} ) @@ -449,7 +449,7 @@ def fetch_page(token: str | None) -> dict: return result feature_tables = [] - for table in paginated_fetch_cursor(fetch_page, items_key="feature_tables"): + for table in paginated_fetch_cursor(fetch_feature_tables, items_key="feature_tables"): feature_tables.append(GenericPermissionsInfo(table["id"], "feature-tables")) return feature_tables diff --git a/src/databricks/labs/ucx/workspace_access/groups.py b/src/databricks/labs/ucx/workspace_access/groups.py index bfbfefddb1..8b4ff00e0d 100644 --- a/src/databricks/labs/ucx/workspace_access/groups.py +++ b/src/databricks/labs/ucx/workspace_access/groups.py @@ -967,13 +967,13 @@ def _list_account_groups(self, scim_attributes: str) -> list[iam.Group]: # get account-level groups even if they're not (yet) assigned to a workspace logger.info(f"Listing account groups with {scim_attributes}...") - def fetch_page(query: dict) -> dict: - query["attributes"] = scim_attributes + def fetch_account_groups(start_index: int, count: int) -> dict: + query = {"startIndex": start_index, "count": count, "attributes": scim_attributes} return self._ws.api_client.do("GET", "/api/2.0/account/scim/v2/Groups", query=query) # type: ignore[return-value] account_groups: list[iam.Group] = [] seen: set[str] = set() - for resource in paginated_fetch_offset(fetch_page, items_key="Resources", page_size=self.PAGE_SIZE): + for resource in paginated_fetch_offset(fetch_account_groups, items_key="Resources", page_size=self.PAGE_SIZE): group = iam.Group.from_dict(resource) if group.id and group.id in seen: continue diff --git a/tests/unit/framework/test_utils.py b/tests/unit/framework/test_utils.py index 63faa4d4a6..a48ff7418b 100644 --- a/tests/unit/framework/test_utils.py +++ b/tests/unit/framework/test_utils.py @@ -47,7 +47,7 @@ def test_offset_pagination_empty_first_page() -> None: """No items returned on the first request.""" pages: list[dict] = [{"Resources": []}] - def fetch_page(_query: dict) -> dict: + def fetch_page(_start_index: int, _count: int) -> dict: return pages.pop(0) results = list(paginated_fetch_offset(fetch_page, items_key="Resources", page_size=10)) @@ -59,7 +59,7 @@ def test_offset_pagination_single_page() -> None: items = [{"id": "1"}, {"id": "2"}] pages = [{"Resources": items}] - def fetch_page(_query: dict) -> dict: + def fetch_page(_start_index: int, _count: int) -> dict: return pages.pop(0) results = list(paginated_fetch_offset(fetch_page, items_key="Resources", page_size=10)) @@ -71,23 +71,23 @@ def test_offset_pagination_multiple_pages() -> None: page1 = [{"id": "1"}, {"id": "2"}] page2 = [{"id": "3"}] pages = [{"Resources": page1}, {"Resources": page2}] - captured_queries: list[dict] = [] + captured_calls: list[tuple[int, int]] = [] - def fetch_page(query: dict) -> dict: - captured_queries.append(dict(query)) + def fetch_page(start_index: int, count: int) -> dict: + captured_calls.append((start_index, count)) return pages.pop(0) results = list(paginated_fetch_offset(fetch_page, items_key="Resources", page_size=2)) assert results == [{"id": "1"}, {"id": "2"}, {"id": "3"}] - assert captured_queries[0]["startIndex"] == 1 - assert captured_queries[1]["startIndex"] == 3 + assert captured_calls[0] == (1, 2) + assert captured_calls[1] == (3, 2) def test_offset_pagination_terminates_on_missing_key() -> None: """Stops when the items key is missing from the response entirely.""" pages = [{"other_key": "value"}] - def fetch_page(_query: dict) -> dict: + def fetch_page(_start_index: int, _count: int) -> dict: return pages.pop(0) results = list(paginated_fetch_offset(fetch_page, items_key="Resources", page_size=10)) @@ -97,15 +97,15 @@ def fetch_page(_query: dict) -> dict: def test_offset_pagination_respects_start_index() -> None: """Custom start_index is passed to the first request.""" pages = [{"Resources": [{"id": "5"}]}] - captured_queries: list[dict] = [] + captured_calls: list[tuple[int, int]] = [] - def fetch_page(query: dict) -> dict: - captured_queries.append(dict(query)) + def fetch_page(start_index: int, count: int) -> dict: + captured_calls.append((start_index, count)) return pages.pop(0) results = list(paginated_fetch_offset(fetch_page, items_key="Resources", page_size=10, start_index=5)) assert results == [{"id": "5"}] - assert captured_queries[0]["startIndex"] == 5 + assert captured_calls[0] == (5, 10) # --- paginated_fetch_cursor tests --- diff --git a/tests/unit/workspace_access/test_generic.py b/tests/unit/workspace_access/test_generic.py index 860b17a9c5..2a3572a7bf 100644 --- a/tests/unit/workspace_access/test_generic.py +++ b/tests/unit/workspace_access/test_generic.py @@ -21,6 +21,7 @@ from databricks.sdk.service.workspace import Language, ObjectInfo, ObjectType from databricks.labs.ucx.workspace_access.generic import ( + GenericPermissionsInfo, GenericPermissionsSupport, Listing, Permissions, @@ -872,6 +873,25 @@ def do_api_side_effect(*_, query): assert result[0].request_type == "feature-tables" +def test_feature_store_listing_collects_all_pages(): + ws = create_autospec(WorkspaceClient) + page1_tables = [{"id": f"table{i}"} for i in range(200)] + page2_tables = [{"id": f"table{i}"} for i in range(200, 250)] + + def do_api_side_effect(*_, query): + if not query["page_token"]: + return {"feature_tables": page1_tables, "next_page_token": "token2"} + return {"feature_tables": page2_tables} + + ws.api_client.do.side_effect = do_api_side_effect + + result = feature_store_listing(ws)() + + assert len(result) == 250 + assert all(isinstance(item, GenericPermissionsInfo) for item in result) + assert all(item.request_type == "feature-tables" for item in result) + + def test_root_page_listing(): ws = create_autospec(WorkspaceClient) diff --git a/tests/unit/workspace_access/test_groups.py b/tests/unit/workspace_access/test_groups.py index 83e8bc3bc9..60c544ab15 100644 --- a/tests/unit/workspace_access/test_groups.py +++ b/tests/unit/workspace_access/test_groups.py @@ -621,6 +621,30 @@ def do_side_effect(_method, *_args, **_kwargs): assert "users" not in groups +def test_list_account_groups_deduplicates_across_pages(): + """Duplicate group IDs across pages should be counted only once.""" + wsclient = create_autospec(WorkspaceClient) + + duplicate = Group(id="1", display_name="alpha").as_dict() + page1 = [duplicate, Group(id="2", display_name="beta").as_dict()] + page2 = [duplicate, Group(id="3", display_name="gamma").as_dict()] + responses = iter([{"Resources": page1}, {"Resources": page2}]) + + def do_side_effect(_method, *_args, **_kwargs): + return next(responses) + + wsclient.api_client.do.side_effect = do_side_effect + + with patch.object(AccountGroupLookup, "PAGE_SIZE", 2): + lookup = AccountGroupLookup(wsclient) + groups = lookup.get_mapping() + + assert len(groups) == 3 + assert "alpha" in groups + assert "beta" in groups + assert "gamma" in groups + + def test_delete_original_workspace_groups_should_delete_reflected_acc_groups_in_workspace(fake_sleep: Mock) -> None: account_id = "11" ws_id = "1" From a6f0d00f18a15aa198274e96e5832d09c363da2e Mon Sep 17 00:00:00 2001 From: pritishpai Date: Thu, 4 Jun 2026 12:35:55 -0400 Subject: [PATCH 7/8] Fix dedup test: last page must be shorter than PAGE_SIZE to terminate loop --- tests/unit/workspace_access/test_groups.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/unit/workspace_access/test_groups.py b/tests/unit/workspace_access/test_groups.py index 60c544ab15..d4f07c47bc 100644 --- a/tests/unit/workspace_access/test_groups.py +++ b/tests/unit/workspace_access/test_groups.py @@ -626,8 +626,10 @@ def test_list_account_groups_deduplicates_across_pages(): wsclient = create_autospec(WorkspaceClient) duplicate = Group(id="1", display_name="alpha").as_dict() + # page1 fills PAGE_SIZE (2), so pagination continues; page2 is shorter, so it stops. + # alpha appears on both pages and must only be counted once. page1 = [duplicate, Group(id="2", display_name="beta").as_dict()] - page2 = [duplicate, Group(id="3", display_name="gamma").as_dict()] + page2 = [duplicate] responses = iter([{"Resources": page1}, {"Resources": page2}]) def do_side_effect(_method, *_args, **_kwargs): @@ -639,10 +641,9 @@ def do_side_effect(_method, *_args, **_kwargs): lookup = AccountGroupLookup(wsclient) groups = lookup.get_mapping() - assert len(groups) == 3 + assert len(groups) == 2 assert "alpha" in groups assert "beta" in groups - assert "gamma" in groups def test_delete_original_workspace_groups_should_delete_reflected_acc_groups_in_workspace(fake_sleep: Mock) -> None: From 02a2ef4e60235fa1152470d25be3de796420ef42 Mon Sep 17 00:00:00 2001 From: pritishpai Date: Thu, 4 Jun 2026 15:03:33 -0400 Subject: [PATCH 8/8] Fix 403 on databricks labs install lsql by passing GITHUB_TOKEN --- .github/workflows/push.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/push.yml b/.github/workflows/push.yml index 263aafbf5f..efb780f6fc 100644 --- a/.github/workflows/push.yml +++ b/.github/workflows/push.yml @@ -89,6 +89,7 @@ jobs: env: # this is a temporary hack DATABRICKS_HOST: any DATABRICKS_TOKEN: any + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - name: Reformat SQL queries run: databricks labs lsql fmt --normalize-case false --exclude tests/unit/source_code/samples/