From 4c4f356e55c5d0bada23f8065c127b1ca3726a1d Mon Sep 17 00:00:00 2001 From: Marcello Alarcon Date: Mon, 11 May 2026 08:40:39 -0300 Subject: [PATCH 1/2] =?UTF-8?q?feat(wpp-retry):=20PR-1=20=E2=80=94=20migra?= =?UTF-8?q?tion=20idempotency=5Fkey=20+=20silent=20dedup=20(Steps=201+2)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Step 1 — Migration (models.py + app.py): - TriggerExecution ganha 3 colunas nullable: idempotency_key, error_category, last_replay_at - to_dict() exposto com os 3 campos novos - Auto-migrate idempotente no startup: ALTER TABLE + IF NOT EXISTS em cada bloco - Partial unique index uq_trigger_idem (trigger_id, idempotency_key) WHERE NOT NULL - Basic index ix_trigger_executions_idem_key para lookups por key - SQLite 3.51 confirmado — partial index nativo; EXPLAIN QUERY PLAN confirma uso do índice Step 2 — Silent dedup (triggers.py): - webhook_receiver extrai idem_key de idempotency_key / messageId / data.messageId - Se key já existe: log idempotent_replay + 200 OK silencioso (pattern F6) - Race condition: IntegrityError no db.commit() → rollback + 200 OK silencioso - Legado (GitHub, Stripe, Linear): sem key → idem_key=None → fluxo normal inalterado - Limpeza: current_app movido para import no topo; imports inline removidos Testes passados: migration up/down idempotente, partial index unicidade, NULLs livres, extração de key (6 casos), race condition via IntegrityError, EXPLAIN QUERY PLAN. Co-Authored-By: Claude Sonnet 4.6 --- dashboard/backend/app.py | 37 ++++++++++++++++++++++++ dashboard/backend/models.py | 10 ++++++- dashboard/backend/routes/triggers.py | 42 +++++++++++++++++++++++++--- 3 files changed, 84 insertions(+), 5 deletions(-) diff --git a/dashboard/backend/app.py b/dashboard/backend/app.py index 2dccb597..7b3e1acb 100644 --- a/dashboard/backend/app.py +++ b/dashboard/backend/app.py @@ -613,6 +613,43 @@ def _cors_allowed_origins(): except Exception: pass _conn.commit() + + # --- WhatsApp retry pattern: idempotency_key + error_category + last_replay_at (PR-1 2026-05-11) --- + # Rollback: DROP INDEX uq_trigger_idem; DROP INDEX ix_trigger_executions_idem_key + # Columns are nullable — old code ignores them without breaking. + _te_cols = {row[1] for row in _cur.execute("PRAGMA table_info(trigger_executions)").fetchall()} + if "idempotency_key" not in _te_cols: + _cur.execute("ALTER TABLE trigger_executions ADD COLUMN idempotency_key TEXT") + _conn.commit() + if "error_category" not in _te_cols: + _cur.execute("ALTER TABLE trigger_executions ADD COLUMN error_category TEXT") + _conn.commit() + if "last_replay_at" not in _te_cols: + _cur.execute("ALTER TABLE trigger_executions ADD COLUMN last_replay_at TIMESTAMP") + _conn.commit() + # Basic index for idempotency lookups by key alone + try: + _cur.execute( + "CREATE INDEX IF NOT EXISTS ix_trigger_executions_idem_key " + "ON trigger_executions (idempotency_key)" + ) + _conn.commit() + except Exception: + pass + # Partial unique index: enforces (trigger_id, idempotency_key) uniqueness only when key IS NOT NULL. + # SQLite >= 3.8 supports partial indices natively; our runtime is 3.51 (confirmed). + # This is the DB-level guard against race-condition duplicates (Step 2 handles app-level dedup). + try: + _cur.execute( + "CREATE UNIQUE INDEX IF NOT EXISTS uq_trigger_idem " + "ON trigger_executions (trigger_id, idempotency_key) " + "WHERE idempotency_key IS NOT NULL" + ) + _conn.commit() + except Exception: + pass + # --- End WhatsApp retry pattern migration --- + _conn.close() # --- End auto-migrate --- diff --git a/dashboard/backend/models.py b/dashboard/backend/models.py index a151bcd8..2dcff6f1 100644 --- a/dashboard/backend/models.py +++ b/dashboard/backend/models.py @@ -333,12 +333,17 @@ class TriggerExecution(db.Model): id = db.Column(db.Integer, primary_key=True) trigger_id = db.Column(db.Integer, db.ForeignKey("triggers.id", ondelete="CASCADE"), nullable=False) event_data = db.Column(db.Text, nullable=True, default="{}") # JSON payload received - status = db.Column(db.String(20), nullable=False, default="pending") # pending, running, completed, failed + status = db.Column(db.String(20), nullable=False, default="pending") # pending, running, completed, failed, failed_retryable result_summary = db.Column(db.Text, nullable=True) error = db.Column(db.Text, nullable=True) duration_seconds = db.Column(db.Float, nullable=True) started_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc)) completed_at = db.Column(db.DateTime, nullable=True) + # WhatsApp retry pattern (PR-1: migration 2026-05-11) + # rollback: DROP indices uq_trigger_idem + ix_trigger_executions_idem_key; columns are nullable, ignored by old code + idempotency_key = db.Column(db.String(255), nullable=True, index=True) # messageId WPP or other source dedup key + error_category = db.Column(db.String(20), nullable=True) # transient | permanent | validation | unknown + last_replay_at = db.Column(db.DateTime, nullable=True) # rate-limit: 60s between replays of the same execution @property def event_data_dict(self) -> dict: @@ -358,6 +363,9 @@ def to_dict(self): "duration_seconds": self.duration_seconds, "started_at": self.started_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") if self.started_at else None, "completed_at": self.completed_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") if self.completed_at else None, + "idempotency_key": self.idempotency_key, + "error_category": self.error_category, + "last_replay_at": self.last_replay_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") if self.last_replay_at else None, } diff --git a/dashboard/backend/routes/triggers.py b/dashboard/backend/routes/triggers.py index 50bc5336..9d7c6d3b 100644 --- a/dashboard/backend/routes/triggers.py +++ b/dashboard/backend/routes/triggers.py @@ -12,9 +12,10 @@ import time from pathlib import Path from datetime import datetime, timezone -from flask import Blueprint, jsonify, request +from flask import Blueprint, jsonify, request, current_app from flask_login import current_user from models import db, Trigger, TriggerExecution, has_permission, audit +from sqlalchemy.exc import IntegrityError bp = Blueprint("triggers", __name__) @@ -245,7 +246,6 @@ def test_trigger(trigger_id): trigger_id_int = trigger.id trigger_name = trigger.name - from flask import current_app app = current_app._get_current_object() def _run(): @@ -318,14 +318,49 @@ def webhook_receiver(trigger_id): if not _matches_filter(event_data, trigger.event_filter_dict): return jsonify({"status": "ok"}), 200 + # --- WhatsApp retry pattern: idempotency key extraction (PR-1 2026-05-11) --- + # WPP channel: N8N forwards messageId as idempotency_key or data.messageId. + # Other sources (GitHub, Stripe, Linear): no key → idem_key=None → check is skipped. + idem_key = None + if isinstance(event_data, dict): + idem_key = ( + event_data.get("idempotency_key") + or event_data.get("messageId") + or (event_data.get("data") or {}).get("messageId") + or None + ) + + # Silent dedup (F6 pattern): second POST with same key returns 200 OK without re-executing. + if idem_key: + existing = TriggerExecution.query.filter_by( + trigger_id=trigger.id, idempotency_key=idem_key + ).first() + if existing: + current_app.logger.info( + f"evt=idempotent_replay trigger_id={trigger.id} key={idem_key} existing_exec_id={existing.id}" + ) + return jsonify({"status": "ok"}), 200 + # --- End idempotency dedup --- + # Create execution and run async execution = TriggerExecution( trigger_id=trigger.id, event_data=json.dumps(event_data), status="pending", + idempotency_key=idem_key, ) db.session.add(execution) - db.session.commit() + try: + db.session.commit() + except IntegrityError: + # Race condition: two simultaneous POSTs with same idempotency_key; + # the DB partial unique index rejected the second INSERT. + # Silent dedup — return 200 OK (F6) without re-executing. + db.session.rollback() + current_app.logger.info( + f"evt=idempotent_replay_race trigger_id={trigger.id} key={idem_key}" + ) + return jsonify({"status": "ok"}), 200 # Capture IDs BEFORE handing off to the worker thread (see test_trigger # for the same DetachedInstanceError issue) — accessing ``execution.id`` @@ -333,7 +368,6 @@ def webhook_receiver(trigger_id): execution_id = execution.id trigger_id_int = trigger.id - from flask import current_app app = current_app._get_current_object() def _run(): From 9c7cd5e0d4ccd84e648459318d9055b507f1f859 Mon Sep 17 00:00:00 2001 From: Marcello Alarcon Date: Wed, 13 May 2026 20:23:34 -0300 Subject: [PATCH 2/2] =?UTF-8?q?fix(wpp-retry):=20endere=C3=A7a=20review=20?= =?UTF-8?q?do=20Sourcery=20=E2=80=94=20IntegrityError=20narrow=20+=20?= =?UTF-8?q?=C3=ADndice=20+=20logs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Consolidado dos fixes pedidos pelo Sourcery no review do PR #78: 1. `except IntegrityError` restrito à violação de idempotência (dashboard/backend/routes/triggers.py) Antes, qualquer IntegrityError no `webhook_receiver` virava 200 OK silencioso — tratado como "replay idempotente". Outros problemas (NOT NULL, FK quebrada, novo unique constraint adicionado depois) ficariam mascarados como sucesso e o cliente nunca saberia que o evento não foi processado. Agora o catch só absorve o erro se: a) `idem_key` está definido (sem chave de idempotência não há como ser violação de uq_trigger_idem) b) A mensagem do erro do driver menciona `uq_trigger_idem` ou `idempotency_key` (constraint específica) Para os demais IntegrityError, loga com contexto completo (trigger_id, key, mensagem original) e re-raise — webhook retorna 500 e o erro fica visível em logs em vez de virar 200 silencioso. 2. Remove índice redundante (models.py) `TriggerExecution.idempotency_key` estava criando 3 índices pra mesma coluna: - `ix_trigger_executions_idempotency_key` (auto-gerado via index=True no model) - `ix_trigger_executions_idem_key` (raw SQL no startup) - `uq_trigger_idem` (partial unique, raw SQL) Mantemos só os dois últimos — explicitamente criados pela migration de startup, com nomes versionados no rollback plan. `index=True` removido da definição do model. 3. Loga falha de CREATE INDEX em vez de silenciar (app.py) Substitui `except Exception: pass` por log estruturado nos dois `CREATE INDEX` (basic e partial unique). Antes, qualquer falha (ex.: versão de SQLite sem suporte a partial index) era engolida silenciosamente — operador só descobriria no primeiro race. Agora loga `sqlite_lib`, `sqlite_runtime` e a exceção original. Sintaxe validada com `python3 -m py_compile`. Sem teste dedicado pro path nessa branch (testes WPP só entram nos PRs #79/#80). --- dashboard/backend/app.py | 37 +++++++++++++++++++++++++--- dashboard/backend/models.py | 5 +++- dashboard/backend/routes/triggers.py | 19 ++++++++++++-- 3 files changed, 54 insertions(+), 7 deletions(-) diff --git a/dashboard/backend/app.py b/dashboard/backend/app.py index 7b3e1acb..e5a7ac3f 100644 --- a/dashboard/backend/app.py +++ b/dashboard/backend/app.py @@ -634,8 +634,22 @@ def _cors_allowed_origins(): "ON trigger_executions (idempotency_key)" ) _conn.commit() - except Exception: - pass + except Exception as exc: + # Não silenciar: se este índice não for criado, a dedup cai pra + # caminho app-only (mais lento e com janela de race). Logar com a + # versão do SQLite ajuda a diagnosticar incompatibilidade. + import logging + import sqlite3 as _sqlite3 + _lib_ver = getattr(_sqlite3, "sqlite_version", None) + try: + _rt_ver = _cur.execute("SELECT sqlite_version()").fetchone()[0] + except Exception: + _rt_ver = None + logging.getLogger(__name__).warning( + "Failed to create ix_trigger_executions_idem_key — falling back " + "to app-level idempotency only. sqlite_lib=%r sqlite_runtime=%r err=%r", + _lib_ver, _rt_ver, exc, + ) # Partial unique index: enforces (trigger_id, idempotency_key) uniqueness only when key IS NOT NULL. # SQLite >= 3.8 supports partial indices natively; our runtime is 3.51 (confirmed). # This is the DB-level guard against race-condition duplicates (Step 2 handles app-level dedup). @@ -646,8 +660,23 @@ def _cors_allowed_origins(): "WHERE idempotency_key IS NOT NULL" ) _conn.commit() - except Exception: - pass + except Exception as exc: + # Mesmo tratamento: se o partial unique não for criado, perdemos a + # garantia DB-level contra duplicate race — webhook_receiver cai + # 100% no caminho app-level (já existe, mas tem janela TOCTOU). + import logging + import sqlite3 as _sqlite3 + _lib_ver = getattr(_sqlite3, "sqlite_version", None) + try: + _rt_ver = _cur.execute("SELECT sqlite_version()").fetchone()[0] + except Exception: + _rt_ver = None + logging.getLogger(__name__).warning( + "Failed to create uq_trigger_idem partial unique — DB-level race " + "guard inactive, relying on app-level dedup only. " + "sqlite_lib=%r sqlite_runtime=%r err=%r", + _lib_ver, _rt_ver, exc, + ) # --- End WhatsApp retry pattern migration --- _conn.close() diff --git a/dashboard/backend/models.py b/dashboard/backend/models.py index 2dcff6f1..437d431a 100644 --- a/dashboard/backend/models.py +++ b/dashboard/backend/models.py @@ -341,7 +341,10 @@ class TriggerExecution(db.Model): completed_at = db.Column(db.DateTime, nullable=True) # WhatsApp retry pattern (PR-1: migration 2026-05-11) # rollback: DROP indices uq_trigger_idem + ix_trigger_executions_idem_key; columns are nullable, ignored by old code - idempotency_key = db.Column(db.String(255), nullable=True, index=True) # messageId WPP or other source dedup key + # Note: index intentionally NOT declared here — the raw-SQL migration in app.py + # creates `ix_trigger_executions_idem_key` (basic) + `uq_trigger_idem` (partial unique). + # Declaring `index=True` here would create a third redundant index (Sourcery #78). + idempotency_key = db.Column(db.String(255), nullable=True) # messageId WPP or other source dedup key error_category = db.Column(db.String(20), nullable=True) # transient | permanent | validation | unknown last_replay_at = db.Column(db.DateTime, nullable=True) # rate-limit: 60s between replays of the same execution diff --git a/dashboard/backend/routes/triggers.py b/dashboard/backend/routes/triggers.py index 9d7c6d3b..88db8604 100644 --- a/dashboard/backend/routes/triggers.py +++ b/dashboard/backend/routes/triggers.py @@ -352,11 +352,26 @@ def webhook_receiver(trigger_id): db.session.add(execution) try: db.session.commit() - except IntegrityError: + except IntegrityError as exc: + # Restringe o catch à violação do índice de idempotência + # (uq_trigger_idem). Qualquer outro IntegrityError (NOT NULL, FK, + # outros uniques) é um problema real e precisa propagar — caso + # contrário viraria 200 OK silencioso e mascararia bug de schema. + db.session.rollback() + err_text = str(getattr(exc, "orig", exc)).lower() + is_idem_violation = ( + idem_key is not None + and ("uq_trigger_idem" in err_text or "idempotency_key" in err_text) + ) + if not is_idem_violation: + current_app.logger.error( + f"evt=integrity_error_unexpected trigger_id={trigger.id} " + f"key={idem_key} err={err_text!r}" + ) + raise # Race condition: two simultaneous POSTs with same idempotency_key; # the DB partial unique index rejected the second INSERT. # Silent dedup — return 200 OK (F6) without re-executing. - db.session.rollback() current_app.logger.info( f"evt=idempotent_replay_race trigger_id={trigger.id} key={idem_key}" )