Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions backend/backend/application/file_explorer/file_explorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,25 @@ def load_models(self, session: Session):
# Sort models by execution order (DAG order)
sorted_model_names = topological_sort_models(models_with_refs)

# Build a lookup from model name -> model object for status fields
model_lookup = {m.model_name: m for m in all_models}

# Build the model structure in sorted order
no_code_model_structure = []
for no_code_model_name in sorted_model_names:
no_code_model_structure.append(
{
"extension": no_code_model_name,
"title": no_code_model_name,
"key": f"{self.project_name}/models/no_code/{no_code_model_name}",
"is_folder": False,
"type": "NO_CODE_MODEL",
}
)
model = model_lookup.get(no_code_model_name)
model_data = {
"extension": no_code_model_name,
"title": no_code_model_name,
"key": f"{self.project_name}/models/no_code/{no_code_model_name}",
"is_folder": False,
"type": "NO_CODE_MODEL",
"run_status": getattr(model, "run_status", None),
"failure_reason": getattr(model, "failure_reason", None),
"last_run_at": model.last_run_at.isoformat() if getattr(model, "last_run_at", None) else None,
"run_duration": getattr(model, "run_duration", None),
}
no_code_model_structure.append(model_data)
model_structure: dict[str, Any] = {
"title": "models",
"key": f"{self.project_name}/models",
Expand Down
53 changes: 53 additions & 0 deletions backend/backend/core/migrations/0003_add_model_run_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from django.db import migrations, models
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@abhizipstack Can you please address this comment in this PR?

Context: We need to make the required update in the project_details model and add the corresponding migration file changes as well.

#55 (comment)



class Migration(migrations.Migration):

dependencies = [
("core", "0002_seed_data"),
]

operations = [
migrations.AddField(
model_name="configmodels",
name="run_status",
field=models.CharField(
choices=[
("NOT_STARTED", "Not Started"),
("RUNNING", "Running"),
("SUCCESS", "Success"),
("FAILED", "Failed"),
],
default="NOT_STARTED",
help_text="Current execution status of the model",
max_length=20,
),
),
migrations.AddField(
model_name="configmodels",
name="failure_reason",
field=models.TextField(
blank=True,
help_text="Error message if the model execution failed",
null=True,
),
),
migrations.AddField(
model_name="configmodels",
name="last_run_at",
field=models.DateTimeField(
blank=True,
help_text="Timestamp of the last execution",
null=True,
),
),
migrations.AddField(
model_name="configmodels",
name="run_duration",
field=models.FloatField(
blank=True,
help_text="Duration of last execution in seconds",
null=True,
),
),
]
29 changes: 29 additions & 0 deletions backend/backend/core/models/config_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ class ConfigModels(DefaultOrganizationMixin, BaseModel):
This model is used to store the no code models.
"""

class RunStatus(models.TextChoices):
NOT_STARTED = "NOT_STARTED", "Not Started"
RUNNING = "RUNNING", "Running"
SUCCESS = "SUCCESS", "Success"
FAILED = "FAILED", "Failed"

def get_model_upload_path(self, filename: str) -> str:
"""
This returns the file path based on the org and project dynamically.
Expand Down Expand Up @@ -94,6 +100,29 @@ class Meta:
last_modified_by = models.JSONField(default=dict)
last_modified_at = models.DateTimeField(auto_now=True)

# Execution status tracking
run_status = models.CharField(
max_length=20,
choices=RunStatus.choices,
default=RunStatus.NOT_STARTED,
help_text="Current execution status of the model",
)
failure_reason = models.TextField(
null=True,
blank=True,
help_text="Error message if the model execution failed",
)
last_run_at = models.DateTimeField(
null=True,
blank=True,
help_text="Timestamp of the last execution",
)
run_duration = models.FloatField(
null=True,
blank=True,
help_text="Duration of last execution in seconds",
)

# Current Manager
config_objects = models.Manager()

Expand Down
17 changes: 11 additions & 6 deletions backend/backend/core/routers/execute/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,17 @@ def execute_run_command(request: Request, project_id: str) -> Response:
)
logger.info(f"[execute_run_command] API called - project_id={project_id}, file_name={file_name}, environment_id={environment_id}")
app = ApplicationContext(project_id=project_id)
app.execute_visitran_run_command(current_model=file_name, environment_id=environment_id)
app.visitran_context.close_db_connection()
app.backup_current_no_code_model()
logger.info(f"[execute_run_command] Completed successfully for file_name={file_name}")
_data = {"status": "success"}
return Response(data=_data)
try:
app.execute_visitran_run_command(current_model=file_name, environment_id=environment_id)
app.visitran_context.close_db_connection()
app.backup_current_no_code_model()
logger.info(f"[execute_run_command] Completed successfully for file_name={file_name}")
_data = {"status": "success"}
return Response(data=_data)
except Exception:
logger.exception(f"[execute_run_command] DAG execution failed for file_name={file_name}")
_data = {"status": "failed", "error_message": "Model execution failed. Check server logs for details."}
return Response(data=_data, status=status.HTTP_400_BAD_REQUEST)
Comment on lines +60 to +70
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 DB connection leaked on execution failure

close_db_connection() is only called in the happy path. When execute_visitran_run_command raises, the except branch returns immediately without ever closing the connection, leaking it every time a DAG run fails. This should use a finally block:

Suggested change
try:
app.execute_visitran_run_command(current_model=file_name, environment_id=environment_id)
app.visitran_context.close_db_connection()
app.backup_current_no_code_model()
logger.info(f"[execute_run_command] Completed successfully for file_name={file_name}")
_data = {"status": "success"}
return Response(data=_data)
except Exception:
logger.exception(f"[execute_run_command] DAG execution failed for file_name={file_name}")
_data = {"status": "failed", "error_message": "Model execution failed. Check server logs for details."}
return Response(data=_data, status=status.HTTP_400_BAD_REQUEST)
try:
app.execute_visitran_run_command(current_model=file_name, environment_id=environment_id)
app.backup_current_no_code_model()
logger.info(f"[execute_run_command] Completed successfully for file_name={file_name}")
_data = {"status": "success"}
return Response(data=_data)
except Exception:
logger.exception(f"[execute_run_command] DAG execution failed for file_name={file_name}")
_data = {"status": "failed", "error_message": "Model execution failed. Check server logs for details."}
return Response(data=_data, status=status.HTTP_400_BAD_REQUEST)
finally:
app.visitran_context.close_db_connection()
Prompt To Fix With AI
This is a comment left during a code review.
Path: backend/backend/core/routers/execute/views.py
Line: 60-70

Comment:
**DB connection leaked on execution failure**

`close_db_connection()` is only called in the happy path. When `execute_visitran_run_command` raises, the `except` branch returns immediately without ever closing the connection, leaking it every time a DAG run fails. This should use a `finally` block:

```suggestion
    try:
        app.execute_visitran_run_command(current_model=file_name, environment_id=environment_id)
        app.backup_current_no_code_model()
        logger.info(f"[execute_run_command] Completed successfully for file_name={file_name}")
        _data = {"status": "success"}
        return Response(data=_data)
    except Exception:
        logger.exception(f"[execute_run_command] DAG execution failed for file_name={file_name}")
        _data = {"status": "failed", "error_message": "Model execution failed. Check server logs for details."}
        return Response(data=_data, status=status.HTTP_400_BAD_REQUEST)
    finally:
        app.visitran_context.close_db_connection()
```

How can I resolve this? If you propose a fix, please make it concise.




Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def wrapped(view_or_request, *args, **kwargs):

except Exception as e:
Logger.exception("Error executing view function")
raise

return response

Expand Down
74 changes: 74 additions & 0 deletions backend/visitran/visitran.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import concurrent.futures
import datetime
import time
import importlib
import logging
import re
Expand All @@ -15,6 +16,7 @@

import ibis
import networkx as nx
from django.utils import timezone
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Unconditional Django imports break CLI mode

from django.utils import timezone (and from backend.core.models.config_models import ConfigModels at line 76) are top-level, unconditional imports. Before this PR, visitran.py had no Django imports. Any caller that uses visitran in CLI mode without Django configured will now get a ModuleNotFoundError or django.core.exceptions.ImproperlyConfigured at module load time — before any try/except can help.

The PR description explicitly states "CLI mode (no Django) is handled — ConfigModels import is wrapped in try/except", but neither import has a guard in the code. The try/except inside _update_model_status only catches runtime failures, not import-time failures.

To protect CLI mode as stated, wrap both imports:

try:
    from django.utils import timezone
    from backend.core.models.config_models import ConfigModels
    _DJANGO_AVAILABLE = True
except ImportError:
    _DJANGO_AVAILABLE = False
    timezone = None
    ConfigModels = None

Then gate _update_model_status on _DJANGO_AVAILABLE.

Prompt To Fix With AI
This is a comment left during a code review.
Path: backend/visitran/visitran.py
Line: 19

Comment:
**Unconditional Django imports break CLI mode**

`from django.utils import timezone` (and `from backend.core.models.config_models import ConfigModels` at line 76) are top-level, unconditional imports. Before this PR, `visitran.py` had no Django imports. Any caller that uses visitran in CLI mode without Django configured will now get a `ModuleNotFoundError` or `django.core.exceptions.ImproperlyConfigured` at module load time — before any try/except can help.

The PR description explicitly states *"CLI mode (no Django) is handled — `ConfigModels` import is wrapped in try/except"*, but neither import has a guard in the code. The try/except inside `_update_model_status` only catches runtime failures, not import-time failures.

To protect CLI mode as stated, wrap both imports:

```python
try:
    from django.utils import timezone
    from backend.core.models.config_models import ConfigModels
    _DJANGO_AVAILABLE = True
except ImportError:
    _DJANGO_AVAILABLE = False
    timezone = None
    ConfigModels = None
```

Then gate `_update_model_status` on `_DJANGO_AVAILABLE`.

How can I resolve this? If you propose a fix, please make it concise.

from visitran import utils
from visitran.adapters.adapter import BaseAdapter
from visitran.adapters.seed import BaseSeed
Expand Down Expand Up @@ -71,6 +73,8 @@
from visitran.templates.model import VisitranModel
from visitran.templates.snapshot import VisitranSnapshot

from backend.core.models.config_models import ConfigModels

warnings.filterwarnings("ignore", message=".*?pkg_resources.*?")
from matplotlib import pyplot as plt # noqa: E402

Expand Down Expand Up @@ -228,6 +232,53 @@ def sort_func(node_key: str):
self.sorted_dag_nodes = list(nx.lexicographical_topological_sort(self.dag, key=sort_func))
fire_event(SortedDAGNodes(sorted_dag_nodes=str(self.sorted_dag_nodes)))

def _update_model_status(
self,
model_name: str,
run_status: str,
failure_reason: str = None,
run_duration: float = None,
) -> None:
"""Update the run status of a model in the database."""
try:
# node_name str looks like: "<class 'project.models.stg_order_summaries.StgOrderSummaries'>"
# ConfigModels.model_name stores the module/file name (e.g. 'stg_order_summaries'),
# which is the second-to-last dotted segment — not the CamelCase class name.
class_name = model_name.split("'")[1].split(".")[-2] if "'" in model_name else model_name

session = getattr(self.context, "session", None)
if not session:
raise ValueError(
f"Cannot update status for model '{class_name}': no session on execution context"
)

project_id = session.project_id
if not project_id:
raise ValueError(
f"Cannot update status for model '{class_name}': session has no project_id"
)

model_instance = ConfigModels.objects.get(
project_instance__project_uuid=project_id,
model_name=class_name,
)
model_instance.run_status = run_status
model_instance.last_run_at = timezone.now()

if run_status == ConfigModels.RunStatus.FAILED:
model_instance.failure_reason = failure_reason
elif run_status == ConfigModels.RunStatus.SUCCESS:
model_instance.failure_reason = None

update_fields = ["run_status", "last_run_at", "failure_reason"]
if run_duration is not None:
model_instance.run_duration = run_duration
update_fields.append("run_duration")

model_instance.save(update_fields=update_fields)
except Exception:
logging.exception(f"Failed to update model status for {model_name}")

def execute_graph(self) -> None:
"""Executes the sorted DAG elements one by one."""
dag_nodes = self.sorted_dag_nodes
Expand All @@ -236,7 +287,11 @@ def execute_graph(self) -> None:
node_name: VisitranModel = dag_nodes.pop(0)
node = self.dag.nodes[node_name]["model_object"]
is_executable = self.dag.nodes[node_name].get("executable", True)
start_time = time.monotonic()
try:
if is_executable:
self._update_model_status(str(node_name), ConfigModels.RunStatus.RUNNING)

# Apply model_configs override from deployment configuration
self._apply_model_config_override(node)

Expand Down Expand Up @@ -270,6 +325,12 @@ def execute_graph(self) -> None:
self.db_adapter.db_connection.create_schema(node.destination_schema_name) # create if not exists
self.db_adapter.run_model(visitran_model=node)

self._update_model_status(
str(node_name),
ConfigModels.RunStatus.SUCCESS,
run_duration=time.monotonic() - start_time,
)

base_result = BaseResult(
node_name=str(node_name),
sequence_num=sequence_number,
Expand All @@ -282,11 +343,24 @@ def execute_graph(self) -> None:
sequence_number += 1
BASE_RESULT.append(base_result)
except VisitranBaseExceptions as visitran_err:
self._update_model_status(
str(node_name),
ConfigModels.RunStatus.FAILED,
failure_reason=str(visitran_err),
run_duration=time.monotonic() - start_time,
)
raise visitran_err
except Exception as err:
dest_table = node.destination_table_name
sch_name = node.destination_schema_name
err_trace = repr(err)

self._update_model_status(
str(node_name),
ConfigModels.RunStatus.FAILED,
failure_reason=err_trace,
run_duration=time.monotonic() - start_time,
)
base_result = BaseResult(
node_name=str(node_name),
sequence_num=sequence_number,
Expand Down
2 changes: 2 additions & 0 deletions frontend/src/ide/editor/no-code-model/no-code-model.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -1602,6 +1602,7 @@ function NoCodeModel({ nodeData }) {
axios(requestOptions)
.then(() => {
getSampleData(undefined, undefined, spec);
setRefreshModels(true);
})
.catch((error) => {
const notifKey = notify({
Expand Down Expand Up @@ -1636,6 +1637,7 @@ function NoCodeModel({ nodeData }) {
});
setTransformationErrorFlag(true);
setIsLoading(false);
setRefreshModels(true);
});
};

Expand Down
Loading
Loading