diff --git a/dashboard/backend/app.py b/dashboard/backend/app.py index 2dccb597..e5a7ac3f 100644 --- a/dashboard/backend/app.py +++ b/dashboard/backend/app.py @@ -613,6 +613,72 @@ def _cors_allowed_origins(): except Exception: pass _conn.commit() + + # --- WhatsApp retry pattern: idempotency_key + error_category + last_replay_at (PR-1 2026-05-11) --- + # Rollback: DROP INDEX uq_trigger_idem; DROP INDEX ix_trigger_executions_idem_key + # Columns are nullable — old code ignores them without breaking. + _te_cols = {row[1] for row in _cur.execute("PRAGMA table_info(trigger_executions)").fetchall()} + if "idempotency_key" not in _te_cols: + _cur.execute("ALTER TABLE trigger_executions ADD COLUMN idempotency_key TEXT") + _conn.commit() + if "error_category" not in _te_cols: + _cur.execute("ALTER TABLE trigger_executions ADD COLUMN error_category TEXT") + _conn.commit() + if "last_replay_at" not in _te_cols: + _cur.execute("ALTER TABLE trigger_executions ADD COLUMN last_replay_at TIMESTAMP") + _conn.commit() + # Basic index for idempotency lookups by key alone + try: + _cur.execute( + "CREATE INDEX IF NOT EXISTS ix_trigger_executions_idem_key " + "ON trigger_executions (idempotency_key)" + ) + _conn.commit() + except Exception as exc: + # Não silenciar: se este índice não for criado, a dedup cai pra + # caminho app-only (mais lento e com janela de race). Logar com a + # versão do SQLite ajuda a diagnosticar incompatibilidade. + import logging + import sqlite3 as _sqlite3 + _lib_ver = getattr(_sqlite3, "sqlite_version", None) + try: + _rt_ver = _cur.execute("SELECT sqlite_version()").fetchone()[0] + except Exception: + _rt_ver = None + logging.getLogger(__name__).warning( + "Failed to create ix_trigger_executions_idem_key — falling back " + "to app-level idempotency only. sqlite_lib=%r sqlite_runtime=%r err=%r", + _lib_ver, _rt_ver, exc, + ) + # Partial unique index: enforces (trigger_id, idempotency_key) uniqueness only when key IS NOT NULL. + # SQLite >= 3.8 supports partial indices natively; our runtime is 3.51 (confirmed). + # This is the DB-level guard against race-condition duplicates (Step 2 handles app-level dedup). + try: + _cur.execute( + "CREATE UNIQUE INDEX IF NOT EXISTS uq_trigger_idem " + "ON trigger_executions (trigger_id, idempotency_key) " + "WHERE idempotency_key IS NOT NULL" + ) + _conn.commit() + except Exception as exc: + # Mesmo tratamento: se o partial unique não for criado, perdemos a + # garantia DB-level contra duplicate race — webhook_receiver cai + # 100% no caminho app-level (já existe, mas tem janela TOCTOU). + import logging + import sqlite3 as _sqlite3 + _lib_ver = getattr(_sqlite3, "sqlite_version", None) + try: + _rt_ver = _cur.execute("SELECT sqlite_version()").fetchone()[0] + except Exception: + _rt_ver = None + logging.getLogger(__name__).warning( + "Failed to create uq_trigger_idem partial unique — DB-level race " + "guard inactive, relying on app-level dedup only. " + "sqlite_lib=%r sqlite_runtime=%r err=%r", + _lib_ver, _rt_ver, exc, + ) + # --- End WhatsApp retry pattern migration --- + _conn.close() # --- End auto-migrate --- diff --git a/dashboard/backend/models.py b/dashboard/backend/models.py index a151bcd8..437d431a 100644 --- a/dashboard/backend/models.py +++ b/dashboard/backend/models.py @@ -333,12 +333,20 @@ class TriggerExecution(db.Model): id = db.Column(db.Integer, primary_key=True) trigger_id = db.Column(db.Integer, db.ForeignKey("triggers.id", ondelete="CASCADE"), nullable=False) event_data = db.Column(db.Text, nullable=True, default="{}") # JSON payload received - status = db.Column(db.String(20), nullable=False, default="pending") # pending, running, completed, failed + status = db.Column(db.String(20), nullable=False, default="pending") # pending, running, completed, failed, failed_retryable result_summary = db.Column(db.Text, nullable=True) error = db.Column(db.Text, nullable=True) duration_seconds = db.Column(db.Float, nullable=True) started_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc)) completed_at = db.Column(db.DateTime, nullable=True) + # WhatsApp retry pattern (PR-1: migration 2026-05-11) + # rollback: DROP indices uq_trigger_idem + ix_trigger_executions_idem_key; columns are nullable, ignored by old code + # Note: index intentionally NOT declared here — the raw-SQL migration in app.py + # creates `ix_trigger_executions_idem_key` (basic) + `uq_trigger_idem` (partial unique). + # Declaring `index=True` here would create a third redundant index (Sourcery #78). + idempotency_key = db.Column(db.String(255), nullable=True) # messageId WPP or other source dedup key + error_category = db.Column(db.String(20), nullable=True) # transient | permanent | validation | unknown + last_replay_at = db.Column(db.DateTime, nullable=True) # rate-limit: 60s between replays of the same execution @property def event_data_dict(self) -> dict: @@ -358,6 +366,9 @@ def to_dict(self): "duration_seconds": self.duration_seconds, "started_at": self.started_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") if self.started_at else None, "completed_at": self.completed_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") if self.completed_at else None, + "idempotency_key": self.idempotency_key, + "error_category": self.error_category, + "last_replay_at": self.last_replay_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") if self.last_replay_at else None, } diff --git a/dashboard/backend/routes/triggers.py b/dashboard/backend/routes/triggers.py index 50bc5336..88db8604 100644 --- a/dashboard/backend/routes/triggers.py +++ b/dashboard/backend/routes/triggers.py @@ -12,9 +12,10 @@ import time from pathlib import Path from datetime import datetime, timezone -from flask import Blueprint, jsonify, request +from flask import Blueprint, jsonify, request, current_app from flask_login import current_user from models import db, Trigger, TriggerExecution, has_permission, audit +from sqlalchemy.exc import IntegrityError bp = Blueprint("triggers", __name__) @@ -245,7 +246,6 @@ def test_trigger(trigger_id): trigger_id_int = trigger.id trigger_name = trigger.name - from flask import current_app app = current_app._get_current_object() def _run(): @@ -318,14 +318,64 @@ def webhook_receiver(trigger_id): if not _matches_filter(event_data, trigger.event_filter_dict): return jsonify({"status": "ok"}), 200 + # --- WhatsApp retry pattern: idempotency key extraction (PR-1 2026-05-11) --- + # WPP channel: N8N forwards messageId as idempotency_key or data.messageId. + # Other sources (GitHub, Stripe, Linear): no key → idem_key=None → check is skipped. + idem_key = None + if isinstance(event_data, dict): + idem_key = ( + event_data.get("idempotency_key") + or event_data.get("messageId") + or (event_data.get("data") or {}).get("messageId") + or None + ) + + # Silent dedup (F6 pattern): second POST with same key returns 200 OK without re-executing. + if idem_key: + existing = TriggerExecution.query.filter_by( + trigger_id=trigger.id, idempotency_key=idem_key + ).first() + if existing: + current_app.logger.info( + f"evt=idempotent_replay trigger_id={trigger.id} key={idem_key} existing_exec_id={existing.id}" + ) + return jsonify({"status": "ok"}), 200 + # --- End idempotency dedup --- + # Create execution and run async execution = TriggerExecution( trigger_id=trigger.id, event_data=json.dumps(event_data), status="pending", + idempotency_key=idem_key, ) db.session.add(execution) - db.session.commit() + try: + db.session.commit() + except IntegrityError as exc: + # Restringe o catch à violação do índice de idempotência + # (uq_trigger_idem). Qualquer outro IntegrityError (NOT NULL, FK, + # outros uniques) é um problema real e precisa propagar — caso + # contrário viraria 200 OK silencioso e mascararia bug de schema. + db.session.rollback() + err_text = str(getattr(exc, "orig", exc)).lower() + is_idem_violation = ( + idem_key is not None + and ("uq_trigger_idem" in err_text or "idempotency_key" in err_text) + ) + if not is_idem_violation: + current_app.logger.error( + f"evt=integrity_error_unexpected trigger_id={trigger.id} " + f"key={idem_key} err={err_text!r}" + ) + raise + # Race condition: two simultaneous POSTs with same idempotency_key; + # the DB partial unique index rejected the second INSERT. + # Silent dedup — return 200 OK (F6) without re-executing. + current_app.logger.info( + f"evt=idempotent_replay_race trigger_id={trigger.id} key={idem_key}" + ) + return jsonify({"status": "ok"}), 200 # Capture IDs BEFORE handing off to the worker thread (see test_trigger # for the same DetachedInstanceError issue) — accessing ``execution.id`` @@ -333,7 +383,6 @@ def webhook_receiver(trigger_id): execution_id = execution.id trigger_id_int = trigger.id - from flask import current_app app = current_app._get_current_object() def _run():