Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 104 additions & 24 deletions .claude/skills/int-evolution-go/scripts/evolution_go_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
import argparse
import json
import os
import random
import socket
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
Expand Down Expand Up @@ -43,38 +47,103 @@ def get_config():
return url.rstrip("/"), key


def _backoff_retry_or_log_final(attempt, max_attempts, base_delay, max_delay, extras):
"""Sleep with exponential backoff + jitter, ou loga falha final.

Centraliza a lógica idêntica que estava duplicada nos branches 5xx
(HTTPError) e URLError/socket.timeout do retry loop. `extras` carrega
as chaves variantes do log (`http_status` ou `error`).
"""
if attempt < max_attempts - 1:
delay = min(base_delay * (2 ** attempt) + random.uniform(0, 0.5), max_delay)
print(
json.dumps({
"evt": "api_request_retry",
"attempt": attempt + 1,
"max_attempts": max_attempts,
**extras,
"delay_s": round(delay, 2),
})
)
time.sleep(delay)
else:
print(
json.dumps({
"evt": "api_request_failed",
"attempt": attempt + 1,
"max_attempts": max_attempts,
**extras,
"category": "transient",
})
)


def _retry_http_call_client(do_call, max_attempts=3, base_delay=2.0, max_delay=8.0):
"""Exponential backoff + jitter for Evolution Go API calls.

Retries on HTTP 5xx, urllib.error.URLError, and socket.timeout (transient).
NEVER retries on HTTP 4xx (deterministic client errors).

Returns the result of do_call() on success.
Raises the last exception after max_attempts are exhausted.
Raises immediately on HTTP 4xx (no retry).
Raises ValueError if max_attempts < 1 (caller error).
"""
Comment thread
sourcery-ai[bot] marked this conversation as resolved.
if max_attempts < 1:
raise ValueError("max_attempts must be >= 1")

last_exc = None
for attempt in range(max_attempts):
try:
return do_call()
except urllib.error.HTTPError as e:
if e.code < 500:
# 4xx — deterministic, raise immediately (caller decides sys.exit vs raise)
raise
last_exc = e
_backoff_retry_or_log_final(
attempt, max_attempts, base_delay, max_delay,
{"http_status": e.code},
)
except (urllib.error.URLError, socket.timeout) as e:
last_exc = e
_backoff_retry_or_log_final(
attempt, max_attempts, base_delay, max_delay,
{"error": str(e)},
)
raise last_exc


def api_request(method, path, data=None):
    """Make an HTTP request to the Evolution Go API.

    Applies exponential backoff + jitter on HTTP 5xx / network errors
    (up to 3 attempts, via _retry_http_call_client).
    On HTTP 4xx: raises urllib.error.HTTPError immediately (no retry,
    deterministic error).
    On persistent failure after retries: raises the last exception instead of
    sys.exit(1), allowing library callers to handle it; the CLI __main__
    catches it and sys.exit(1) as before.

    Args:
        method: HTTP verb, e.g. "GET" or "POST".
        path: URL path appended to the configured base URL.
        data: optional JSON-serializable request body.

    Returns:
        The parsed JSON response, or {"message": "success"} when the
        response body is empty.
    """
    base_url, api_key = get_config()
    url = f"{base_url}{path}"

    # NOTE(review): `if data` also skips the body for an empty dict ({}),
    # not just None — presumably intentional; confirm against POST endpoints
    # that accept an empty JSON object.
    body = json.dumps(data).encode("utf-8") if data else None

    def _do_call():
        # Build a fresh Request on every attempt so retries never share
        # request state.
        req = urllib.request.Request(
            url,
            data=body,
            method=method,
            headers={
                "apikey": api_key,
                "Content-Type": "application/json",
            },
        )
        with urllib.request.urlopen(req) as resp:
            raw = resp.read()
            if raw:
                return json.loads(raw)
            return {"message": "success"}

    return _retry_http_call_client(_do_call)


def to_jid(number):
Expand Down Expand Up @@ -523,12 +592,23 @@ def main():
}

handler = commands.get(args.command)
if handler:
handler(args)
else:
if not handler:
print(json.dumps({"error": f"Unknown command: {args.command}"}))
sys.exit(1)

try:
handler(args)
except urllib.error.HTTPError as e:
try:
error_body = json.loads(e.read())
except Exception:
error_body = {"error": str(e)}
print(json.dumps({"error": f"HTTP {e.code}", "details": error_body}, indent=2))
sys.exit(1)
except (urllib.error.URLError, socket.timeout) as e:
print(json.dumps({"error": f"Connection failed: {e}"}))
sys.exit(1)


if __name__ == "__main__":
main()
37 changes: 37 additions & 0 deletions dashboard/backend/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,43 @@ def _cors_allowed_origins():
except Exception:
pass
_conn.commit()

# --- WhatsApp retry pattern: idempotency_key + error_category + last_replay_at (PR-1 2026-05-11) ---
# Rollback: DROP INDEX uq_trigger_idem; DROP INDEX ix_trigger_executions_idem_key
# Columns are nullable — old code ignores them without breaking.
# The column set is read once up front; each ALTER below is guarded so the
# migration is idempotent across restarts.
_te_cols = {row[1] for row in _cur.execute("PRAGMA table_info(trigger_executions)").fetchall()}
if "idempotency_key" not in _te_cols:
    _cur.execute("ALTER TABLE trigger_executions ADD COLUMN idempotency_key TEXT")
    _conn.commit()
if "error_category" not in _te_cols:
    _cur.execute("ALTER TABLE trigger_executions ADD COLUMN error_category TEXT")
    _conn.commit()
if "last_replay_at" not in _te_cols:
    _cur.execute("ALTER TABLE trigger_executions ADD COLUMN last_replay_at TIMESTAMP")
    _conn.commit()
# Basic index for idempotency lookups by key alone.
try:
    _cur.execute(
        "CREATE INDEX IF NOT EXISTS ix_trigger_executions_idem_key "
        "ON trigger_executions (idempotency_key)"
    )
    _conn.commit()
except Exception:
    # Best-effort, matching the surrounding migrations: an index failure
    # must not block app startup.
    pass
# Partial unique index: enforces (trigger_id, idempotency_key) uniqueness only
# when idempotency_key IS NOT NULL. Partial indices require SQLite >= 3.8.0.
# NOTE(review): the claimed runtime version "3.51 (confirmed)" looks off —
# verify the actual sqlite3.sqlite_version in deployment.
# This is the DB-level guard against race-condition duplicates (Step 2
# handles app-level dedup).
try:
    _cur.execute(
        "CREATE UNIQUE INDEX IF NOT EXISTS uq_trigger_idem "
        "ON trigger_executions (trigger_id, idempotency_key) "
        "WHERE idempotency_key IS NOT NULL"
    )
    _conn.commit()
except Exception:
    pass
# --- End WhatsApp retry pattern migration ---

_conn.close()
# --- End auto-migrate ---

Expand Down
10 changes: 9 additions & 1 deletion dashboard/backend/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,12 +333,17 @@ class TriggerExecution(db.Model):
id = db.Column(db.Integer, primary_key=True)
trigger_id = db.Column(db.Integer, db.ForeignKey("triggers.id", ondelete="CASCADE"), nullable=False)
event_data = db.Column(db.Text, nullable=True, default="{}") # JSON payload received
status = db.Column(db.String(20), nullable=False, default="pending") # pending, running, completed, failed
status = db.Column(db.String(20), nullable=False, default="pending") # pending, running, completed, failed, failed_retryable
result_summary = db.Column(db.Text, nullable=True)
error = db.Column(db.Text, nullable=True)
duration_seconds = db.Column(db.Float, nullable=True)
started_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))
completed_at = db.Column(db.DateTime, nullable=True)
# WhatsApp retry pattern (PR-1: migration 2026-05-11)
# rollback: DROP indices uq_trigger_idem + ix_trigger_executions_idem_key; columns are nullable, ignored by old code
idempotency_key = db.Column(db.String(255), nullable=True, index=True) # messageId WPP or other source dedup key
error_category = db.Column(db.String(20), nullable=True) # transient | permanent | validation | unknown
last_replay_at = db.Column(db.DateTime, nullable=True) # rate-limit: 60s between replays of the same execution

@property
def event_data_dict(self) -> dict:
Expand All @@ -358,6 +363,9 @@ def to_dict(self):
"duration_seconds": self.duration_seconds,
"started_at": self.started_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") if self.started_at else None,
"completed_at": self.completed_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") if self.completed_at else None,
"idempotency_key": self.idempotency_key,
"error_category": self.error_category,
"last_replay_at": self.last_replay_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") if self.last_replay_at else None,
}


Expand Down
42 changes: 38 additions & 4 deletions dashboard/backend/routes/triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
import time
from pathlib import Path
from datetime import datetime, timezone
from flask import Blueprint, jsonify, request
from flask import Blueprint, jsonify, request, current_app
from flask_login import current_user
from models import db, Trigger, TriggerExecution, has_permission, audit
from sqlalchemy.exc import IntegrityError

bp = Blueprint("triggers", __name__)

Expand Down Expand Up @@ -245,7 +246,6 @@ def test_trigger(trigger_id):
trigger_id_int = trigger.id
trigger_name = trigger.name

from flask import current_app
app = current_app._get_current_object()

def _run():
Expand Down Expand Up @@ -318,22 +318,56 @@ def webhook_receiver(trigger_id):
if not _matches_filter(event_data, trigger.event_filter_dict):
return jsonify({"status": "ok"}), 200

# --- WhatsApp retry pattern: idempotency key extraction (PR-1 2026-05-11) ---
# WPP channel: N8N forwards messageId as idempotency_key or data.messageId.
# Other sources (GitHub, Stripe, Linear): no key → idem_key=None → check is skipped.
idem_key = None
if isinstance(event_data, dict):
idem_key = (
event_data.get("idempotency_key")
or event_data.get("messageId")
or (event_data.get("data") or {}).get("messageId")
or None
)

# Silent dedup (F6 pattern): second POST with same key returns 200 OK without re-executing.
if idem_key:
existing = TriggerExecution.query.filter_by(
trigger_id=trigger.id, idempotency_key=idem_key
).first()
if existing:
current_app.logger.info(
f"evt=idempotent_replay trigger_id={trigger.id} key={idem_key} existing_exec_id={existing.id}"
)
return jsonify({"status": "ok"}), 200
# --- End idempotency dedup ---

# Create execution and run async
execution = TriggerExecution(
trigger_id=trigger.id,
event_data=json.dumps(event_data),
status="pending",
idempotency_key=idem_key,
)
db.session.add(execution)
db.session.commit()
try:
db.session.commit()
except IntegrityError:
# Race condition: two simultaneous POSTs with same idempotency_key;
# the DB partial unique index rejected the second INSERT.
# Silent dedup — return 200 OK (F6) without re-executing.
db.session.rollback()
current_app.logger.info(
f"evt=idempotent_replay_race trigger_id={trigger.id} key={idem_key}"
)
return jsonify({"status": "ok"}), 200

# Capture IDs BEFORE handing off to the worker thread (see test_trigger
# for the same DetachedInstanceError issue) — accessing ``execution.id``
# or ``trigger.id`` after the request session closes blows up.
execution_id = execution.id
trigger_id_int = trigger.id

from flask import current_app
app = current_app._get_current_object()

def _run():
Expand Down
Empty file added tests/whatsapp/__init__.py
Empty file.
Loading