Add snowflake-cortex-code-nhtsa-safety-intelligence connector #561
Agent-Driven Discovery pattern built with Snowflake Cortex Code: syncs NHTSA recalls, complaints, and vehicle specs, then uses Snowflake Cortex COMPLETE to analyze safety patterns and autonomously discover related vehicles during Fivetran ingestion.

- 5 tables: `recalls`, `complaints`, `vehicle_specs`, `discovery_insights`, `safety_analysis`
- Two Cortex COMPLETE calls per sync: discovery analysis + cross-vehicle synthesis
- PAT token authentication for Snowflake Cortex REST API
- Resilient per-vehicle error handling for agent-recommended fetches
- SDK v2+ compliant (no `yield`, required comments, template docstrings)
Pull request overview
Adds a new AI tutorial connector under all_things_ai/tutorials/snowflake-cortex-code-nhtsa-safety-intelligence/ demonstrating “Agent-Driven Discovery” by syncing NHTSA vehicle safety data and optionally using Snowflake Cortex COMPLETE to recommend and synthesize additional vehicle investigations.
Changes:
- Added new tutorial connector (Python) that syncs NHTSA recalls/complaints/specs and optionally calls Cortex COMPLETE for discovery + synthesis.
- Added tutorial documentation (README) and a `configuration.json` for the new connector.
- Updated the repo root `README.md` to include the new tutorial link.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| README.md | Adds the new tutorial connector link to the AI tutorials list. |
| all_things_ai/tutorials/snowflake-cortex-code-nhtsa-safety-intelligence/README.md | Documents setup, configuration, behavior, and tables for the new connector. |
| all_things_ai/tutorials/snowflake-cortex-code-nhtsa-safety-intelligence/connector.py | Implements the 3-phase NHTSA ingestion + optional Cortex-driven discovery and synthesis pipeline. |
| all_things_ai/tutorials/snowflake-cortex-code-nhtsa-safety-intelligence/configuration.json | Provides example configuration for running the connector. |
```json
"seed_make": "Toyota",
"seed_model": "Tundra",
"seed_year": "2022",
"discovery_depth": "2",
"max_discoveries": "3",
"enable_cortex": "true",
"snowflake_account": "<your-snowflake-account>.snowflakecomputing.com",
"snowflake_pat_token": "<your-snowflake-pat-token>",
"cortex_model": "llama3.3-70b",
"max_enrichments": "10"
```
configuration.json values should all be angle-bracket placeholders (per repo configuration guidelines). Right now seed_make, seed_model, and seed_year are real example values, which violates the required placeholder format and encourages committing non-placeholder configs.
Suggested change:

```diff
-"seed_make": "Toyota",
-"seed_model": "Tundra",
-"seed_year": "2022",
-"discovery_depth": "2",
-"max_discoveries": "3",
-"enable_cortex": "true",
-"snowflake_account": "<your-snowflake-account>.snowflakecomputing.com",
-"snowflake_pat_token": "<your-snowflake-pat-token>",
-"cortex_model": "llama3.3-70b",
-"max_enrichments": "10"
+"seed_make": "<YOUR_VEHICLE_MAKE>",
+"seed_model": "<YOUR_VEHICLE_MODEL>",
+"seed_year": "<YOUR_VEHICLE_YEAR>",
+"discovery_depth": "<DISCOVERY_DEPTH>",
+"max_discoveries": "<MAX_DISCOVERIES>",
+"enable_cortex": "<ENABLE_CORTEX_TRUE_OR_FALSE>",
+"snowflake_account": "<YOUR_SNOWFLAKE_ACCOUNT_HOSTNAME>",
+"snowflake_pat_token": "<YOUR_SNOWFLAKE_PAT_TOKEN>",
+"cortex_model": "<YOUR_CORTEX_MODEL_NAME>",
+"max_enrichments": "<MAX_ENRICHMENTS>"
```
Fixed in commit cffbcd94 — all configuration values now use angle-bracket placeholders including seed_make, seed_model, seed_year, and all numeric/boolean defaults.
```json
"seed_make": "Toyota",
"seed_model": "Tundra",
"seed_year": "2022",
"discovery_depth": "2",
"max_discoveries": "3",
"enable_cortex": "true",
"snowflake_account": "<your-snowflake-account>.snowflakecomputing.com",
"snowflake_pat_token": "<your-snowflake-pat-token>",
"cortex_model": "llama3.3-70b",
"max_enrichments": "10"
```
configuration.json values like discovery_depth, max_discoveries, enable_cortex, cortex_model, and max_enrichments should also be expressed as descriptive angle-bracket placeholders (not concrete defaults like "2"/"true"/"llama3.3-70b"). This file should be safe to commit without implying recommended production settings.
Suggested change:

```diff
-"seed_make": "Toyota",
-"seed_model": "Tundra",
-"seed_year": "2022",
-"discovery_depth": "2",
-"max_discoveries": "3",
-"enable_cortex": "true",
-"snowflake_account": "<your-snowflake-account>.snowflakecomputing.com",
-"snowflake_pat_token": "<your-snowflake-pat-token>",
-"cortex_model": "llama3.3-70b",
-"max_enrichments": "10"
+"seed_make": "<SEED_VEHICLE_MAKE>",
+"seed_model": "<SEED_VEHICLE_MODEL>",
+"seed_year": "<SEED_VEHICLE_YEAR>",
+"discovery_depth": "<DISCOVERY_DEPTH>",
+"max_discoveries": "<MAX_DISCOVERIES>",
+"enable_cortex": "<ENABLE_CORTEX_TRUE_OR_FALSE>",
+"snowflake_account": "<YOUR_SNOWFLAKE_ACCOUNT>.snowflakecomputing.com",
+"snowflake_pat_token": "<YOUR_SNOWFLAKE_PAT_TOKEN>",
+"cortex_model": "<CORTEX_MODEL_NAME>",
+"max_enrichments": "<MAX_ENRICHMENTS>"
```
Fixed in commit cffbcd94 — discovery_depth, max_discoveries, enable_cortex, cortex_model, and max_enrichments all use descriptive angle-bracket placeholders now.
```json
"snowflake_account": "<your-snowflake-account>.snowflakecomputing.com",
"snowflake_pat_token": "<your-snowflake-pat-token>",
```
snowflake_account placeholder should be a single fully-enclosed angle-bracket value (e.g., <YOUR_SNOWFLAKE_ACCOUNT_HOSTNAME>), not a mix of placeholder + literal suffix (<your-snowflake-account>.snowflakecomputing.com). Also prefer consistent, descriptive uppercase placeholders (and similarly for snowflake_pat_token).
Suggested change:

```diff
-"snowflake_account": "<your-snowflake-account>.snowflakecomputing.com",
-"snowflake_pat_token": "<your-snowflake-pat-token>",
+"snowflake_account": "<YOUR_SNOWFLAKE_ACCOUNT_HOSTNAME>",
+"snowflake_pat_token": "<YOUR_SNOWFLAKE_PAT_TOKEN>",
```
Fixed in commit cffbcd94 — now uses single enclosed placeholder <YOUR_SNOWFLAKE_ACCOUNT_HOSTNAME>.
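The placeholder rule is easy to lint for mechanically. A minimal sketch, assuming the repo convention is "a value is a placeholder only if it is a single, fully-enclosed, uppercase angle-bracket token" (the helper name and sample config are illustrative, not part of the connector):

```python
import re

# Assumed convention: one fully-enclosed uppercase token like <YOUR_SNOWFLAKE_PAT_TOKEN>.
PLACEHOLDER_RE = re.compile(r"^<[A-Z0-9_]+>$")

def non_placeholder_keys(config):
    """Return the keys whose values do not follow the placeholder convention."""
    return [key for key, value in config.items() if not PLACEHOLDER_RE.match(str(value))]

sample = {
    "snowflake_account": "<your-snowflake-account>.snowflakecomputing.com",
    "snowflake_pat_token": "<YOUR_SNOWFLAKE_PAT_TOKEN>",
}
# The mixed placeholder + literal suffix fails; the fully enclosed token passes.
print(non_placeholder_keys(sample))  # → ['snowflake_account']
```

A check like this could run in CI to keep committed `configuration.json` files free of concrete values.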
```python
        configuration: a dictionary that holds the configuration settings for the connector.
    """
    return [
        {"table": "recalls", "primary_key": ["nhtsa_campaign_number", "make", "model"]},
```
The recalls table primary key omits model_year. Since the connector can fetch the same make/model across multiple years (seed + discovered vehicles), the same nhtsa_campaign_number can apply to multiple model years and would overwrite prior rows. Consider including model_year (or another per-vehicle identifier) in the primary key to preserve per-year recall applicability.
Suggested change:

```diff
-        {"table": "recalls", "primary_key": ["nhtsa_campaign_number", "make", "model"]},
+        {"table": "recalls", "primary_key": ["nhtsa_campaign_number", "make", "model", "model_year"]},
```
Fixed in commit cffbcd94 — primary key is now ["nhtsa_campaign_number", "make", "model", "model_year"] to prevent cross-year overwrites when the same campaign spans multiple model years.
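The effect of widening the key can be shown with a toy upsert model (plain dicts stand in for the destination table; field names mirror the connector's schema, the campaign number is made up):

```python
def last_write_wins(rows, key_fields):
    """Simulate upsert semantics: keep only the last row seen per primary-key tuple."""
    table = {}
    for row in rows:
        table[tuple(row[f] for f in key_fields)] = row
    return list(table.values())

rows = [
    {"nhtsa_campaign_number": "22V001", "make": "TOYOTA", "model": "TUNDRA", "model_year": "2021"},
    {"nhtsa_campaign_number": "22V001", "make": "TOYOTA", "model": "TUNDRA", "model_year": "2022"},
]

# Without model_year the 2022 row silently overwrites the 2021 row.
print(len(last_write_wins(rows, ["nhtsa_campaign_number", "make", "model"])))                # → 1
# With model_year both per-year rows survive.
print(len(last_write_wins(rows, ["nhtsa_campaign_number", "make", "model", "model_year"])))  # → 2
```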
````python
    try:
        response = requests.post(url, headers=headers, json=payload, timeout=timeout, stream=True)
        response.raise_for_status()

        agent_response = ""
        for line in response.iter_lines():
            if line:
                line_text = line.decode("utf-8")
                if line_text.startswith("data: "):
                    try:
                        data = json.loads(line_text[6:])
                        if not isinstance(data, dict):
                            continue

                        choices = data.get("choices", [])
                        for choice in choices:
                            delta = choice.get("delta", {})
                            content = delta.get("content", "")
                            if content:
                                agent_response += content

                    except json.JSONDecodeError:
                        continue

                elif line_text.startswith("event: done"):
                    break

        if not agent_response:
            log.warning("Empty response from Cortex Agent")
            return None

        # Parse the JSON response, handling potential markdown fences
        cleaned = agent_response.strip()
        if cleaned.startswith("```"):
            # Remove markdown code fences
            lines = cleaned.split("\n")
            lines = [ln for ln in lines if not ln.strip().startswith("```")]
            cleaned = "\n".join(lines).strip()

        return json.loads(cleaned)

    except requests.exceptions.Timeout:
        log.warning(f"Cortex Agent timeout after {timeout}s")
        return None
    except json.JSONDecodeError as e:
        log.warning(f"Failed to parse Cortex Agent response as JSON: {e}")
        log.warning(f"Raw response: {agent_response[:500]}")
        return None
    except Exception as e:
        log.warning(f"Cortex Agent error: {e}")
        return None
````
call_cortex_agent() makes a single requests.post(...) call with no retry/backoff handling for transient failures (429/5xx, connection resets, etc.). For resiliency (and to match the retry logic used for NHTSA calls), add bounded exponential backoff retries for retryable status codes and requests transient exceptions, and fail fast on auth errors (401/403).
Suggested change (the removed lines match the single-attempt `try`/`except` block quoted above; replacement below):

````python
    agent_response = ""
    for attempt in range(__MAX_RETRIES):
        try:
            response = requests.post(url, headers=headers, json=payload, timeout=timeout, stream=True)
            if response.status_code in (401, 403):
                log.warning(f"Cortex Agent authentication failed with status {response.status_code}")
                return None
            if response.status_code in __RETRYABLE_STATUS_CODES:
                if attempt == __MAX_RETRIES - 1:
                    log.warning(
                        f"Cortex Agent request failed with retryable status {response.status_code} "
                        f"after {__MAX_RETRIES} attempts"
                    )
                    return None
                delay_seconds = min(60, __BASE_DELAY_SECONDS * (2 ** attempt))
                log.warning(
                    f"Cortex Agent request returned retryable status {response.status_code}; "
                    f"retrying in {delay_seconds}s (attempt {attempt + 1}/{__MAX_RETRIES})"
                )
                time.sleep(delay_seconds)
                continue
            response.raise_for_status()
            agent_response = ""
            for line in response.iter_lines():
                if line:
                    line_text = line.decode("utf-8")
                    if line_text.startswith("data: "):
                        try:
                            data = json.loads(line_text[6:])
                            if not isinstance(data, dict):
                                continue
                            choices = data.get("choices", [])
                            for choice in choices:
                                delta = choice.get("delta", {})
                                content = delta.get("content", "")
                                if content:
                                    agent_response += content
                        except json.JSONDecodeError:
                            continue
                    elif line_text.startswith("event: done"):
                        break
            if not agent_response:
                log.warning("Empty response from Cortex Agent")
                return None
            # Parse the JSON response, handling potential markdown fences
            cleaned = agent_response.strip()
            if cleaned.startswith("```"):
                # Remove markdown code fences
                lines = cleaned.split("\n")
                lines = [ln for ln in lines if not ln.strip().startswith("```")]
                cleaned = "\n".join(lines).strip()
            return json.loads(cleaned)
        except requests.exceptions.Timeout:
            if attempt == __MAX_RETRIES - 1:
                log.warning(f"Cortex Agent timeout after {timeout}s on final attempt")
                return None
            delay_seconds = min(60, __BASE_DELAY_SECONDS * (2 ** attempt))
            log.warning(
                f"Cortex Agent timeout after {timeout}s; retrying in {delay_seconds}s "
                f"(attempt {attempt + 1}/{__MAX_RETRIES})"
            )
            time.sleep(delay_seconds)
        except requests.exceptions.ConnectionError as e:
            if attempt == __MAX_RETRIES - 1:
                log.warning(f"Cortex Agent connection error after {__MAX_RETRIES} attempts: {e}")
                return None
            delay_seconds = min(60, __BASE_DELAY_SECONDS * (2 ** attempt))
            log.warning(
                f"Cortex Agent connection error: {e}; retrying in {delay_seconds}s "
                f"(attempt {attempt + 1}/{__MAX_RETRIES})"
            )
            time.sleep(delay_seconds)
        except requests.exceptions.HTTPError as e:
            status_code = e.response.status_code if e.response is not None else None
            log.warning(f"Cortex Agent HTTP error {status_code}: {e}")
            return None
        except requests.exceptions.RequestException as e:
            if attempt == __MAX_RETRIES - 1:
                log.warning(f"Cortex Agent request failed after {__MAX_RETRIES} attempts: {e}")
                return None
            delay_seconds = min(60, __BASE_DELAY_SECONDS * (2 ** attempt))
            log.warning(
                f"Cortex Agent transient request error: {e}; retrying in {delay_seconds}s "
                f"(attempt {attempt + 1}/{__MAX_RETRIES})"
            )
            time.sleep(delay_seconds)
        except json.JSONDecodeError as e:
            log.warning(f"Failed to parse Cortex Agent response as JSON: {e}")
            log.warning(f"Raw response: {agent_response[:500]}")
            return None
    return None
````
Fixed in commit cffbcd94 — call_cortex_agent() now has a bounded retry loop (3 attempts) with exponential backoff for retryable status codes (429, 500, 502, 503, 504), connection errors, and timeouts. Matches the retry pattern used for NHTSA API calls.
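The schedule used by the fix (base delay doubled per attempt, capped at 60s) factors into a one-line helper. The 2-second base below is an assumption for illustration, since the connector's `__BASE_DELAY_SECONDS` value isn't shown in this excerpt:

```python
def backoff_delay(attempt, base_delay_seconds=2, cap_seconds=60):
    """Exponential backoff delay for a 0-indexed attempt, capped at cap_seconds."""
    return min(cap_seconds, base_delay_seconds * (2 ** attempt))

# With a 2s base the schedule is 2, 4, 8, 16, 32, then capped at 60.
print([backoff_delay(a) for a in range(6)])  # → [2, 4, 8, 16, 32, 60]
```

Capping keeps a long retry run from sleeping for minutes per attempt; adding random jitter on top is a common further refinement when many syncs hit the same endpoint.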
```python
    except Exception as e:
        log.warning(f"Cortex Agent error: {e}")
```
The final except Exception as e: in call_cortex_agent() will also swallow non-network programming errors (e.g., unexpected TypeError), making failures hard to diagnose. Prefer catching requests.exceptions.RequestException (and ValueError/json.JSONDecodeError as needed) and letting unexpected exceptions propagate or be logged as log.severe and re-raised.
Suggested change:

```diff
-    except Exception as e:
-        log.warning(f"Cortex Agent error: {e}")
+    except requests.exceptions.RequestException as e:
+        log.warning(f"Cortex Agent request failed: {e}")
```
Fixed in commit cffbcd94 — removed generic except Exception. Now catches requests.exceptions.HTTPError, ConnectionError, Timeout, RequestException, and json.JSONDecodeError separately with appropriate handling for each. Unexpected errors will propagate.
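The ordering of those `except` clauses matters because `requests` groups its errors into one hierarchy; a quick check (requires the `requests` package):

```python
import requests

# Timeout, ConnectionError, and HTTPError all derive from RequestException,
# so the broad RequestException handler must come after the specific ones or
# it will shadow them.
for exc in (requests.exceptions.Timeout,
            requests.exceptions.ConnectionError,
            requests.exceptions.HTTPError):
    print(exc.__name__, issubclass(exc, requests.exceptions.RequestException))
# → Timeout True
# → ConnectionError True
# → HTTPError True
```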
```python
    # Fetch data for agent-recommended vehicles
    recommended = discovery_result.get("recommended_vehicles", [])
    vehicles_to_fetch = recommended[:max_discoveries]

    log.info(f"Agent recommended {len(recommended)} vehicles, fetching {len(vehicles_to_fetch)}")

    for vehicle in vehicles_to_fetch:
        v_make = vehicle.get("make", "")
```
recommended_vehicles is assumed to be a list (sliced and iterated) but comes from an LLM response, so it can easily be the wrong type (string/dict/null) and crash the sync. Add a type check (ensure list of dicts) before slicing/looping; otherwise treat as empty and skip discovery fetches.
Fixed in commit cffbcd94 — recommended_vehicles is now validated as a list before iteration, and each element is validated as a dict before accessing keys. Returns early with a warning if the LLM returns an unexpected type.
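A guard of the kind the fix describes can be sketched as follows (the function name is illustrative, not the connector's actual helper):

```python
def safe_recommended_vehicles(discovery_result, max_discoveries):
    """Return a bounded list of dict recommendations, tolerating malformed LLM output."""
    if not isinstance(discovery_result, dict):
        return []
    recommended = discovery_result.get("recommended_vehicles")
    if not isinstance(recommended, list):
        return []
    # Drop non-dict elements before slicing to the discovery budget.
    return [v for v in recommended if isinstance(v, dict)][:max_discoveries]

print(safe_recommended_vehicles({"recommended_vehicles": "Ford F-150"}, 3))     # → []
print(safe_recommended_vehicles(None, 3))                                       # → []
print(safe_recommended_vehicles(
    {"recommended_vehicles": [{"make": "Ford"}, "junk", {"make": "Ram"}]}, 1))  # → [{'make': 'Ford'}]
```

Treating unexpected shapes as "no recommendations" keeps a malformed LLM response from failing the whole sync while still logging nothing misleading.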
```python
    all_recalls = list(seed_recalls)
    all_complaints = list(seed_complaints)
    vehicles_investigated = [(seed_make, seed_model, seed_year)]
```
run_discovery_phase() materializes all_recalls/all_complaints as full in-memory lists and keeps extending them for discovered vehicles, but the synthesis prompt only needs aggregate counts/component frequencies. This can grow large (complaints especially) and is avoidable—consider tracking aggregates instead of storing every raw record in memory.
Fixed in commit cffbcd94 — replaced all_recalls/all_complaints in-memory lists with recall_components/complaint_components frequency dicts and aggregate counters (total_recalls, total_complaints, crash_count, fire_count). The synthesis prompt now uses pre-computed aggregates via _build_aggregates() instead of iterating full record lists.
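Aggregate tracking along these lines can be sketched with `collections.Counter`; the record shape below is illustrative (real NHTSA complaint records differ in detail):

```python
from collections import Counter

def fold_complaints(records, component_counts, totals):
    """Fold a batch of complaint records into frequency counts instead of keeping raw rows."""
    for rec in records:
        component_counts[rec.get("component", "UNKNOWN")] += 1
        totals["complaints"] += 1
        if rec.get("crash") == "Yes":
            totals["crash"] += 1
        if rec.get("fire") == "Yes":
            totals["fire"] += 1

counts = Counter()
totals = {"complaints": 0, "crash": 0, "fire": 0}
fold_complaints(
    [{"component": "FUEL SYSTEM", "fire": "Yes"},
     {"component": "FUEL SYSTEM"},
     {"component": "SERVICE BRAKES", "crash": "Yes"}],
    counts, totals,
)
print(counts.most_common(1))  # → [('FUEL SYSTEM', 2)]
print(totals)                 # → {'complaints': 3, 'crash': 1, 'fire': 1}
```

Memory stays proportional to the number of distinct components rather than the number of complaints, which is exactly what the synthesis prompt needs.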
```python
    except Exception as e:
        log.warning(
            f"Failed to fetch data for discovered vehicle "
            f"{v_make} {v_model} {v_year}: {e}. Skipping."
        )
```
The per-vehicle fetch block uses except Exception which can hide programming errors and silently skip vehicles. Prefer catching the specific expected failures (e.g., RuntimeError from fetch_data_with_retry / requests.exceptions.RequestException) and either re-raise unexpected exceptions or log them as log.severe with enough context for debugging.
Suggested change:

```diff
-    except Exception as e:
-        log.warning(
-            f"Failed to fetch data for discovered vehicle "
-            f"{v_make} {v_model} {v_year}: {e}. Skipping."
-        )
+    except (RuntimeError, requests.exceptions.RequestException) as e:
+        log.warning(
+            f"Failed to fetch data for discovered vehicle "
+            f"{v_make} {v_model} {v_year}: {e}. Skipping."
+        )
+    except Exception as e:
+        log.severe(
+            f"Unexpected error while processing discovered vehicle "
+            f"{v_make} {v_model} {v_year}: {e}"
+        )
+        raise
```
Fixed in commit cffbcd94 — now catches (RuntimeError, requests.exceptions.RequestException) specifically instead of generic Exception. Unexpected programming errors will propagate rather than being silently swallowed.
- All `configuration.json` values now use angle-bracket placeholders
- Enforce `max_enrichments` budget with counter before each Cortex call
- Add `model_year` to recalls primary key to prevent cross-year overwrites
- Add retry logic with exponential backoff to `call_cortex_agent()`
- Replace generic `except Exception` with specific exception types
- Type-check `recommended_vehicles` from LLM output (validate list of dicts)
- Replace unbounded in-memory lists with aggregate component tracking
- Per-vehicle fetch catches RuntimeError/RequestException, not Exception
- Switch default Cortex model from llama3.3-70b to claude-sonnet-4-6
- Update README to match all code changes

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
All 10 Copilot review comments addressed in commit cffbcd94.
SDK Version: 2.25.1230.001
…README updates

Inspired by @fivetran-anushkaparashar's review feedback on PR fivetran#562 (NVD/CVE), applying the same improvements proactively to the NHTSA connector:

- Add `_create_cortex_session()` for TCP connection pooling across discovery and synthesis Cortex calls (separate from NHTSA session, different auth)
- `call_cortex_agent()` accepts optional `cortex_session` parameter
- Streaming responses explicitly closed via `response.close()` in `finally` block
- Cortex session created in `update()`, shared across both phases, closed after
- README error handling section updated to document connection pooling and response cleanup
- Full test harness passed (5 scenarios)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Proactive improvements in commit
Pre-Submission Test Harness Results
fivetran-sahilkhirwal left a comment
looks good
Nit: Thanks a lot for contributing multiple cortex examples to the repository. We can create a directory in the tutorials titled
I love that idea!
Summary
Adds a new AI-enriched connector in `all_things_ai/tutorials/snowflake-cortex-code-nhtsa-safety-intelligence/` that demonstrates Agent-Driven Discovery -- a pattern where Snowflake Cortex COMPLETE autonomously guides data collection during Fivetran ingestion. Built entirely with Snowflake Cortex Code.
What it does
Key details
- 5 tables: `recalls`, `complaints`, `vehicle_specs`, `discovery_insights`, `safety_analysis`
- Snowflake Cortex REST API endpoint (`/api/v2/cortex/inference:complete`)
- No external dependencies beyond `requests` (provided by Fivetran runtime)
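The REST call behind that endpoint can be assembled like this; the payload shape (model name plus chat-style messages) is a sketch based on this PR's description, not authoritative API documentation, and the account hostname is a made-up example:

```python
def build_cortex_request(account_host, pat_token, model, prompt):
    """Assemble URL, headers, and JSON body for a Cortex COMPLETE inference call."""
    return {
        "url": f"https://{account_host}/api/v2/cortex/inference:complete",
        "headers": {
            "Authorization": f"Bearer {pat_token}",  # PAT token auth, per the PR
            "Content-Type": "application/json",
        },
        "json": {"model": model, "messages": [{"role": "user", "content": prompt}]},
    }

req = build_cortex_request(
    "myorg-myaccount.snowflakecomputing.com",
    "<YOUR_SNOWFLAKE_PAT_TOKEN>",
    "llama3.3-70b",
    "Summarize recall patterns for this vehicle.",
)
print(req["url"])  # → https://myorg-myaccount.snowflakecomputing.com/api/v2/cortex/inference:complete
```

The dict keys match `requests.post(**req)` keyword arguments, so the builder can be unit-tested without any network access.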
Files
- `connector.py`
- `configuration.json`
- `README.md`

Testing
Deployed and synced successfully via Fivetran production (connection `arguing_dearly`, package `sulfate_implant`). `fivetran debug` output: 935 upserts, SYNC SUCCEEDED.
Checklist
- SDK v2+ compliant (no `yield`)
- `op.upsert()` and `op.checkpoint()` calls
- Template docstrings on `schema()` and `update()`
- `log.warning("Example: ...")` as first statement in `update()`
- `validate_configuration()` validates all required fields
- `black` passes, `flake8` clean (E501 only on template text, matching livestock connector)
- No `requirements.txt` (only uses `requests` from runtime)