Skip to content

Commit c11dd9c

Browse files
committed
feat: sync architecture upgrades from omnicloud-support-agent
Major improvements: - SQS: Upgrade to FIFO queue for message ordering guarantee - Docker: Upgrade Node.js to 20+ for MCP undici compatibility - Docker: Change HOME to /tmp for MCP auth file writes - Security: Add Telegram webhook secret token verification (HMAC) - Handler: Add FIFO MessageGroupId/DeduplicationId support - Handler: Add /debug command for session file download - Handler: Add /start welcome message handler - Handler: Refactor local commands to support handler type - Consumer: Add continuous typing indicator (every 4s) - Consumer: Add Markdown conversion pipeline for Telegram - Consumer: Add message_time passthrough - Config: Add LocalCommand dataclass for handler type support - Dependencies: Add telegramify-markdown for Markdown conversion
1 parent 0d38514 commit c11dd9c

8 files changed

Lines changed: 608 additions & 159 deletions

File tree

agent-sdk-client/config.py

Lines changed: 68 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,14 @@
66
from pathlib import Path
77
from typing import Optional
88

9+
10+
@dataclass
class LocalCommand:
    """Configuration for one local command handled by the client.

    Attributes:
        type: Kind of command, either "static" or "handler".
        response: Reply text; used when ``type`` is "static".
        handler: Handler identifier; used when ``type`` is "handler".
    """

    type: str  # "static" or "handler"
    response: str = ""  # for static type
    handler: str = ""  # for handler type
16+
917
logger = logging.getLogger(__name__)
1018
DEFAULT_CONFIG_PATH = Path(__file__).with_name("config.toml")
1119

@@ -28,7 +36,46 @@ def extract_command(text: Optional[str]) -> Optional[str]:
2836
return command
2937

3038

31-
def _load_config(config_path: Path = DEFAULT_CONFIG_PATH) -> tuple[list[str], dict[str, str], list[int | str]]:
39+
def _parse_local_command(name: str, value: object) -> tuple[str, LocalCommand | None]:
    """Parse a single local command entry.

    Args:
        name: Command name (with or without leading slash).
        value: Command value. A plain string is the legacy format and is
            treated as a static response; a dict is the new format and must
            carry a ``type`` of "static" or "handler".

    Returns:
        Tuple of (normalized_cmd, LocalCommand), or (normalized_cmd, None)
        when the entry is invalid. Invalid entries are logged as warnings.
    """
    # Names that already start with a slash are kept verbatim; others get one.
    cmd = name if name.startswith('/') else f"/{name}"

    # Legacy format: string value = static response
    if isinstance(value, str):
        return cmd, LocalCommand(type="static", response=value)

    if not isinstance(value, dict):
        logger.warning("Local command %s has invalid value type; skipping", cmd)
        return cmd, None

    # New format: dict with type field
    cmd_type = value.get('type', '')

    if cmd_type == 'static':
        response = value.get('response', '')
        # A non-string response cannot populate the ``str`` field, so it is
        # rejected the same way as a missing one.
        if not isinstance(response, str) or not response:
            logger.warning("Local command %s has no response; skipping", cmd)
            return cmd, None
        return cmd, LocalCommand(type="static", response=response)

    if cmd_type == 'handler':
        handler = value.get('handler', '')
        if not isinstance(handler, str) or not handler:
            logger.warning("Local command %s has no handler; skipping", cmd)
            return cmd, None
        return cmd, LocalCommand(type="handler", handler=handler)

    logger.warning("Local command %s has unknown type: %s; skipping", cmd, cmd_type)
    return cmd, None
76+
77+
78+
def _load_config(config_path: Path = DEFAULT_CONFIG_PATH) -> tuple[list[str], dict[str, LocalCommand], list[int | str]]:
3279
"""Load commands and security config from TOML config file.
3380
3481
Returns:
@@ -48,16 +95,18 @@ def _load_config(config_path: Path = DEFAULT_CONFIG_PATH) -> tuple[list[str], di
4895
agent_commands = []
4996
agent_commands = [cmd for cmd in agent_commands if isinstance(cmd, str)]
5097

51-
# Load local commands
98+
# Load local commands (supports both legacy string and new dict format)
5299
local_commands_raw = data.get('local_commands', {})
53100
if not isinstance(local_commands_raw, dict):
54101
logger.warning("Local commands config is not a table; ignoring configuration")
55102
local_commands_raw = {}
56-
local_commands = {
57-
f"/{name.lstrip('/')}" if not name.startswith('/') else name: str(value)
58-
for name, value in local_commands_raw.items()
59-
if isinstance(name, str) and isinstance(value, str)
60-
}
103+
local_commands: dict[str, LocalCommand] = {}
104+
for name, value in local_commands_raw.items():
105+
if not isinstance(name, str):
106+
continue
107+
cmd, parsed = _parse_local_command(name, value)
108+
if parsed:
109+
local_commands[cmd] = parsed
61110

62111
# Load security whitelist
63112
security = data.get('security', {})
@@ -92,8 +141,9 @@ class Config:
92141
auth_token: str
93142
queue_url: str
94143
agent_commands: list[str]
95-
local_commands: dict[str, str]
144+
local_commands: dict[str, LocalCommand]
96145
user_whitelist: list[int | str]
146+
telegram_webhook_secret: str = ""
97147

98148
@classmethod
99149
def from_env(cls, config_path: Optional[Path] = None) -> 'Config':
@@ -107,6 +157,7 @@ def from_env(cls, config_path: Optional[Path] = None) -> 'Config':
107157
agent_commands=agent_cmds,
108158
local_commands=local_cmds,
109159
user_whitelist=whitelist,
160+
telegram_webhook_secret=os.getenv('TELEGRAM_WEBHOOK_SECRET', ''),
110161
)
111162

112163
def get_command(self, text: Optional[str]) -> Optional[str]:
@@ -118,8 +169,16 @@ def is_agent_command(self, cmd: Optional[str]) -> bool:
118169
def is_local_command(self, cmd: Optional[str]) -> bool:
119170
return bool(cmd) and cmd in self.local_commands
120171

172+
def get_local_command(self, cmd: str) -> LocalCommand | None:
173+
"""Get local command config by command name."""
174+
return self.local_commands.get(cmd)
175+
121176
def local_response(self, cmd: str) -> str:
122-
return self.local_commands.get(cmd, "Unsupported command.")
177+
"""Get static response for a local command (legacy compatibility)."""
178+
local_cmd = self.local_commands.get(cmd)
179+
if local_cmd and local_cmd.type == "static":
180+
return local_cmd.response
181+
return "Unsupported command."
123182

124183
def unknown_command_message(self) -> str:
125184
parts = []

agent-sdk-client/config.toml

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,15 @@ commands = [
77

88
[local_commands]
99
# Local-only commands handled by the client
10-
help = "Hello World"
11-
newchat = "创建新对话"
10+
# Each value is either a legacy string or a table with a `type` field:
11+
# - Legacy: help = "static response text"
12+
# - New: help = { type = "static", response = "text" }
13+
# - Handler: newchat = { type = "handler", handler = "newchat" }
14+
15+
help = { type = "static", response = "Hello World" }
16+
newchat = { type = "handler", handler = "newchat" }
17+
start = { type = "handler", handler = "start" }
18+
debug = { type = "handler", handler = "debug" }
1219

1320
[security]
1421
# User IDs allowed to add bot to groups and send private messages.

agent-sdk-client/consumer.py

Lines changed: 127 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,91 @@
44
"""
55
import asyncio
66
import json
7+
import re
78
from typing import Any
89

910
import httpx
1011
from telegram import Bot, Update
1112
from telegram.constants import ParseMode, ChatAction
12-
from telegram.helpers import escape_markdown
13+
from telegramify_markdown import markdownify
1314
from telegram.error import BadRequest
1415

1516
from config import Config
1617

1718

19+
def fix_heading_bold(text: str) -> str:
    """Remove bold markers from headings: ## **Title** -> ## Title.

    Only heading lines whose content is wrapped in ``**`` are rewritten; all
    other text is returned unchanged. Trailing spaces/tabs on a rewritten
    heading line are dropped, but line breaks are never consumed, so blank
    lines after a heading are preserved.

    Args:
        text: Markdown text, possibly spanning several lines.

    Returns:
        The text with bold markers stripped from matching heading lines.
    """
    # Use [ \t] rather than \s: under re.MULTILINE, \s would also swallow
    # newlines, deleting blank lines after the heading or merging a bare
    # "##" line with a bold line below it. The lookahead tolerates a "\r"
    # before the line end without removing it.
    return re.sub(
        r'^(#{1,6})[ \t]*\*\*(.+?)\*\*[ \t]*(?=\r?$)',
        r'\1 \2',
        text,
        flags=re.MULTILINE,
    )
27+
28+
29+
def fix_code_escaping(text: str) -> str:
30+
"""Remove escaping inside code blocks: \\| -> |, \\- -> -.
31+
32+
Only applies when code blocks contain escaped characters.
33+
"""
34+
if '```' not in text and '`' not in text:
35+
return text
36+
37+
escaped_chars = '`|-.()+!#={}[]><_*~'
38+
39+
def unescape(content: str) -> str:
40+
for char in escaped_chars:
41+
content = content.replace(f'\\{char}', char)
42+
return content
43+
44+
# Fix fenced code blocks
45+
if '```' in text:
46+
text = re.sub(
47+
r'```(.*?)```',
48+
lambda m: f'```{unescape(m.group(1))}```',
49+
text,
50+
flags=re.DOTALL
51+
)
52+
# Fix inline code
53+
if '`' in text:
54+
text = re.sub(
55+
r'`([^`]+)`',
56+
lambda m: f'`{unescape(m.group(1))}`',
57+
text
58+
)
59+
return text
60+
61+
62+
def fix_unescaped_chars(text: str) -> str:
    """Escape special chars outside code blocks that markdownify missed.

    Code spans (fenced, then inline) are swapped for NUL-delimited
    placeholders so their content is not modified, special characters not
    already preceded by a backslash are escaped in the remaining text, and
    the code spans are then put back.

    Args:
        text: Text that may contain fenced or inline code.

    Returns:
        The original text when nothing needs escaping, otherwise the text
        with the special characters outside code spans backslash-escaped.
    """
    # Extract code blocks to protect them
    blocks: list[str] = []

    def save(match: re.Match) -> str:
        blocks.append(match.group(0))
        return f'\x00{len(blocks) - 1}\x00'

    protected = re.sub(r'```.*?```', save, text, flags=re.DOTALL)
    protected = re.sub(r'`[^`]+`', save, protected)

    chars = r'-.!()+=|{}[]#>'
    unescaped = re.compile(rf'(?<!\\)([{re.escape(chars)}])')

    # Nothing to escape: hand back the input untouched.
    if not unescaped.search(protected):
        return text

    # Escape every special char that is not already preceded by a backslash.
    protected = unescaped.sub(r'\\\1', protected)

    # Restore newest-first: an inline span saved in the second pass may
    # contain the placeholder of a fenced block saved earlier, and that inner
    # placeholder only becomes visible once the outer span is back in place.
    for index in reversed(range(len(blocks))):
        protected = protected.replace(f'\x00{index}\x00', blocks[index])

    return protected
90+
91+
1892
def lambda_handler(event: dict, context: Any) -> dict:
1993
"""SQS Consumer Lambda entry point."""
2094
for record in event['Records']:
@@ -43,6 +117,7 @@ async def process_message(message_data: dict) -> None:
43117
"""Process single message from SQS queue."""
44118
import logging
45119
logger = logging.getLogger()
120+
logger.setLevel(logging.INFO)
46121

47122
config = Config.from_env()
48123
bot = Bot(config.telegram_token)
@@ -55,51 +130,6 @@ async def process_message(message_data: dict) -> None:
55130
logger.warning("Received update with no message or edited_message")
56131
return
57132

58-
cmd = config.get_command(message.text)
59-
if cmd:
60-
if config.is_local_command(cmd):
61-
logger.info(
62-
"Handling local command in consumer (fallback path)",
63-
extra={'chat_id': message.chat_id, 'message_id': message.message_id},
64-
)
65-
try:
66-
await bot.send_message(
67-
chat_id=message.chat_id,
68-
text=config.local_response(cmd),
69-
message_thread_id=message.message_thread_id,
70-
reply_to_message_id=message.message_id,
71-
)
72-
except Exception:
73-
logger.warning("Failed to send local command response", exc_info=True)
74-
return
75-
76-
if not config.is_agent_command(cmd):
77-
# Defensive guard: producer should already block non-agent commands.
78-
logger.info(
79-
"Skipping non-agent command (consumer fallback)",
80-
extra={
81-
'chat_id': message.chat_id,
82-
'message_id': message.message_id,
83-
},
84-
)
85-
try:
86-
await bot.send_message(
87-
chat_id=message.chat_id,
88-
text=config.unknown_command_message(),
89-
message_thread_id=message.message_thread_id,
90-
reply_to_message_id=message.message_id,
91-
)
92-
except Exception:
93-
logger.warning("Failed to send local command response", exc_info=True)
94-
return
95-
96-
# Send typing indicator
97-
await bot.send_chat_action(
98-
chat_id=message.chat_id,
99-
action=ChatAction.TYPING,
100-
message_thread_id=message.message_thread_id,
101-
)
102-
103133
# Initialize result with default error response
104134
# This ensures result is always defined, even if Agent Server call fails
105135
result = {
@@ -112,8 +142,21 @@ async def process_message(message_data: dict) -> None:
112142
user_message = message_data.get('text') or message.text
113143
thread_id = message_data.get('thread_id') or message.message_thread_id
114144

115-
# Call Agent Server
116-
try:
145+
async def keep_typing():
146+
"""Send typing indicator every 4 seconds (Telegram typing expires after 5s)."""
147+
while True:
148+
try:
149+
await bot.send_chat_action(
150+
chat_id=message.chat_id,
151+
action=ChatAction.TYPING,
152+
message_thread_id=thread_id,
153+
)
154+
except Exception:
155+
pass # Ignore typing errors, don't interrupt main flow
156+
await asyncio.sleep(4)
157+
158+
async def call_agent_server():
159+
"""Call Agent Server and return result."""
117160
async with httpx.AsyncClient(timeout=600.0) as client:
118161
response = await client.post(
119162
config.agent_server_url,
@@ -125,10 +168,16 @@ async def process_message(message_data: dict) -> None:
125168
'user_message': user_message,
126169
'chat_id': str(message.chat_id),
127170
'thread_id': str(thread_id) if thread_id else None,
171+
'message_time': message_data.get('message_time'),
128172
},
129173
)
130174
response.raise_for_status()
131-
result = response.json()
175+
return response.json()
176+
177+
# Call Agent Server with continuous typing indicator
178+
typing_task = asyncio.create_task(keep_typing())
179+
try:
180+
result = await call_agent_server()
132181

133182
except httpx.TimeoutException:
134183
logger.warning(f"Agent Server timeout for chat_id={message.chat_id}")
@@ -152,6 +201,14 @@ async def process_message(message_data: dict) -> None:
152201
logger.error(f"Failed to send error message to Telegram: {send_error}")
153202
# Don't re-raise - error message already sent to user, retrying would cause duplicate messages
154203

204+
finally:
205+
# Stop typing indicator
206+
typing_task.cancel()
207+
try:
208+
await typing_task
209+
except asyncio.CancelledError:
210+
pass
211+
155212
# Format response (result is guaranteed to be defined now)
156213
if result.get('is_error'):
157214
text = f"Agent error: {result.get('error_message', 'Unknown')}"
@@ -162,23 +219,38 @@ async def process_message(message_data: dict) -> None:
162219
text = text[:4000] + "\n\n... (truncated)"
163220

164221
# Send response to Telegram
222+
# Convert standard Markdown to Telegram MarkdownV2 format
223+
# Pipeline: fix_heading_bold -> markdownify -> fix_code_escaping -> fix_unescaped_chars
224+
telegram_text = fix_unescaped_chars(fix_code_escaping(markdownify(fix_heading_bold(text))))
225+
226+
# Only reply_to original message if thread_id matches (not for /newchat)
227+
reply_to_id = (
228+
message.message_id
229+
if thread_id == message.message_thread_id
230+
else None
231+
)
232+
165233
try:
166234
await bot.send_message(
167235
chat_id=message.chat_id,
168-
text=text,
236+
text=telegram_text,
169237
parse_mode=ParseMode.MARKDOWN_V2,
170238
message_thread_id=thread_id,
171-
reply_to_message_id=message.message_id,
239+
reply_to_message_id=reply_to_id,
240+
)
241+
logger.info(
242+
"Sent response to Telegram",
243+
extra={'chat_id': message.chat_id, 'thread_id': thread_id},
172244
)
173245
except BadRequest as e:
246+
logger.warning(f"BadRequest sending message: {e}")
174247
if "parse entities" in str(e).lower():
175-
safe_text = escape_markdown(text, version=2)
248+
# Fallback: send as plain text without any formatting
176249
await bot.send_message(
177250
chat_id=message.chat_id,
178-
text=safe_text,
179-
parse_mode=ParseMode.MARKDOWN_V2,
251+
text=text,
180252
message_thread_id=thread_id,
181-
reply_to_message_id=message.message_id,
253+
reply_to_message_id=reply_to_id,
182254
)
183255
else:
184256
raise

0 commit comments

Comments
 (0)