-
Notifications
You must be signed in to change notification settings - Fork 8
feat: track model execution status with failure icon in explorer #54
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
4905390
55332f7
7f56ecb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,53 @@ | ||
| from django.db import migrations, models | ||
|
|
||
|
|
||
| class Migration(migrations.Migration): | ||
|
|
||
| dependencies = [ | ||
| ("core", "0002_seed_data"), | ||
| ] | ||
|
|
||
| operations = [ | ||
| migrations.AddField( | ||
| model_name="configmodels", | ||
| name="run_status", | ||
| field=models.CharField( | ||
| choices=[ | ||
| ("NOT_STARTED", "Not Started"), | ||
| ("RUNNING", "Running"), | ||
| ("SUCCESS", "Success"), | ||
| ("FAILED", "Failed"), | ||
| ], | ||
| default="NOT_STARTED", | ||
| help_text="Current execution status of the model", | ||
| max_length=20, | ||
| ), | ||
| ), | ||
| migrations.AddField( | ||
| model_name="configmodels", | ||
| name="failure_reason", | ||
| field=models.TextField( | ||
| blank=True, | ||
| help_text="Error message if the model execution failed", | ||
| null=True, | ||
| ), | ||
| ), | ||
| migrations.AddField( | ||
| model_name="configmodels", | ||
| name="last_run_at", | ||
| field=models.DateTimeField( | ||
| blank=True, | ||
| help_text="Timestamp of the last execution", | ||
| null=True, | ||
| ), | ||
| ), | ||
| migrations.AddField( | ||
| model_name="configmodels", | ||
| name="run_duration", | ||
| field=models.FloatField( | ||
| blank=True, | ||
| help_text="Duration of last execution in seconds", | ||
| null=True, | ||
| ), | ||
| ), | ||
| ] | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -57,12 +57,17 @@ def execute_run_command(request: Request, project_id: str) -> Response: | |||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||
| logger.info(f"[execute_run_command] API called - project_id={project_id}, file_name={file_name}, environment_id={environment_id}") | ||||||||||||||||||||||||||||||||||||||||||||||||
| app = ApplicationContext(project_id=project_id) | ||||||||||||||||||||||||||||||||||||||||||||||||
| app.execute_visitran_run_command(current_model=file_name, environment_id=environment_id) | ||||||||||||||||||||||||||||||||||||||||||||||||
| app.visitran_context.close_db_connection() | ||||||||||||||||||||||||||||||||||||||||||||||||
| app.backup_current_no_code_model() | ||||||||||||||||||||||||||||||||||||||||||||||||
| logger.info(f"[execute_run_command] Completed successfully for file_name={file_name}") | ||||||||||||||||||||||||||||||||||||||||||||||||
| _data = {"status": "success"} | ||||||||||||||||||||||||||||||||||||||||||||||||
| return Response(data=_data) | ||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||
| app.execute_visitran_run_command(current_model=file_name, environment_id=environment_id) | ||||||||||||||||||||||||||||||||||||||||||||||||
| app.visitran_context.close_db_connection() | ||||||||||||||||||||||||||||||||||||||||||||||||
| app.backup_current_no_code_model() | ||||||||||||||||||||||||||||||||||||||||||||||||
| logger.info(f"[execute_run_command] Completed successfully for file_name={file_name}") | ||||||||||||||||||||||||||||||||||||||||||||||||
| _data = {"status": "success"} | ||||||||||||||||||||||||||||||||||||||||||||||||
| return Response(data=_data) | ||||||||||||||||||||||||||||||||||||||||||||||||
| except Exception: | ||||||||||||||||||||||||||||||||||||||||||||||||
| logger.exception(f"[execute_run_command] DAG execution failed for file_name={file_name}") | ||||||||||||||||||||||||||||||||||||||||||||||||
| _data = {"status": "failed", "error_message": "Model execution failed. Check server logs for details."} | ||||||||||||||||||||||||||||||||||||||||||||||||
| return Response(data=_data, status=status.HTTP_400_BAD_REQUEST) | ||||||||||||||||||||||||||||||||||||||||||||||||
github-advanced-security[bot] marked this conversation as resolved.
Fixed
Show fixed
Hide fixed
Comment on lines
+60
to
+70
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Prompt To Fix With AIThis is a comment left during a code review.
Path: backend/backend/core/routers/execute/views.py
Line: 60-70
Comment:
**DB connection leaked on execution failure**
`close_db_connection()` is only called in the happy path. When `execute_visitran_run_command` raises, the `except` branch returns immediately without ever closing the connection, leaking it every time a DAG run fails. This should use a `finally` block:
```suggestion
try:
app.execute_visitran_run_command(current_model=file_name, environment_id=environment_id)
app.backup_current_no_code_model()
logger.info(f"[execute_run_command] Completed successfully for file_name={file_name}")
_data = {"status": "success"}
return Response(data=_data)
except Exception:
logger.exception(f"[execute_run_command] DAG execution failed for file_name={file_name}")
_data = {"status": "failed", "error_message": "Model execution failed. Check server logs for details."}
return Response(data=_data, status=status.HTTP_400_BAD_REQUEST)
finally:
app.visitran_context.close_db_connection()
```
How can I resolve this? If you propose a fix, please make it concise. |
||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,7 @@ | |
|
|
||
| import concurrent.futures | ||
| import datetime | ||
| import time | ||
| import importlib | ||
| import logging | ||
| import re | ||
|
|
@@ -15,6 +16,7 @@ | |
|
|
||
| import ibis | ||
| import networkx as nx | ||
| from django.utils import timezone | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The PR description explicitly states "CLI mode (no Django) is handled — To protect CLI mode as stated, wrap both imports: try:
from django.utils import timezone
from backend.core.models.config_models import ConfigModels
_DJANGO_AVAILABLE = True
except ImportError:
_DJANGO_AVAILABLE = False
timezone = None
ConfigModels = NoneThen gate Prompt To Fix With AIThis is a comment left during a code review.
Path: backend/visitran/visitran.py
Line: 19
Comment:
**Unconditional Django imports break CLI mode**
`from django.utils import timezone` (and `from backend.core.models.config_models import ConfigModels` at line 76) are top-level, unconditional imports. Before this PR, `visitran.py` had no Django imports. Any caller that uses visitran in CLI mode without Django configured will now get a `ModuleNotFoundError` or `django.core.exceptions.ImproperlyConfigured` at module load time — before any try/except can help.
The PR description explicitly states *"CLI mode (no Django) is handled — `ConfigModels` import is wrapped in try/except"*, but neither import has a guard in the code. The try/except inside `_update_model_status` only catches runtime failures, not import-time failures.
To protect CLI mode as stated, wrap both imports:
```python
try:
from django.utils import timezone
from backend.core.models.config_models import ConfigModels
_DJANGO_AVAILABLE = True
except ImportError:
_DJANGO_AVAILABLE = False
timezone = None
ConfigModels = None
```
Then gate `_update_model_status` on `_DJANGO_AVAILABLE`.
How can I resolve this? If you propose a fix, please make it concise. |
||
| from visitran import utils | ||
| from visitran.adapters.adapter import BaseAdapter | ||
| from visitran.adapters.seed import BaseSeed | ||
|
|
@@ -71,6 +73,8 @@ | |
| from visitran.templates.model import VisitranModel | ||
| from visitran.templates.snapshot import VisitranSnapshot | ||
|
|
||
| from backend.core.models.config_models import ConfigModels | ||
|
|
||
| warnings.filterwarnings("ignore", message=".*?pkg_resources.*?") | ||
| from matplotlib import pyplot as plt # noqa: E402 | ||
|
|
||
|
|
@@ -228,6 +232,53 @@ def sort_func(node_key: str): | |
| self.sorted_dag_nodes = list(nx.lexicographical_topological_sort(self.dag, key=sort_func)) | ||
| fire_event(SortedDAGNodes(sorted_dag_nodes=str(self.sorted_dag_nodes))) | ||
|
|
||
| def _update_model_status( | ||
| self, | ||
| model_name: str, | ||
| run_status: str, | ||
| failure_reason: str = None, | ||
| run_duration: float = None, | ||
| ) -> None: | ||
| """Update the run status of a model in the database.""" | ||
| try: | ||
| # node_name str looks like: "<class 'project.models.stg_order_summaries.StgOrderSummaries'>" | ||
| # ConfigModels.model_name stores the module/file name (e.g. 'stg_order_summaries'), | ||
| # which is the second-to-last dotted segment — not the CamelCase class name. | ||
| class_name = model_name.split("'")[1].split(".")[-2] if "'" in model_name else model_name | ||
|
|
||
| session = getattr(self.context, "session", None) | ||
| if not session: | ||
| raise ValueError( | ||
| f"Cannot update status for model '{class_name}': no session on execution context" | ||
| ) | ||
|
|
||
| project_id = session.project_id | ||
| if not project_id: | ||
| raise ValueError( | ||
| f"Cannot update status for model '{class_name}': session has no project_id" | ||
| ) | ||
|
|
||
| model_instance = ConfigModels.objects.get( | ||
| project_instance__project_uuid=project_id, | ||
| model_name=class_name, | ||
| ) | ||
| model_instance.run_status = run_status | ||
| model_instance.last_run_at = timezone.now() | ||
|
|
||
| if run_status == ConfigModels.RunStatus.FAILED: | ||
| model_instance.failure_reason = failure_reason | ||
| elif run_status == ConfigModels.RunStatus.SUCCESS: | ||
| model_instance.failure_reason = None | ||
|
|
||
| update_fields = ["run_status", "last_run_at", "failure_reason"] | ||
| if run_duration is not None: | ||
| model_instance.run_duration = run_duration | ||
| update_fields.append("run_duration") | ||
|
|
||
| model_instance.save(update_fields=update_fields) | ||
| except Exception: | ||
| logging.exception(f"Failed to update model status for {model_name}") | ||
|
|
||
| def execute_graph(self) -> None: | ||
| """Executes the sorted DAG elements one by one.""" | ||
| dag_nodes = self.sorted_dag_nodes | ||
|
|
@@ -236,7 +287,11 @@ def execute_graph(self) -> None: | |
| node_name: VisitranModel = dag_nodes.pop(0) | ||
| node = self.dag.nodes[node_name]["model_object"] | ||
| is_executable = self.dag.nodes[node_name].get("executable", True) | ||
| start_time = time.monotonic() | ||
| try: | ||
| if is_executable: | ||
| self._update_model_status(str(node_name), ConfigModels.RunStatus.RUNNING) | ||
|
|
||
| # Apply model_configs override from deployment configuration | ||
| self._apply_model_config_override(node) | ||
|
|
||
|
|
@@ -270,6 +325,12 @@ def execute_graph(self) -> None: | |
| self.db_adapter.db_connection.create_schema(node.destination_schema_name) # create if not exists | ||
| self.db_adapter.run_model(visitran_model=node) | ||
|
|
||
| self._update_model_status( | ||
| str(node_name), | ||
| ConfigModels.RunStatus.SUCCESS, | ||
| run_duration=time.monotonic() - start_time, | ||
| ) | ||
|
|
||
| base_result = BaseResult( | ||
| node_name=str(node_name), | ||
| sequence_num=sequence_number, | ||
|
|
@@ -282,11 +343,24 @@ def execute_graph(self) -> None: | |
| sequence_number += 1 | ||
| BASE_RESULT.append(base_result) | ||
| except VisitranBaseExceptions as visitran_err: | ||
| self._update_model_status( | ||
| str(node_name), | ||
| ConfigModels.RunStatus.FAILED, | ||
| failure_reason=str(visitran_err), | ||
| run_duration=time.monotonic() - start_time, | ||
| ) | ||
tahierhussain marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| raise visitran_err | ||
| except Exception as err: | ||
| dest_table = node.destination_table_name | ||
| sch_name = node.destination_schema_name | ||
| err_trace = repr(err) | ||
|
|
||
| self._update_model_status( | ||
| str(node_name), | ||
| ConfigModels.RunStatus.FAILED, | ||
| failure_reason=err_trace, | ||
| run_duration=time.monotonic() - start_time, | ||
| ) | ||
| base_result = BaseResult( | ||
| node_name=str(node_name), | ||
| sequence_num=sequence_number, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@abhizipstack Can you please address this comment in this PR?
Context: We need to make the required update in the
project_detailsmodel and add the corresponding migration file changes as well.#55 (comment)