Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -718,118 +718,125 @@
return {}


def verify_access_and_get_folders(
client: httpx.Client, profile_id: str
) -> Optional[Dict[str, str]]:
"""Combine access check and folder listing into a single API request.

Returns:
Dict of {folder_name: folder_id} on success.
None if access is denied or the request fails after retries.
"""
url = f"{API_BASE}/{profile_id}/groups"

for attempt in range(MAX_RETRIES):
try:
resp = client.get(url)
resp.raise_for_status()

try:
data = resp.json()

# Ensure we got the expected top-level JSON structure.
# We defensively validate types here so that unexpected but valid
# JSON (e.g., a list or a scalar) doesn't cause AttributeError/TypeError
# and crash the sync logic.
# and cause the operation to fail unexpectedly.
if not isinstance(data, dict):
log.error(
"Failed to parse folders data: expected JSON object at top level, "
f"got {type(data).__name__}"
)
return None

body = data.get("body")
if not isinstance(body, dict):
log.error(
Comment thread Dismissed
"Failed to parse folders data: expected 'body' to be an object, "
f"got {type(body).__name__ if body is not None else 'None'}"
)
return None

folders = body.get("groups", [])
if not isinstance(folders, list):
log.error(
"Failed to parse folders data: expected 'body[\"groups\"]' to be a list, "
f"got {type(folders).__name__}"
)
return None

# Only process entries that are dicts and have the required keys.
result: Dict[str, str] = {}
for f in folders:

Check warning

Code scanning / Pylint (reported by Codacy)

Variable name "f" doesn't conform to snake_case naming style Warning

Variable name "f" doesn't conform to snake_case naming style
if not isinstance(f, dict):
# Skip non-dict entries instead of crashing; this protects
# against partial data corruption or unexpected API changes.
continue
name = f.get("group")
pk = f.get("PK")

Check warning

Code scanning / Pylint (reported by Codacy)

Variable name "pk" doesn't conform to snake_case naming style Warning

Variable name "pk" doesn't conform to snake_case naming style

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Variable name "pk" doesn't conform to snake_case naming style Warning

Variable name "pk" doesn't conform to snake_case naming style
# Skip entries with empty or None values for required fields
if not name or not pk:
continue
result[str(name).strip()] = str(pk)

return result
except (KeyError, ValueError, TypeError, AttributeError) as e:
except (ValueError, TypeError, AttributeError) as err:
# As a final safeguard, catch any remaining parsing/shape errors so
# that a malformed response cannot crash the caller.
log.error(f"Failed to parse folders data: {sanitize_for_log(e)}")
log.error(
"Failed to parse folders data: %s", sanitize_for_log(err)
)
return None

except httpx.HTTPStatusError as e:
code = e.response.status_code
if code in (401, 403, 404):
if code == 401:
log.critical(
Comment thread Dismissed
f"{Colors.FAIL}❌ Authentication Failed: The API Token is invalid.{Colors.ENDC}"

Check warning

Code scanning / Pylint (reported by Codacy)

Line too long (103/100) Warning

Line too long (103/100)

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Line too long (103/100) Warning

Line too long (103/100)
)
log.critical(
Comment thread Dismissed
f"{Colors.FAIL} Please check your token at: https://controld.com/account/manage-account{Colors.ENDC}"
)
elif code == 403:
log.critical(
f"{Colors.FAIL}🚫 Access Denied: Token lacks permission for Profile {sanitize_for_log(profile_id)}.{Colors.ENDC}"
"%s🚫 Access Denied: Token lacks permission for "
"Profile %s.%s",
Colors.FAIL,
sanitize_for_log(profile_id),
Colors.ENDC,
)
elif code == 404:
log.critical(
f"{Colors.FAIL}🔍 Profile Not Found: The ID '{sanitize_for_log(profile_id)}' does not exist.{Colors.ENDC}"

Check warning

Code scanning / Pylint (reported by Codacy)

Line too long (121/100) Warning

Line too long (121/100)
)
log.critical(
f"{Colors.FAIL} Please verify the Profile ID from your Control D Dashboard URL.{Colors.ENDC}"
)
return None

if attempt == MAX_RETRIES - 1:
log.error(f"API Request Failed ({code}): {sanitize_for_log(e)}")
return None

except httpx.RequestError as e:
except httpx.RequestError as err:
if attempt == MAX_RETRIES - 1:
log.error(
f"Network error during access verification: {sanitize_for_log(e)}"
"Network error during access verification: %s",
sanitize_for_log(err),
)
return None

wait_time = RETRY_DELAY * (2**attempt)
log.warning(
f"Request failed (attempt {attempt + 1}/{MAX_RETRIES}). Retrying in {wait_time}s..."
"Request failed (attempt %d/%d). Retrying in %ds...",
attempt + 1,
MAX_RETRIES,
wait_time,
)
Comment on lines +830 to +835
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The retry log message uses a %d placeholder for wait_time, but wait_time can be a float (e.g., if RETRY_DELAY is configured/overridden as a non-integer for faster retries). Using %d will raise a formatting TypeError at runtime. Consider switching to %s/%.3f (or explicitly casting) so retries can't crash due to logging format mismatch.

Copilot uses AI. Check for mistakes.
time.sleep(wait_time)

log.error("Access verification failed after all retries")
return None


def get_all_existing_rules(
client: httpx.Client,

Check notice on line 839 in main.py

View check run for this annotation

codefactor.io / CodeFactor

main.py#L721-L839

Complex Method
profile_id: str,
known_folders: Optional[Dict[str, str]] = None,
) -> Set[str]:
Expand Down
5 changes: 4 additions & 1 deletion test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@


def test_verify_access_and_get_folders_500_retry(monkeypatch):
"""Test that verify_access_and_get_folders retries on 500 errors."""
m = reload_main_with_env(monkeypatch)
mock_client = MagicMock()

Expand All @@ -356,6 +357,7 @@


def test_verify_access_and_get_folders_network_error(monkeypatch):
"""Test that verify_access_and_get_folders handles network errors."""
m = reload_main_with_env(monkeypatch)
mock_client = MagicMock()

Expand All @@ -372,7 +374,8 @@
assert m.verify_access_and_get_folders(mock_client, "profile") is None

Check notice

Code scanning / Bandit

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.

Check notice

Code scanning / Bandit (reported by Codacy)

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.
assert mock_client.get.call_count == 2

Check notice

Code scanning / Bandit

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.

Check notice

Code scanning / Bandit (reported by Codacy)

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.
assert mock_log.error.called
assert "Network error" in str(mock_log.error.call_args) or "access verification" in str(mock_log.error.call_args)
error_msg = str(mock_log.error.call_args)
assert "Network error" in error_msg or "access verification" in error_msg

Check notice

Code scanning / Bandit

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.

Check notice

Code scanning / Bandit (reported by Codacy)

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.


# Case 8: extract_profile_id correctly extracts ID from URL or returns input
Expand Down
18 changes: 15 additions & 3 deletions tests/test_parallel_deletion.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ def test_parallel_deletion_uses_threadpool(mock_env, monkeypatch):
__enter__=lambda self: mock_client, __exit__=lambda *args: None
)
monkeypatch.setattr(main, "_api_client", lambda: mock_client_ctx)
monkeypatch.setattr(main, "verify_access_and_get_folders", lambda *args: {"FolderA": "id1", "FolderB": "id2"})
monkeypatch.setattr(
main,
"verify_access_and_get_folders",
lambda *args: {"FolderA": "id1", "FolderB": "id2"},
)
monkeypatch.setattr(main, "delete_folder", lambda *args: True)
monkeypatch.setattr(main, "get_all_existing_rules", lambda *args: set())
monkeypatch.setattr(main, "countdown_timer", lambda *args: None)
Expand Down Expand Up @@ -90,7 +94,11 @@ def test_parallel_deletion_handles_exceptions(mock_env, monkeypatch):
__enter__=lambda self: mock_client, __exit__=lambda *args: None
)
monkeypatch.setattr(main, "_api_client", lambda: mock_client_ctx)
monkeypatch.setattr(main, "verify_access_and_get_folders", lambda *args: {"Folder1": "id1"})
monkeypatch.setattr(
main,
"verify_access_and_get_folders",
lambda *args: {"Folder1": "id1"},
)

# Mock delete_folder to raise an exception
def failing_delete(*args):
Expand Down Expand Up @@ -140,7 +148,11 @@ def test_parallel_deletion_sanitizes_exception(mock_env, monkeypatch):
__enter__=lambda self: mock_client, __exit__=lambda *args: None
)
monkeypatch.setattr(main, "_api_client", lambda: mock_client_ctx)
monkeypatch.setattr(main, "verify_access_and_get_folders", lambda *args: {"TestFolder": "id1"})
monkeypatch.setattr(
main,
"verify_access_and_get_folders",
lambda *args: {"TestFolder": "id1"},
)

# Mock delete_folder to raise exception with potentially dangerous content
def failing_delete(*args):
Expand Down
Loading