|
2 | 2 |
|
3 | 3 | import typing as t |
4 | 4 | import logging |
5 | | -import time |
6 | 5 | from sqlglot import exp |
| 6 | +from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_result |
7 | 7 | from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter |
8 | 8 | from sqlmesh.core.engine_adapter.shared import InsertOverwriteStrategy, SourceQuery |
9 | 9 | from sqlmesh.core.engine_adapter.base import EngineAdapter |
@@ -225,47 +225,53 @@ def _make_fabric_api_request_with_location( |
225 | 225 | except requests.exceptions.RequestException as e: |
226 | 226 | raise SQLMeshError(f"Fabric API request failed: {e}") |
227 | 227 |
|
228 | | - def _poll_operation_status(self, location_url: str, operation_name: str) -> None: |
229 | | - """Poll the operation status until completion.""" |
| 228 | + @retry( |
| 229 | + wait=wait_exponential(multiplier=1, min=1, max=30), |
| 230 | + stop=stop_after_attempt(60), |
| 231 | + retry=retry_if_result(lambda result: result not in ["Succeeded", "Failed"]), |
| 232 | + ) |
| 233 | + def _check_operation_status(self, location_url: str, operation_name: str) -> str: |
| 234 | + """Check the operation status and return the status string.""" |
230 | 235 | if not requests: |
231 | 236 | raise SQLMeshError("requests library is required for Fabric catalog operations") |
232 | 237 |
|
233 | 238 | headers = self._get_fabric_auth_headers() |
234 | | - max_attempts = 60 # Poll for up to 10 minutes |
235 | | - initial_delay = 1 # Start with 1 second |
236 | 239 |
|
237 | | - for attempt in range(max_attempts): |
238 | | - try: |
239 | | - response = requests.get(location_url, headers=headers) |
240 | | - response.raise_for_status() |
241 | | - |
242 | | - result = response.json() |
243 | | - status = result.get("status", "Unknown") |
244 | | - |
245 | | - logger.info(f"Operation {operation_name} status: {status}") |
246 | | - |
247 | | - if status == "Succeeded": |
248 | | - return |
249 | | - if status == "Failed": |
250 | | - error_msg = result.get("error", {}).get("message", "Unknown error") |
251 | | - raise SQLMeshError(f"Operation {operation_name} failed: {error_msg}") |
252 | | - elif status in ["InProgress", "Running"]: |
253 | | - # Use exponential backoff with max of 30 seconds |
254 | | - delay = min(initial_delay * (2 ** min(attempt // 3, 4)), 30) |
255 | | - logger.info(f"Waiting {delay} seconds before next status check...") |
256 | | - time.sleep(delay) |
257 | | - else: |
258 | | - logger.warning(f"Unknown status '{status}' for operation {operation_name}") |
259 | | - time.sleep(5) |
260 | | - |
261 | | - except requests.exceptions.RequestException as e: |
262 | | - if attempt < max_attempts - 1: |
263 | | - logger.warning(f"Failed to poll status (attempt {attempt + 1}): {e}") |
264 | | - time.sleep(5) |
265 | | - else: |
266 | | - raise SQLMeshError(f"Failed to poll operation status: {e}") |
267 | | - |
268 | | - raise SQLMeshError(f"Operation {operation_name} did not complete within timeout") |
| 240 | + try: |
| 241 | + response = requests.get(location_url, headers=headers) |
| 242 | + response.raise_for_status() |
| 243 | + |
| 244 | + result = response.json() |
| 245 | + status = result.get("status", "Unknown") |
| 246 | + |
| 247 | + logger.info(f"Operation {operation_name} status: {status}") |
| 248 | + |
| 249 | + if status == "Failed": |
| 250 | + error_msg = result.get("error", {}).get("message", "Unknown error") |
| 251 | + raise SQLMeshError(f"Operation {operation_name} failed: {error_msg}") |
| 252 | + elif status in ["InProgress", "Running"]: |
| 253 | + logger.info(f"Operation {operation_name} still in progress...") |
| 254 | + elif status not in ["Succeeded"]: |
| 255 | + logger.warning(f"Unknown status '{status}' for operation {operation_name}") |
| 256 | + |
| 257 | + return status |
| 258 | + |
| 259 | + except requests.exceptions.RequestException as e: |
| 260 | + logger.warning(f"Failed to poll status: {e}") |
| 261 | + raise SQLMeshError(f"Failed to poll operation status: {e}") |
| 262 | + |
| 263 | + def _poll_operation_status(self, location_url: str, operation_name: str) -> None: |
| 264 | + """Poll the operation status until completion.""" |
| 265 | + try: |
| 266 | + final_status = self._check_operation_status(location_url, operation_name) |
| 267 | + if final_status != "Succeeded": |
| 268 | + raise SQLMeshError( |
| 269 | + f"Operation {operation_name} completed with status: {final_status}" |
| 270 | + ) |
| 271 | + except Exception as e: |
| 272 | + if "retry" in str(e).lower(): |
| 273 | + raise SQLMeshError(f"Operation {operation_name} did not complete within timeout") |
| 274 | + raise |
269 | 275 |
|
270 | 276 | def _create_catalog(self, catalog_name: exp.Identifier) -> None: |
271 | 277 | """Create a catalog (warehouse) in Microsoft Fabric via REST API.""" |
|
0 commit comments