Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions dashboard/backend/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,72 @@ def _cors_allowed_origins():
except Exception:
pass
_conn.commit()

# --- WhatsApp retry pattern: idempotency_key + error_category + last_replay_at (PR-1 2026-05-11) ---
# Idempotent schema migration for trigger_executions: adds three nullable
# columns (only when missing) and two indexes used for webhook deduplication.
# Rollback: DROP INDEX uq_trigger_idem; DROP INDEX ix_trigger_executions_idem_key
# Columns are nullable — old code ignores them without breaking.

# PRAGMA table_info yields one row per column; index 1 of each row is the column name.
_te_cols = {row[1] for row in _cur.execute("PRAGMA table_info(trigger_executions)").fetchall()}
for _col_name, _col_type in (
    ("idempotency_key", "TEXT"),
    ("error_category", "TEXT"),
    ("last_replay_at", "TIMESTAMP"),
):
    if _col_name not in _te_cols:
        # Name and type come from the literal tuple above, never from external input.
        _cur.execute(f"ALTER TABLE trigger_executions ADD COLUMN {_col_name} {_col_type}")
        _conn.commit()

# Each entry is (DDL statement, warning logged if the DDL fails). Order matters
# only for readability: the two indexes are independent of each other.
_idem_index_specs = (
    (
        # Basic index for idempotency lookups by key alone.
        "CREATE INDEX IF NOT EXISTS ix_trigger_executions_idem_key "
        "ON trigger_executions (idempotency_key)",
        "Failed to create ix_trigger_executions_idem_key — falling back "
        "to app-level idempotency only. sqlite_lib=%r sqlite_runtime=%r err=%r",
    ),
    (
        # Partial unique index: enforces (trigger_id, idempotency_key) uniqueness
        # only when the key IS NOT NULL. SQLite >= 3.8 supports partial indexes.
        # This is the DB-level guard against race-condition duplicates; the
        # app-level lookup in the webhook handler has a check-then-insert window.
        "CREATE UNIQUE INDEX IF NOT EXISTS uq_trigger_idem "
        "ON trigger_executions (trigger_id, idempotency_key) "
        "WHERE idempotency_key IS NOT NULL",
        "Failed to create uq_trigger_idem partial unique — DB-level race "
        "guard inactive, relying on app-level dedup only. "
        "sqlite_lib=%r sqlite_runtime=%r err=%r",
    ),
)
for _index_ddl, _index_warning in _idem_index_specs:
    try:
        _cur.execute(_index_ddl)
        _conn.commit()
    except Exception as exc:
        # Do not swallow silently: without the index, dedup relies on the
        # app-level path only (slower, and with a race window). Logging both
        # the library and runtime SQLite versions helps diagnose an
        # incompatibility. Startup continues either way (best-effort).
        import logging
        import sqlite3 as _sqlite3
        _lib_ver = getattr(_sqlite3, "sqlite_version", None)
        try:
            _rt_ver = _cur.execute("SELECT sqlite_version()").fetchone()[0]
        except Exception:
            # The version query is diagnostic only; never let it mask `exc`.
            _rt_ver = None
        logging.getLogger(__name__).warning(_index_warning, _lib_ver, _rt_ver, exc)
# --- End WhatsApp retry pattern migration ---

_conn.close()
# --- End auto-migrate ---

Expand Down
13 changes: 12 additions & 1 deletion dashboard/backend/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,12 +333,20 @@ class TriggerExecution(db.Model):
id = db.Column(db.Integer, primary_key=True)
trigger_id = db.Column(db.Integer, db.ForeignKey("triggers.id", ondelete="CASCADE"), nullable=False)
event_data = db.Column(db.Text, nullable=True, default="{}") # JSON payload received
status = db.Column(db.String(20), nullable=False, default="pending") # pending, running, completed, failed
status = db.Column(db.String(20), nullable=False, default="pending") # pending, running, completed, failed, failed_retryable
result_summary = db.Column(db.Text, nullable=True)
error = db.Column(db.Text, nullable=True)
duration_seconds = db.Column(db.Float, nullable=True)
started_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))
completed_at = db.Column(db.DateTime, nullable=True)
# WhatsApp retry pattern (PR-1: migration 2026-05-11)
# rollback: DROP indices uq_trigger_idem + ix_trigger_executions_idem_key; columns are nullable, ignored by old code
# Note: index intentionally NOT declared here — the raw-SQL migration in app.py
# creates `ix_trigger_executions_idem_key` (basic) + `uq_trigger_idem` (partial unique).
# Declaring `index=True` here would create a third redundant index (Sourcery #78).
idempotency_key = db.Column(db.String(255), nullable=True) # messageId WPP or other source dedup key
error_category = db.Column(db.String(20), nullable=True) # transient | permanent | validation | unknown
last_replay_at = db.Column(db.DateTime, nullable=True) # rate-limit: 60s between replays of the same execution

@property
def event_data_dict(self) -> dict:
Expand All @@ -358,6 +366,9 @@ def to_dict(self):
"duration_seconds": self.duration_seconds,
"started_at": self.started_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") if self.started_at else None,
"completed_at": self.completed_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") if self.completed_at else None,
"idempotency_key": self.idempotency_key,
"error_category": self.error_category,
"last_replay_at": self.last_replay_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") if self.last_replay_at else None,
}


Expand Down
57 changes: 53 additions & 4 deletions dashboard/backend/routes/triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
import time
from pathlib import Path
from datetime import datetime, timezone
from flask import Blueprint, jsonify, request
from flask import Blueprint, jsonify, request, current_app
from flask_login import current_user
from models import db, Trigger, TriggerExecution, has_permission, audit
from sqlalchemy.exc import IntegrityError

bp = Blueprint("triggers", __name__)

Expand Down Expand Up @@ -245,7 +246,6 @@ def test_trigger(trigger_id):
trigger_id_int = trigger.id
trigger_name = trigger.name

from flask import current_app
app = current_app._get_current_object()

def _run():
Expand Down Expand Up @@ -318,22 +318,71 @@ def webhook_receiver(trigger_id):
if not _matches_filter(event_data, trigger.event_filter_dict):
return jsonify({"status": "ok"}), 200

# --- WhatsApp retry pattern: idempotency key extraction (PR-1 2026-05-11) ---
# WPP channel: N8N forwards messageId as idempotency_key or data.messageId.
# Other sources (GitHub, Stripe, Linear): no key → idem_key=None → check is skipped.
idem_key = None
if isinstance(event_data, dict):
idem_key = (
event_data.get("idempotency_key")
or event_data.get("messageId")
or (event_data.get("data") or {}).get("messageId")
or None
)

# Silent dedup (F6 pattern): second POST with same key returns 200 OK without re-executing.
if idem_key:
existing = TriggerExecution.query.filter_by(
trigger_id=trigger.id, idempotency_key=idem_key
).first()
if existing:
current_app.logger.info(
f"evt=idempotent_replay trigger_id={trigger.id} key={idem_key} existing_exec_id={existing.id}"
)
return jsonify({"status": "ok"}), 200
# --- End idempotency dedup ---

# Create execution and run async
execution = TriggerExecution(
trigger_id=trigger.id,
event_data=json.dumps(event_data),
status="pending",
idempotency_key=idem_key,
)
db.session.add(execution)
db.session.commit()
try:
db.session.commit()
except IntegrityError as exc:
# Restringe o catch à violação do índice de idempotência
# (uq_trigger_idem). Qualquer outro IntegrityError (NOT NULL, FK,
# outros uniques) é um problema real e precisa propagar — caso
# contrário viraria 200 OK silencioso e mascararia bug de schema.
db.session.rollback()
err_text = str(getattr(exc, "orig", exc)).lower()
is_idem_violation = (
idem_key is not None
and ("uq_trigger_idem" in err_text or "idempotency_key" in err_text)
)
if not is_idem_violation:
current_app.logger.error(
f"evt=integrity_error_unexpected trigger_id={trigger.id} "
f"key={idem_key} err={err_text!r}"
)
raise
# Race condition: two simultaneous POSTs with same idempotency_key;
# the DB partial unique index rejected the second INSERT.
# Silent dedup — return 200 OK (F6) without re-executing.
current_app.logger.info(
f"evt=idempotent_replay_race trigger_id={trigger.id} key={idem_key}"
)
return jsonify({"status": "ok"}), 200

# Capture IDs BEFORE handing off to the worker thread (see test_trigger
# for the same DetachedInstanceError issue) — accessing ``execution.id``
# or ``trigger.id`` after the request session closes blows up.
execution_id = execution.id
trigger_id_int = trigger.id

from flask import current_app
app = current_app._get_current_object()

def _run():
Expand Down