Skip to content
Closed

Sync #733

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
965 changes: 844 additions & 121 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ uuid = { version = "1", features = ["v4", "v5", "serde"] }

# Database
rusqlite = { version = "0.31", features = ["bundled", "serde_json"] }
mongodb = { version = "3" }
bson = { version = "2", features = ["chrono-0_4"] }

# CLI
clap = { version = "4", features = ["derive"] }
Expand Down
17 changes: 10 additions & 7 deletions crates/openfang-api/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5169,7 +5169,8 @@ pub async fn usage_stats(State(state): State<Arc<AppState>>) -> impl IntoRespons

/// GET /api/usage/summary — Get overall usage summary from UsageStore.
pub async fn usage_summary(State(state): State<Arc<AppState>>) -> impl IntoResponse {
match state.kernel.memory.usage().query_summary(None) {
let usage_store = state.kernel.memory.create_usage_store();
match usage_store.query_summary(None) {
Ok(s) => Json(serde_json::json!({
"total_input_tokens": s.total_input_tokens,
"total_output_tokens": s.total_output_tokens,
Expand All @@ -5189,7 +5190,8 @@ pub async fn usage_summary(State(state): State<Arc<AppState>>) -> impl IntoRespo

/// GET /api/usage/by-model — Get usage grouped by model.
pub async fn usage_by_model(State(state): State<Arc<AppState>>) -> impl IntoResponse {
match state.kernel.memory.usage().query_by_model() {
let usage_store = state.kernel.memory.create_usage_store();
match usage_store.query_by_model() {
Ok(models) => {
let list: Vec<serde_json::Value> = models
.iter()
Expand All @@ -5211,9 +5213,10 @@ pub async fn usage_by_model(State(state): State<Arc<AppState>>) -> impl IntoResp

/// GET /api/usage/daily — Get daily usage breakdown for the last 7 days.
pub async fn usage_daily(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let days = state.kernel.memory.usage().query_daily_breakdown(7);
let today_cost = state.kernel.memory.usage().query_today_cost();
let first_event = state.kernel.memory.usage().query_first_event_date();
let usage_store = state.kernel.memory.create_usage_store();
let days = usage_store.query_daily_breakdown(7);
let today_cost = usage_store.query_today_cost();
let first_event = usage_store.query_first_event_date();

let days_list = match days {
Ok(d) => d
Expand Down Expand Up @@ -5312,7 +5315,7 @@ pub async fn agent_budget_status(
};

let quota = &entry.manifest.resources;
let usage_store = openfang_memory::usage::UsageStore::new(state.kernel.memory.usage_conn());
let usage_store = state.kernel.memory.create_usage_store();
let hourly = usage_store.query_hourly(agent_id).unwrap_or(0.0);
let daily = usage_store.query_daily(agent_id).unwrap_or(0.0);
let monthly = usage_store.query_monthly(agent_id).unwrap_or(0.0);
Expand Down Expand Up @@ -5352,7 +5355,7 @@ pub async fn agent_budget_status(

/// GET /api/budget/agents — Per-agent cost ranking (top spenders).
pub async fn agent_budget_ranking(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let usage_store = openfang_memory::usage::UsageStore::new(state.kernel.memory.usage_conn());
let usage_store = state.kernel.memory.create_usage_store();
let agents: Vec<serde_json::Value> = state
.kernel
.registry
Expand Down
2,990 changes: 2,990 additions & 0 deletions crates/openfang-desktop/gen/schemas/macOS-schema.json

Large diffs are not rendered by default.

39 changes: 26 additions & 13 deletions crates/openfang-kernel/src/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,16 +555,26 @@ impl OpenFangKernel {
std::fs::create_dir_all(&config.data_dir)
.map_err(|e| KernelError::BootFailed(format!("Failed to create data dir: {e}")))?;

// Initialize memory substrate
let db_path = config
.memory
.sqlite_path
.clone()
.unwrap_or_else(|| config.data_dir.join("openfang.db"));
let memory = Arc::new(
MemorySubstrate::open(&db_path, config.memory.decay_rate)
.map_err(|e| KernelError::BootFailed(format!("Memory init failed: {e}")))?,
);
// Initialize memory substrate (SQLite or MongoDB)
let memory: Arc<MemorySubstrate> = Arc::new(match config.memory.backend.as_str() {
"mongodb" => {
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(
MemorySubstrate::open_with_config(&config.memory),
)
})
.map_err(|e| KernelError::BootFailed(format!("MongoDB memory init failed: {e}")))?
}
_ => {
let db_path = config
.memory
.sqlite_path
.clone()
.unwrap_or_else(|| config.data_dir.join("openfang.db"));
MemorySubstrate::open(&db_path, config.memory.decay_rate)
.map_err(|e| KernelError::BootFailed(format!("Memory init failed: {e}")))?
}
});

// Initialize credential resolver (vault → dotenv → env var)
let credential_resolver = {
Expand Down Expand Up @@ -715,9 +725,9 @@ impl OpenFangKernel {
Arc::new(StubDriver) as Arc<dyn LlmDriver>
};

// Initialize metering engine (shares the same SQLite connection as the memory substrate)
// Initialize metering engine (shares the same DB connection as the memory substrate)
let metering = Arc::new(MeteringEngine::new(Arc::new(
openfang_memory::usage::UsageStore::new(memory.usage_conn()),
memory.create_usage_store(),
)));

let supervisor = Supervisor::new();
Expand Down Expand Up @@ -1010,7 +1020,10 @@ impl OpenFangKernel {
workflows: WorkflowEngine::new(),
triggers: TriggerEngine::new(),
background,
audit_log: Arc::new(AuditLog::with_db(memory.usage_conn())),
audit_log: Arc::new(match memory.is_mongo() {
true => AuditLog::with_mongo_db(memory.mongo_db().unwrap()),
false => AuditLog::with_db(memory.usage_conn().unwrap()),
}),
metering,
default_driver: driver,
wasm_sandbox,
Expand Down
2 changes: 1 addition & 1 deletion crates/openfang-kernel/src/metering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ mod tests {

fn setup() -> MeteringEngine {
let substrate = MemorySubstrate::open_in_memory(0.1).unwrap();
let store = Arc::new(UsageStore::new(substrate.usage_conn()));
let store = Arc::new(substrate.create_usage_store());
MeteringEngine::new(store)
}

Expand Down
3 changes: 3 additions & 0 deletions crates/openfang-memory/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ serde = { workspace = true }
serde_json = { workspace = true }
rmp-serde = { workspace = true }
rusqlite = { workspace = true }
mongodb = { workspace = true }
bson = { workspace = true }
futures = { workspace = true }
chrono = { workspace = true }
uuid = { workspace = true }
thiserror = { workspace = true }
Expand Down
14 changes: 9 additions & 5 deletions crates/openfang-memory/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
//! Memory substrate for the OpenFang Agent Operating System.
//!
//! Provides a unified memory API over three storage backends:
//! - **Structured store** (SQLite): Key-value pairs, sessions, agent state
//! - **Semantic store**: Text-based search (Phase 1: LIKE matching, Phase 2: Qdrant vectors)
//! - **Knowledge graph** (SQLite): Entities and relations
//! Provides a unified memory API backed by either SQLite or MongoDB.
//! The backend is selected via `MemoryConfig::backend` ("sqlite" or "mongodb").
//!
//! Agents interact with a single `Memory` trait that abstracts over all three stores.
//! Storage layers:
//! - **Structured store**: Key-value pairs, sessions, agent state
//! - **Semantic store**: Text-based search with optional vector embeddings
//! - **Knowledge graph**: Entities and relations
//!
//! Agents interact with a single `Memory` trait that abstracts over all stores.

pub mod consolidation;
pub mod knowledge;
pub mod migration;
pub mod mongo;
pub mod semantic;
pub mod session;
pub mod structured;
Expand Down
63 changes: 63 additions & 0 deletions crates/openfang-memory/src/mongo/consolidation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
//! MongoDB memory consolidation and decay logic.

use chrono::Utc;
use bson::doc;
use mongodb::Collection;
use openfang_types::error::{OpenFangError, OpenFangResult};
use openfang_types::memory::ConsolidationReport;

/// Memory consolidation engine backed by MongoDB.
///
/// Operates on the `memories` collection, applying confidence decay to
/// stale documents (see [`MongoConsolidationEngine::consolidate`]).
#[derive(Clone)]
pub struct MongoConsolidationEngine {
    // Handle to the `memories` collection; documents are raw BSON.
    memories: Collection<bson::Document>,
    // Per-cycle decay rate: stale memories keep `1 - decay_rate` of their
    // confidence each consolidation pass (floored at 0.1 by `consolidate`).
    decay_rate: f32,
}

impl MongoConsolidationEngine {
    /// Build a consolidation engine over the `memories` collection of `db`.
    pub fn new(db: mongodb::Database, decay_rate: f32) -> Self {
        let memories = db.collection("memories");
        Self { memories, decay_rate }
    }

    /// Run a consolidation cycle: decay old memories.
    ///
    /// Every non-deleted memory not accessed within the last 7 days has its
    /// confidence multiplied by `1 - decay_rate`, floored at 0.1. The update
    /// runs server-side as a single `update_many` with an aggregation
    /// pipeline, so no documents are round-tripped.
    ///
    /// # Errors
    /// Returns `OpenFangError::Memory` when the MongoDB update fails.
    pub async fn consolidate(&self) -> OpenFangResult<ConsolidationReport> {
        let started = std::time::Instant::now();

        // Memories untouched for a week are eligible for decay.
        let stale_before =
            bson::DateTime::from_chrono(Utc::now() - chrono::Duration::days(7));
        let keep_fraction = 1.0 - self.decay_rate as f64;

        let eligible = doc! {
            "deleted": false,
            "accessed_at": { "$lt": stale_before },
            "confidence": { "$gt": 0.1 },
        };

        // Aggregation-pipeline update: scale confidence in place, floor at 0.1.
        let decay_pipeline = vec![doc! {
            "$set": {
                "confidence": {
                    "$max": [0.1, { "$multiply": ["$confidence", keep_fraction] }]
                }
            }
        }];

        let outcome = self
            .memories
            .update_many(eligible, decay_pipeline)
            .await
            .map_err(|e| OpenFangError::Memory(e.to_string()))?;

        Ok(ConsolidationReport {
            // Merging is not implemented for the Mongo backend; only decay runs.
            memories_merged: 0,
            memories_decayed: outcome.modified_count,
            duration_ms: started.elapsed().as_millis() as u64,
        })
    }
}
154 changes: 154 additions & 0 deletions crates/openfang-memory/src/mongo/indexes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
//! MongoDB collection index setup.

use mongodb::Database;
use mongodb::IndexModel;
use mongodb::options::IndexOptions;
use bson::doc;
use openfang_types::error::{OpenFangError, OpenFangResult};

/// Ensure all collections exist and indexes are created.
pub async fn ensure_indexes(db: &Database) -> OpenFangResult<()> {
// agents
db.collection::<bson::Document>("agents")
.create_index(
IndexModel::builder()
.keys(doc! { "name": 1 })
.options(IndexOptions::builder().unique(true).build())
.build(),
)
.await
.map_err(|e| OpenFangError::Memory(format!("Index creation failed (agents): {e}")))?;

// kv_store
db.collection::<bson::Document>("kv_store")
.create_index(
IndexModel::builder()
.keys(doc! { "agent_id": 1, "key": 1 })
.options(IndexOptions::builder().unique(true).build())
.build(),
)
.await
.map_err(|e| OpenFangError::Memory(format!("Index creation failed (kv_store): {e}")))?;

// sessions
let sessions = db.collection::<bson::Document>("sessions");
sessions
.create_index(IndexModel::builder().keys(doc! { "agent_id": 1 }).build())
.await
.map_err(|e| OpenFangError::Memory(format!("Index creation failed (sessions): {e}")))?;
sessions
.create_index(
IndexModel::builder()
.keys(doc! { "agent_id": 1, "label": 1 })
.build(),
)
.await
.map_err(|e| OpenFangError::Memory(format!("Index creation failed (sessions label): {e}")))?;

// memories
let memories = db.collection::<bson::Document>("memories");
memories
.create_index(IndexModel::builder().keys(doc! { "agent_id": 1 }).build())
.await
.map_err(|e| OpenFangError::Memory(format!("Index creation failed (memories agent): {e}")))?;
memories
.create_index(
IndexModel::builder()
.keys(doc! { "deleted": 1, "scope": 1 })
.build(),
)
.await
.map_err(|e| OpenFangError::Memory(format!("Index creation failed (memories deleted): {e}")))?;
memories
.create_index(
IndexModel::builder()
.keys(doc! { "accessed_at": -1 })
.build(),
)
.await
.map_err(|e| OpenFangError::Memory(format!("Index creation failed (memories accessed): {e}")))?;

// entities
db.collection::<bson::Document>("entities")
.create_index(IndexModel::builder().keys(doc! { "name": 1 }).build())
.await
.map_err(|e| OpenFangError::Memory(format!("Index creation failed (entities): {e}")))?;

// relations
let relations = db.collection::<bson::Document>("relations");
relations
.create_index(
IndexModel::builder()
.keys(doc! { "source_entity": 1 })
.build(),
)
.await
.map_err(|e| OpenFangError::Memory(format!("Index creation failed (relations src): {e}")))?;
relations
.create_index(
IndexModel::builder()
.keys(doc! { "target_entity": 1 })
.build(),
)
.await
.map_err(|e| OpenFangError::Memory(format!("Index creation failed (relations tgt): {e}")))?;
relations
.create_index(
IndexModel::builder()
.keys(doc! { "relation_type": 1 })
.build(),
)
.await
.map_err(|e| OpenFangError::Memory(format!("Index creation failed (relations type): {e}")))?;

// usage_events
let usage = db.collection::<bson::Document>("usage_events");
usage
.create_index(
IndexModel::builder()
.keys(doc! { "agent_id": 1, "timestamp": -1 })
.build(),
)
.await
.map_err(|e| OpenFangError::Memory(format!("Index creation failed (usage agent): {e}")))?;
usage
.create_index(
IndexModel::builder()
.keys(doc! { "timestamp": -1 })
.build(),
)
.await
.map_err(|e| OpenFangError::Memory(format!("Index creation failed (usage ts): {e}")))?;

// task_queue
db.collection::<bson::Document>("task_queue")
.create_index(
IndexModel::builder()
.keys(doc! { "status": 1, "priority": -1, "created_at": 1 })
.build(),
)
.await
.map_err(|e| OpenFangError::Memory(format!("Index creation failed (task_queue): {e}")))?;

// audit_entries
let audit = db.collection::<bson::Document>("audit_entries");
audit
.create_index(
IndexModel::builder()
.keys(doc! { "seq": 1 })
.options(IndexOptions::builder().unique(true).build())
.build(),
)
.await
.map_err(|e| OpenFangError::Memory(format!("Index creation failed (audit seq): {e}")))?;
audit
.create_index(
IndexModel::builder()
.keys(doc! { "agent_id": 1 })
.build(),
)
.await
.map_err(|e| OpenFangError::Memory(format!("Index creation failed (audit agent): {e}")))?;

Ok(())
}
Loading