Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 131 additions & 0 deletions flexus_client_kit/ckit_automation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
from __future__ import annotations

import asyncio
import json
import logging
from pathlib import Path
from typing import Any, Optional

import jsonschema
from pymongo.errors import PyMongoError

logger = logging.getLogger(__name__)

# Loaded by set_automation_schema_dict() from ckit_automation_v1_schema_build (authoritative) or
# set_automation_schema(path) for tests / offline fixtures.
_AUTOMATION_SCHEMA: dict | None = None


def set_automation_schema_dict(schema: dict) -> None:
global _AUTOMATION_SCHEMA
if not isinstance(schema, dict):
raise TypeError("set_automation_schema_dict expects dict")
_AUTOMATION_SCHEMA = schema


def set_automation_schema(schema_path: str) -> None:
"""
Load automation JSON Schema from disk. Fail-fast: raises if file is missing or invalid JSON.
"""
global _AUTOMATION_SCHEMA
_AUTOMATION_SCHEMA = json.loads(Path(schema_path).read_text(encoding="utf-8"))


def extract_automation_published(persona_setup: dict) -> dict:
"""
Extract published automation config from persona_setup JSONB.
Returns empty dict if no published automations exist.
Published config lives inside persona_setup so changes trigger bot restart
via the existing subscription comparison in ckit_bot_exec.py.
"""
try:
result = persona_setup.get("automation_published", {})
return result if isinstance(result, dict) else {}
except (AttributeError, TypeError) as e:
logger.error("extract_automation_published failed", exc_info=e)
return {}


def extract_automation_draft(persona_automation_draft: Any) -> dict:
"""
Extract draft automation config from the separate persona_automation_draft column.
Draft lives in its own Prisma column to avoid triggering bot restarts on save.
Returns empty dict if column is NULL or invalid.
"""
try:
if persona_automation_draft is None:
return {}
if isinstance(persona_automation_draft, dict):
return persona_automation_draft
return {}
except (AttributeError, TypeError) as e:
logger.error("extract_automation_draft failed", exc_info=e)
return {}


def validate_automation_json(data: dict) -> list[str]:
"""
Validate an automation config dict against automation_v1.schema.json.
Returns list of error strings (empty list = valid).
Used by GraphQL mutations automation_draft_save and automation_publish.
"""
if _AUTOMATION_SCHEMA is None:
return ["automation schema not loaded -- call set_automation_schema_dict at backend startup"]
errors = []
try:
validator = jsonschema.Draft202012Validator(_AUTOMATION_SCHEMA)
for error in validator.iter_errors(data):
path = ".".join(str(p) for p in error.absolute_path) if error.absolute_path else "$"
errors.append("%s: %s" % (path, error.message))
except jsonschema.SchemaError as e:
errors.append("schema error: %s" % e.message)
return errors


class DisabledRulesCache:
def __init__(self, mongo_db: Any, interval: float = 30.0):
self._mongo_db = mongo_db
self._interval = interval
self._disabled: set = set()
self._task: Optional[asyncio.Task] = None

async def start(self) -> None:
await self._refresh()
self._task = asyncio.create_task(self._loop())

async def stop(self) -> None:
if self._task and not self._task.done():
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass

def get(self) -> set:
return self._disabled

async def _refresh(self) -> None:
try:
doc = await self._mongo_db["bot_runtime_config"].find_one({"_id": "disabled_rule_ids"})
if doc and isinstance(doc.get("ids"), list):
self._disabled = {str(x) for x in doc["ids"] if x}
else:
self._disabled = set()
except PyMongoError as e:
logger.warning("DisabledRulesCache refresh failed (mongo), keeping last known state: %s %s", type(e).__name__, e)
except (TypeError, ValueError) as e:
logger.warning("DisabledRulesCache refresh failed (bad doc), keeping last known state: %s %s", type(e).__name__, e)

async def _loop(self) -> None:
while True:
try:
await asyncio.sleep(self._interval)
await self._refresh()
except asyncio.CancelledError:
break


def filter_active_rules(all_rules: list, disabled: set) -> list:
if not disabled:
return all_rules
return [r for r in all_rules if r.get("rule_id", "") not in disabled]
Loading