-
Notifications
You must be signed in to change notification settings - Fork 1
Consolidate API access check and folder listing into single request #179
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a5dfc62
7dcaa84
60f1518
a0e7bc6
6688c79
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -718,118 +718,125 @@ | |
| return {} | ||
|
|
||
|
|
||
| def verify_access_and_get_folders( | ||
| client: httpx.Client, profile_id: str | ||
| ) -> Optional[Dict[str, str]]: | ||
| """Combine access check and folder listing into a single API request. | ||
|
|
||
| Returns: | ||
| Dict of {folder_name: folder_id} on success. | ||
| None if access is denied or the request fails after retries. | ||
| """ | ||
| url = f"{API_BASE}/{profile_id}/groups" | ||
|
|
||
| for attempt in range(MAX_RETRIES): | ||
| try: | ||
| resp = client.get(url) | ||
| resp.raise_for_status() | ||
|
|
||
| try: | ||
| data = resp.json() | ||
|
|
||
| # Ensure we got the expected top-level JSON structure. | ||
| # We defensively validate types here so that unexpected but valid | ||
| # JSON (e.g., a list or a scalar) doesn't cause AttributeError/TypeError | ||
| # and crash the sync logic. | ||
| # and cause the operation to fail unexpectedly. | ||
| if not isinstance(data, dict): | ||
| log.error( | ||
| "Failed to parse folders data: expected JSON object at top level, " | ||
| f"got {type(data).__name__}" | ||
| ) | ||
| return None | ||
|
|
||
| body = data.get("body") | ||
| if not isinstance(body, dict): | ||
| log.error( | ||
| "Failed to parse folders data: expected 'body' to be an object, " | ||
| f"got {type(body).__name__ if body is not None else 'None'}" | ||
| ) | ||
| return None | ||
|
|
||
| folders = body.get("groups", []) | ||
| if not isinstance(folders, list): | ||
| log.error( | ||
| "Failed to parse folders data: expected 'body[\"groups\"]' to be a list, " | ||
| f"got {type(folders).__name__}" | ||
| ) | ||
| return None | ||
|
|
||
| # Only process entries that are dicts and have the required keys. | ||
| result: Dict[str, str] = {} | ||
| for f in folders: | ||
Check warningCode scanning / Pylint (reported by Codacy) Variable name "f" doesn't conform to snake_case naming style Warning
Variable name "f" doesn't conform to snake_case naming style
|
||
| if not isinstance(f, dict): | ||
| # Skip non-dict entries instead of crashing; this protects | ||
| # against partial data corruption or unexpected API changes. | ||
| continue | ||
| name = f.get("group") | ||
| pk = f.get("PK") | ||
Check warningCode scanning / Pylint (reported by Codacy) Variable name "pk" doesn't conform to snake_case naming style Warning
Variable name "pk" doesn't conform to snake_case naming style
Check warningCode scanning / Pylintpython3 (reported by Codacy) Variable name "pk" doesn't conform to snake_case naming style Warning
Variable name "pk" doesn't conform to snake_case naming style
|
||
| # Skip entries with empty or None values for required fields | ||
| if not name or not pk: | ||
| continue | ||
| result[str(name).strip()] = str(pk) | ||
|
|
||
| return result | ||
| except (KeyError, ValueError, TypeError, AttributeError) as e: | ||
| except (ValueError, TypeError, AttributeError) as err: | ||
| # As a final safeguard, catch any remaining parsing/shape errors so | ||
| # that a malformed response cannot crash the caller. | ||
| log.error(f"Failed to parse folders data: {sanitize_for_log(e)}") | ||
| log.error( | ||
| "Failed to parse folders data: %s", sanitize_for_log(err) | ||
| ) | ||
| return None | ||
|
|
||
| except httpx.HTTPStatusError as e: | ||
| code = e.response.status_code | ||
| if code in (401, 403, 404): | ||
| if code == 401: | ||
| log.critical( | ||
|
|
||
| f"{Colors.FAIL}❌ Authentication Failed: The API Token is invalid.{Colors.ENDC}" | ||
Check warningCode scanning / Pylint (reported by Codacy) Line too long (103/100) Warning
Line too long (103/100)
Check warningCode scanning / Pylintpython3 (reported by Codacy) Line too long (103/100) Warning
Line too long (103/100)
|
||
| ) | ||
| log.critical( | ||
|
|
||
| f"{Colors.FAIL} Please check your token at: https://controld.com/account/manage-account{Colors.ENDC}" | ||
| ) | ||
| elif code == 403: | ||
| log.critical( | ||
| f"{Colors.FAIL}🚫 Access Denied: Token lacks permission for Profile {sanitize_for_log(profile_id)}.{Colors.ENDC}" | ||
| "%s🚫 Access Denied: Token lacks permission for " | ||
| "Profile %s.%s", | ||
| Colors.FAIL, | ||
| sanitize_for_log(profile_id), | ||
| Colors.ENDC, | ||
| ) | ||
| elif code == 404: | ||
| log.critical( | ||
| f"{Colors.FAIL}🔍 Profile Not Found: The ID '{sanitize_for_log(profile_id)}' does not exist.{Colors.ENDC}" | ||
Check warningCode scanning / Pylint (reported by Codacy) Line too long (121/100) Warning
Line too long (121/100)
|
||
| ) | ||
| log.critical( | ||
| f"{Colors.FAIL} Please verify the Profile ID from your Control D Dashboard URL.{Colors.ENDC}" | ||
| ) | ||
| return None | ||
|
|
||
| if attempt == MAX_RETRIES - 1: | ||
| log.error(f"API Request Failed ({code}): {sanitize_for_log(e)}") | ||
| return None | ||
|
|
||
| except httpx.RequestError as e: | ||
| except httpx.RequestError as err: | ||
| if attempt == MAX_RETRIES - 1: | ||
| log.error( | ||
| f"Network error during access verification: {sanitize_for_log(e)}" | ||
| "Network error during access verification: %s", | ||
| sanitize_for_log(err), | ||
| ) | ||
| return None | ||
|
|
||
| wait_time = RETRY_DELAY * (2**attempt) | ||
| log.warning( | ||
| f"Request failed (attempt {attempt + 1}/{MAX_RETRIES}). Retrying in {wait_time}s..." | ||
| "Request failed (attempt %d/%d). Retrying in %ds...", | ||
| attempt + 1, | ||
| MAX_RETRIES, | ||
| wait_time, | ||
| ) | ||
|
Comment on lines
+830
to
+835
|
||
| time.sleep(wait_time) | ||
|
|
||
| log.error("Access verification failed after all retries") | ||
| return None | ||
|
|
||
|
|
||
| def get_all_existing_rules( | ||
| client: httpx.Client, | ||
| profile_id: str, | ||
| known_folders: Optional[Dict[str, str]] = None, | ||
| ) -> Set[str]: | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.