Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions packages/agent-server-rust/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ fn migrate_to_encrypted(path: &str, key: &str) -> Result<Connection, String> {
std::fs::remove_file(path).map_err(|e| format!("Failed to remove old db: {e}"))?;
std::fs::remove_file(format!("{path}-wal")).ok();
std::fs::remove_file(format!("{path}-shm")).ok();
std::fs::rename(&tmp_path, path)
.map_err(|e| format!("Failed to rename encrypted db: {e}"))?;
std::fs::rename(&tmp_path, path).map_err(|e| format!("Failed to rename encrypted db: {e}"))?;

tracing::info!("[DB] Migrated unencrypted database to encrypted");

Expand All @@ -72,8 +71,7 @@ fn migrate_to_encrypted(path: &str, key: &str) -> Result<Connection, String> {

/// Initialize the database: set encryption key, run migrations, set pragmas.
pub fn init_db() -> Result<(), String> {
let db_path =
std::env::var("AGENT_DB_PATH").unwrap_or_else(|_| "/data/agent.db".to_string());
let db_path = std::env::var("AGENT_DB_PATH").unwrap_or_else(|_| "/data/agent.db".to_string());

// Ensure directory exists
if let Some(parent) = Path::new(&db_path).parent() {
Expand Down Expand Up @@ -106,8 +104,7 @@ pub fn init_db() -> Result<(), String> {
}
} else {
// New DB — just open encrypted
open_encrypted(&db_path, key)
.map_err(|e| format!("Failed to create encrypted db: {e}"))?
open_encrypted(&db_path, key).map_err(|e| format!("Failed to create encrypted db: {e}"))?
};

// Run migrations (refinery)
Expand Down
92 changes: 91 additions & 1 deletion packages/agent-server-rust/src/db/queries.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use rusqlite::{params, Connection};
use std::collections::HashMap;

// ============================================
// SYNC STATE QUERIES
Expand Down Expand Up @@ -37,6 +38,91 @@ pub fn set_sync_state(conn: &Connection, key: &str, value: &str, session_id: Opt
.ok();
}

const PAYMENT_RECEIPTS_PREFIX: &str = "payment_receipts:";

fn payment_receipts_key(chat_id: &str) -> String {
format!("{PAYMENT_RECEIPTS_PREFIX}{chat_id}")
}

fn read_payment_receipts(
conn: &Connection,
session_id: &str,
chat_id: &str,
) -> HashMap<String, String> {
get_sync_state(conn, &payment_receipts_key(chat_id), Some(session_id))
.and_then(|json| serde_json::from_str::<HashMap<String, String>>(&json).ok())
.unwrap_or_default()
}

fn write_payment_receipts(
conn: &Connection,
session_id: &str,
chat_id: &str,
receipts: &HashMap<String, String>,
) {
if let Ok(json) = serde_json::to_string(receipts) {
set_sync_state(
conn,
&payment_receipts_key(chat_id),
&json,
Some(session_id),
);
}
}

pub fn get_payment_receipts(
conn: &Connection,
session_id: &str,
chat_id: &str,
) -> HashMap<i64, String> {
read_payment_receipts(conn, session_id, chat_id)
.into_iter()
.filter_map(|(local_id, received_at)| {
local_id.parse::<i64>().ok().map(|id| (id, received_at))
})
.collect()
}

pub fn mark_payment_received(
conn: &Connection,
session_id: &str,
chat_id: &str,
local_id: i64,
) -> String {
let mut receipts = read_payment_receipts(conn, session_id, chat_id);
let now = chrono::Utc::now().to_rfc3339();
let received_at = receipts
.entry(local_id.to_string())
.or_insert_with(|| now.clone())
.clone();

write_payment_receipts(conn, session_id, chat_id, &receipts);

received_at
}

pub fn remove_payment_receipts(
conn: &Connection,
session_id: &str,
chat_id: &str,
local_ids: &[i64],
) {
if local_ids.is_empty() {
return;
}

let mut receipts = read_payment_receipts(conn, session_id, chat_id);
let mut changed = false;

for local_id in local_ids {
changed |= receipts.remove(&local_id.to_string()).is_some();
}

if changed {
write_payment_receipts(conn, session_id, chat_id, &receipts);
}
}

// ============================================
// SESSION QUERIES
// ============================================
Expand All @@ -57,7 +143,11 @@ pub fn update_session_logged_in_user(
logged_in_user: Option<&str>,
) {
let now = chrono::Utc::now().to_rfc3339();
let login_state = if logged_in_user.is_some() { "logged_in" } else { "logged_out" };
let login_state = if logged_in_user.is_some() {
"logged_in"
} else {
"logged_out"
};
conn.execute(
"UPDATE sessions SET logged_in_user = ?1, login_state = ?2, updated_at = ?3 WHERE id = ?4",
params![logged_in_user, login_state, now, session_id],
Expand Down
4 changes: 3 additions & 1 deletion packages/agent-server-rust/src/execution/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ pub fn execute_action<'a>(
let args_ref: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
exec_command("click", &args_ref, options).await;
} else {
tracing::warn!("[action] click selector '{selector}' matched but no bounds");
tracing::warn!(
"[action] click selector '{selector}' matched but no bounds"
);
}
} else {
tracing::warn!("[action] click selector '{selector}' — no match");
Expand Down
89 changes: 54 additions & 35 deletions packages/agent-server-rust/src/execution/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
pub mod actions;

use base64::Engine;
use crate::context::Context;
use crate::ia::{find_state_by_id, identify_states};
use crate::db::get_db;
use crate::effects::collect_effects;
use crate::ia::types::*;
use crate::ia::{find_state_by_id, identify_states};
use crate::tools::a11y::get_a11y_desktop;
use crate::tools::exec::ExecOptions;
use crate::tools::screenshot::capture_screenshot;
use crate::effects::collect_effects;
use crate::db::get_db;
use base64::Engine;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;

Expand Down Expand Up @@ -73,20 +73,26 @@ where
for step in 0..MAX_STEPS {
// Check execution timeout
if execution_start.elapsed().as_millis() as u64 > EXECUTION_TIMEOUT_MS {
return (ExecutionResult {
success: false,
error: Some(format!(
"Execution timeout after {}s",
execution_start.elapsed().as_secs()
)),
}, plan_state);
return (
ExecutionResult {
success: false,
error: Some(format!(
"Execution timeout after {}s",
execution_start.elapsed().as_secs()
)),
},
plan_state,
);
}

if cancel.is_cancelled() {
return (ExecutionResult {
success: false,
error: Some("Aborted".to_string()),
}, plan_state);
return (
ExecutionResult {
success: false,
error: Some("Aborted".to_string()),
},
plan_state,
);
}

// 1. OBSERVE: get a11y tree + screenshot
Expand All @@ -112,13 +118,16 @@ where
let elapsed = unknown_state_since.unwrap().elapsed();
if elapsed.as_millis() as u64 > UNKNOWN_STATE_TIMEOUT_MS {
tracing::error!("[exec] Unknown state timeout after {}s", elapsed.as_secs());
return (ExecutionResult {
success: false,
error: Some(format!(
"Unknown state for {}s - no matching IAState found",
elapsed.as_secs()
)),
}, plan_state);
return (
ExecutionResult {
success: false,
error: Some(format!(
"Unknown state for {}s - no matching IAState found",
elapsed.as_secs()
)),
},
plan_state,
);
}
tracing::warn!("[exec] Unknown state ({}s), waiting...", elapsed.as_secs());
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
Expand Down Expand Up @@ -243,28 +252,38 @@ where

// 7. EXECUTE: run the action (emits fire inline via callback)
if let Some(sel) = &selected {
actions::execute_action(&sel.action, sel.frame.as_ref(), &exec_options, &a11y, emit).await;
actions::execute_action(&sel.action, sel.frame.as_ref(), &exec_options, &a11y, emit)
.await;
}

// 8. GOAL CHECK (after action)
if plan.is_goal_reached(&context.state, &plan_state) {
return (ExecutionResult {
success: true,
error: None,
}, plan_state);
return (
ExecutionResult {
success: true,
error: None,
},
plan_state,
);
}

// No action = stuck (only if plan returns None)
if selected.is_none() {
return (ExecutionResult {
success: false,
error: Some("No action selected".to_string()),
}, plan_state);
return (
ExecutionResult {
success: false,
error: Some("No action selected".to_string()),
},
plan_state,
);
}
}

(ExecutionResult {
success: false,
error: Some("Max steps reached".to_string()),
}, plan_state)
(
ExecutionResult {
success: false,
error: Some("Max steps reached".to_string()),
},
plan_state,
)
}
19 changes: 15 additions & 4 deletions packages/agent-server-rust/src/ia/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ pub fn get_bounds_center(bounds: &Bounds) -> (f64, f64) {
pub fn frame_hint_from_node(node: &A11yNode) -> Option<FrameHint> {
let bounds = node.bounds.clone()?;
Some(FrameHint {
name: if node.name.is_empty() { None } else { Some(node.name.clone()) },
name: if node.name.is_empty() {
None
} else {
Some(node.name.clone())
},
bounds,
pid: node.window.as_ref().map(|w| w.pid),
})
Expand All @@ -50,8 +54,16 @@ pub fn frame_hint_from_node(node: &A11yNode) -> Option<FrameHint> {
/// Walks the tree top-down, preferring deeper frames so we get the tightest
/// enclosing frame (e.g. "Settings" frame, not the root desktop-frame).
pub fn find_frame_for(a11y: &A11yNode, selector: &str) -> Option<FrameHint> {
fn walk<'a>(node: &'a A11yNode, selector: &str, current_frame: Option<&'a A11yNode>) -> Option<&'a A11yNode> {
let frame = if node.role == "frame" { Some(node) } else { current_frame };
fn walk<'a>(
node: &'a A11yNode,
selector: &str,
current_frame: Option<&'a A11yNode>,
) -> Option<&'a A11yNode> {
let frame = if node.role == "frame" {
Some(node)
} else {
current_frame
};

// If this subtree contains the target, the deepest frame wins
if query_selector(node, selector).is_some() {
Expand All @@ -70,4 +82,3 @@ pub fn find_frame_for(a11y: &A11yNode, selector: &str) -> Option<FrameHint> {
}
walk(a11y, selector, None).and_then(frame_hint_from_node)
}

3 changes: 2 additions & 1 deletion packages/agent-server-rust/src/ia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ pub fn identify_states(a11y_tree: &A11yNode, screenshot: &str) -> IdentifiedStat
}

// Stop if we found all slots
if main_window.is_some() && popup.is_some() && contact_card.is_some() && settings.is_some() {
if main_window.is_some() && popup.is_some() && contact_card.is_some() && settings.is_some()
{
break;
}
}
Expand Down
Loading