Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions aw-client-rust/src/single_instance.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use dirs::cache_dir;
use fs4::fs_std::FileExt;
use log::{debug, error};
use log::debug;
use std::fs::{File, OpenOptions};
use std::io;
use std::sync::atomic::{AtomicBool, Ordering};
Expand Down Expand Up @@ -48,7 +48,8 @@ impl SingleInstance {
locked: Arc::new(AtomicBool::new(true)),
}),
Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {
error!("Another instance is already running");
// Qualified so the unix build doesn't carry an unused `error` import
log::error!("Another instance is already running");
Err(SingleInstanceError::AlreadyRunning)
}
Err(e) => Err(SingleInstanceError::Io(e)),
Expand All @@ -58,7 +59,12 @@ impl SingleInstance {
#[cfg(unix)]
{
// On Unix-like systems, use flock
match OpenOptions::new().write(true).create(true).open(&lockfile) {
match OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&lockfile)
{
Ok(file) => match file.try_lock_exclusive() {
Ok(true) => Ok(SingleInstance {
file: Some(file),
Expand Down
14 changes: 9 additions & 5 deletions aw-client-rust/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ mod test {

// Keep the listener alive until the server binds — prevents TOCTOU race in reserve_port
thread_local! {
static RESERVED_PORT: RefCell<Option<TcpListener>> = RefCell::new(None);
static RESERVED_PORT: RefCell<Option<TcpListener>> = const { RefCell::new(None) };
}

fn wait_for_server(timeout_s: u32, client: &AwClient) {
Expand All @@ -115,13 +115,17 @@ mod test {
use aw_server::endpoints::ServerState;

let state = ServerState {
datastore: Mutex::new(aw_datastore::Datastore::new_in_memory(false)),
datastore: aw_datastore::Datastore::new_in_memory(false),
asset_resolver: AssetResolver::new(None),
device_id: "test_id".to_string(),
};
let mut aw_config = aw_server::config::AWConfig::default();
aw_config.port = port;
aw_config.auth.api_key = api_key.map(str::to_owned);
let aw_config = aw_server::config::AWConfig {
port,
auth: aw_server::config::AWAuthConfig {
api_key: api_key.map(str::to_owned),
},
..Default::default()
};
let server = aw_server::endpoints::build_rocket(state, aw_config);
let server = block_on(server.ignite()).unwrap();
let shutdown_handler = server.shutdown();
Expand Down
4 changes: 4 additions & 0 deletions aw-datastore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,7 @@ zeroize = { version = "1", optional = true, features = ["alloc"] }
aw-models = { path = "../aw-models" }
aw-transform = { path = "../aw-transform" }
regex = "1"

[dev-dependencies]
# Used by migration tests to construct databases with old schema versions
rusqlite = { version = "0.30", features = ["chrono", "serde_json"] }
77 changes: 58 additions & 19 deletions aw-datastore/src/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ fn _get_db_version(conn: &Connection) -> i32 {
* 2: Added 'data' field to 'buckets' table
* 3: see: https://github.com/ActivityWatch/aw-server-rust/pull/52
* 4: Added 'key_value' table for storing key - value pairs
* 5: Replaced single-column events indexes with a composite index
*/
static NEWEST_DB_VERSION: i32 = 4;
static NEWEST_DB_VERSION: i32 = 5;

fn _create_tables(conn: &Connection, version: i32) -> bool {
let mut first_init = false;
Expand All @@ -52,6 +53,10 @@ fn _create_tables(conn: &Connection, version: i32) -> bool {
_migrate_v3_to_v4(conn);
}

if version < 5 {
_migrate_v4_to_v5(conn);
}

first_init
}

Expand Down Expand Up @@ -169,6 +174,39 @@ fn _migrate_v3_to_v4(conn: &Connection) {
.expect("Failed to update database version!");
}

fn _migrate_v4_to_v5(conn: &Connection) {
info!(
"Upgrading database to v5, replacing single-column events indexes with a composite index"
);
// Every event query filters on bucketrow and a starttime/endtime range,
// ordered by starttime. A composite index serves the seek, the range scan
// and the ORDER BY in one pass (with endtime checked from the index
// without fetching the row), where the single-column indexes could only
// cover one predicate and left the rest as scan + sort. Dropping them
// also makes inserts cheaper (one index to maintain instead of three).
//
// starttime is DESC so a forward scan yields the query's newest-first
// order with equal-timestamp events in rowid (insertion) order, matching
// the ordering callers observed before this index existed.
//
// The drops run before the create so the pages they free are reused to
// build the new index within the same transaction; creating first would
// permanently grow the database file by the new index's size.
conn.execute_batch(
"
BEGIN EXCLUSIVE TRANSACTION;
DROP INDEX IF EXISTS events_bucketrow_index;
DROP INDEX IF EXISTS events_starttime_index;
DROP INDEX IF EXISTS events_endtime_index;
CREATE INDEX IF NOT EXISTS events_bucketrow_starttime_endtime_index
ON events(bucketrow, starttime DESC, endtime);
PRAGMA user_version = 5;
COMMIT;
",
)
.expect("Failed to run v5 migration transaction");
}

pub struct DatastoreInstance {
buckets_cache: HashMap<String, Bucket>,
first_init: bool,
Expand Down Expand Up @@ -207,7 +245,7 @@ impl DatastoreInstance {
}

fn get_stored_buckets(&mut self, conn: &Connection) -> Result<(), DatastoreError> {
let mut stmt = match conn.prepare(
let mut stmt = match conn.prepare_cached(
"
SELECT buckets.id, buckets.name, buckets.type, buckets.client,
buckets.hostname, buckets.created,
Expand Down Expand Up @@ -324,7 +362,7 @@ impl DatastoreInstance {
Some(created) => Some(created),
None => Some(Utc::now()),
};
let mut stmt = match conn.prepare(
let mut stmt = match conn.prepare_cached(
"
INSERT INTO buckets (name, type, client, hostname, created, data)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
Expand Down Expand Up @@ -430,7 +468,7 @@ impl DatastoreInstance {
) -> Result<Vec<Event>, DatastoreError> {
let mut bucket = self.get_bucket(bucket_id)?;

let mut stmt = match conn.prepare(
let mut stmt = match conn.prepare_cached(
"
INSERT OR REPLACE INTO events(bucketrow, id, starttime, endtime, data)
VALUES (?1, ?2, ?3, ?4, ?5)",
Expand Down Expand Up @@ -484,7 +522,7 @@ impl DatastoreInstance {
event_ids: Vec<i64>,
) -> Result<(), DatastoreError> {
let bucket = self.get_bucket(bucket_id)?;
let mut stmt = match conn.prepare(
let mut stmt = match conn.prepare_cached(
"
DELETE FROM events
WHERE bucketrow = ?1 AND id = ?2",
Expand Down Expand Up @@ -557,7 +595,7 @@ impl DatastoreInstance {
let mut bucket = self.get_bucket(bucket_id)?;

// Use event ID directly instead of max(endtime) to avoid mismatch with get_events ordering
let mut stmt = match conn.prepare(
let mut stmt = match conn.prepare_cached(
"
UPDATE events
SET starttime = ?2, endtime = ?3, data = ?4
Expand Down Expand Up @@ -665,7 +703,7 @@ impl DatastoreInstance {
) -> Result<Event, DatastoreError> {
let bucket = self.get_bucket(bucket_id)?;

let mut stmt = match conn.prepare(
let mut stmt = match conn.prepare_cached(
"
SELECT id, starttime, endtime, data
FROM events
Expand Down Expand Up @@ -743,7 +781,7 @@ impl DatastoreInstance {
None => -1,
};

let mut stmt = match conn.prepare(
let mut stmt = match conn.prepare_cached(
"
SELECT id, starttime, endtime, data
FROM events
Expand Down Expand Up @@ -866,7 +904,7 @@ impl DatastoreInstance {
return Ok(0);
}

let mut stmt = match conn.prepare(
let mut stmt = match conn.prepare_cached(
"
SELECT count(*) FROM events
WHERE bucketrow = ?1
Expand Down Expand Up @@ -906,7 +944,7 @@ impl DatastoreInstance {
key: &str,
data: &str,
) -> Result<(), DatastoreError> {
let mut stmt = match conn.prepare(
let mut stmt = match conn.prepare_cached(
"
INSERT OR REPLACE INTO key_value(key, value, last_modified)
VALUES (?1, ?2, ?3)",
Expand All @@ -932,7 +970,7 @@ impl DatastoreInstance {
}

pub fn get_key_value(&self, conn: &Connection, key: &str) -> Result<String, DatastoreError> {
let mut stmt = match conn.prepare(
let mut stmt = match conn.prepare_cached(
"
SELECT * FROM key_value WHERE KEY = ?1",
) {
Expand Down Expand Up @@ -962,14 +1000,15 @@ impl DatastoreInstance {
conn: &Connection,
pattern: &str,
) -> Result<HashMap<String, String>, DatastoreError> {
let mut stmt = match conn.prepare("SELECT key, value FROM key_value WHERE key LIKE ?") {
Ok(stmt) => stmt,
Err(err) => {
return Err(DatastoreError::InternalError(format!(
"Failed to prepare get_value SQL statement: {err}"
)))
}
};
let mut stmt =
match conn.prepare_cached("SELECT key, value FROM key_value WHERE key LIKE ?") {
Ok(stmt) => stmt,
Err(err) => {
return Err(DatastoreError::InternalError(format!(
"Failed to prepare get_value SQL statement: {err}"
)))
}
};

let mut output = HashMap::<String, String>::new();
// Rusqlite's get wants index and item type as parameters.
Expand Down
2 changes: 1 addition & 1 deletion aw-datastore/src/privacy_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl PrivacyFilterEngine {
.map_err(|e| format!("Failed to parse privacy filter rules: {e}"))?;
for rule in &rules {
if rule.action == PrivacyFilterAction::Redact
&& rule.replacement.as_deref().map_or(true, str::is_empty)
&& rule.replacement.as_deref().is_none_or(str::is_empty)
{
return Err(format!(
"Redact rule with pattern {:?} is missing `replacement` — add a non-empty replacement string or use action=drop",
Expand Down
Loading
Loading