diff --git a/absurd-sqlite-extension/src/checkpoint.rs b/absurd-sqlite-extension/src/checkpoint.rs index 1bb4735..a275ae4 100644 --- a/absurd-sqlite-extension/src/checkpoint.rs +++ b/absurd-sqlite-extension/src/checkpoint.rs @@ -1,4 +1,5 @@ use crate::sql; +use crate::types::TaskState; use crate::validate; use serde_json::Value as JsonValue; use sqlite3ext_sys::sqlite3; @@ -93,8 +94,11 @@ pub fn absurd_set_task_checkpoint_state( let task_state = row .get::(1) .map_err(|err| Error::new_message(format!("failed to read task state: {:?}", err)))?; + let task_state = task_state + .parse::() + .map_err(|err| Error::new_message(format!("invalid task state: {}", err)))?; - if task_state == "cancelled" { + if task_state == TaskState::Cancelled { return Err(Error::new_message("Task has been cancelled")); } diff --git a/absurd-sqlite-extension/src/claim.rs b/absurd-sqlite-extension/src/claim.rs index 5940146..aa585a2 100644 --- a/absurd-sqlite-extension/src/claim.rs +++ b/absurd-sqlite-extension/src/claim.rs @@ -1,5 +1,6 @@ use crate::retry; use crate::sql; +use crate::types::{RunState, TaskState}; use crate::validate; use serde_json::Value as JsonValue; use sqlite3ext_sys::sqlite3; @@ -241,7 +242,7 @@ fn expire_claims(db: *mut sqlite3, queue_name: &str, now: i64) -> Result<()> { Some(max_attempts) }; let allow_retry = max_attempts_opt.is_none_or(|max| next_attempt <= max); - let mut task_state = "failed"; + let mut task_state = TaskState::Failed; let mut last_attempt_run = run_id.clone(); let mut cancelled_at = ""; let mut recorded_attempt = attempt; @@ -261,17 +262,18 @@ fn expire_claims(db: *mut sqlite3, queue_name: &str, now: i64) -> Result<()> { }; if cancel_task { - task_state = "cancelled"; + task_state = TaskState::Cancelled; cancelled_at = &now_value; } else { let new_run_id = Uuid::now_v7().to_string(); let next_available_value = next_available.to_string(); let next_attempt_value = next_attempt.to_string(); let run_state = if next_available > now { - "sleeping" + RunState::Sleeping } else { - "pending" + RunState::Pending }; + let run_state_str = run_state.to_string(); sql::exec_with_bind_text( db, "insert into absurd_runs ( @@ -303,17 +305,29 @@ fn expire_claims(db: *mut sqlite3, queue_name: &str, now: i64) -> Result<()> { &new_run_id, &task_id, &next_attempt_value, - run_state, + &run_state_str, &next_available_value, ], )?; - task_state = run_state; + task_state = match run_state { + RunState::Sleeping => TaskState::Sleeping, + RunState::Pending => TaskState::Pending, + // These states are impossible here since run_state is derived from + // the conditional above which only produces Sleeping or Pending + RunState::Running + | RunState::Completed + | RunState::Failed + | RunState::Cancelled => { + unreachable!("run_state can only be Sleeping or Pending in this context") + } + }; last_attempt_run = new_run_id; recorded_attempt = next_attempt; } } let attempt_value = recorded_attempt.to_string(); + let task_state_str = task_state.to_string(); sql::exec_with_bind_text( db, "update absurd_tasks @@ -327,7 +341,7 @@ fn expire_claims(db: *mut sqlite3, queue_name: &str, now: i64) -> Result<()> { where queue_name = ?5 and task_id = ?6", &[ - task_state, + &task_state_str, &attempt_value, &last_attempt_run, cancelled_at, diff --git a/absurd-sqlite-extension/src/event.rs b/absurd-sqlite-extension/src/event.rs index b9f0aba..8b5317b 100644 --- a/absurd-sqlite-extension/src/event.rs +++ b/absurd-sqlite-extension/src/event.rs @@ -1,4 +1,5 @@ use crate::sql; +use crate::types::{RunState, TaskState}; use crate::validate; use serde_json::Value as JsonValue; use sqlite3ext_sys::sqlite3; @@ -146,8 +147,11 @@ fn await_event_impl( let task_state = run_row .get::(3) .map_err(|err| Error::new_message(format!("failed to read task state: {:?}", err)))?; + let task_state = task_state + .parse::() + .map_err(|err| Error::new_message(format!("invalid task state: {}", err)))?; - if task_state == "cancelled" { + if task_state == TaskState::Cancelled { return Err(Error::new_message("Task has been cancelled")); } @@ -191,7 +195,11 @@ fn await_event_impl( } } - if run_state != "running" { + let run_state = run_state + .parse::() + .map_err(|err| Error::new_message(format!("invalid run state: {}", err)))?; + + if run_state != RunState::Running { return Err(Error::new_message( "Run must be running to await absurd_events", )); diff --git a/absurd-sqlite-extension/src/lib.rs b/absurd-sqlite-extension/src/lib.rs index fb32dc4..685f6b3 100644 --- a/absurd-sqlite-extension/src/lib.rs +++ b/absurd-sqlite-extension/src/lib.rs @@ -15,8 +15,12 @@ mod run; mod settings; mod spawn; mod sql; +pub mod types; mod validate; +// Re-export public types for convenience +pub use types::{RunState, TaskState}; + /// SQL: absurd_version() /// Usage: return extension version and git commit. /// Section: Meta diff --git a/absurd-sqlite-extension/src/run.rs b/absurd-sqlite-extension/src/run.rs index 5d181b5..b0897e4 100644 --- a/absurd-sqlite-extension/src/run.rs +++ b/absurd-sqlite-extension/src/run.rs @@ -1,5 +1,6 @@ use crate::retry; use crate::sql; +use crate::types::{RunState, TaskState}; use crate::validate; use chrono::DateTime; use serde_json::Value as JsonValue; @@ -179,7 +180,7 @@ fn fail_run_impl( Some(max_attempts) }; let allow_retry = max_attempts_opt.is_none_or(|max| next_attempt <= max); - let mut task_state = "failed"; + let mut task_state = TaskState::Failed; let mut last_attempt_run = run_id.to_string(); let mut cancelled_at = ""; let mut recorded_attempt = attempt; @@ -202,17 +203,18 @@ fn fail_run_impl( }; if cancel_task { - task_state = "cancelled"; + task_state = TaskState::Cancelled; cancelled_at = &now_value; } else { let new_run_id = Uuid::now_v7().to_string(); let next_available_value = next_available.to_string(); let next_attempt_value = next_attempt.to_string(); let run_state = if next_available > now { - "sleeping" + RunState::Sleeping } else { - "pending" + RunState::Pending }; + let run_state_str = run_state.to_string(); sql::exec_with_bind_text( db, "insert into absurd_runs ( @@ -244,17 +246,29 @@ fn fail_run_impl( &new_run_id, &task_id, &next_attempt_value, - run_state, + &run_state_str, &next_available_value, ], )?; - task_state = run_state; + task_state = match run_state { + RunState::Sleeping => TaskState::Sleeping, + RunState::Pending => TaskState::Pending, + // These states are impossible here since run_state is derived from + // the conditional above which only produces Sleeping or Pending + RunState::Running + | RunState::Completed + | RunState::Failed + | RunState::Cancelled => { + unreachable!("run_state can only be Sleeping or Pending in this context") + } + }; last_attempt_run = new_run_id; recorded_attempt = next_attempt; } } let attempt_value = recorded_attempt.to_string(); + let task_state_str = task_state.to_string(); sql::exec_with_bind_text( db, "update absurd_tasks @@ -268,7 +282,7 @@ fn fail_run_impl( where queue_name = ?5 and task_id = ?6", &[ - task_state, + &task_state_str, &attempt_value, &last_attempt_run, cancelled_at, @@ -360,8 +374,11 @@ pub fn absurd_complete_run( let run_state = row .get::(1) .map_err(|err| Error::new_message(format!("failed to read state: {:?}", err)))?; + let run_state = run_state + .parse::() + .map_err(|err| Error::new_message(format!("invalid run state: {}", err)))?; - if run_state != "running" { + if run_state != RunState::Running { return Err(Error::new_message("run is not currently running")); } @@ -486,8 +503,11 @@ pub fn absurd_extend_claim( let task_state = row .get::(0) .map_err(|err| Error::new_message(format!("failed to read state: {:?}", err)))?; + let task_state = task_state + .parse::() + .map_err(|err| Error::new_message(format!("invalid task state: {}", err)))?; - if task_state == "cancelled" { + if task_state == TaskState::Cancelled { return Err(Error::new_message("Task has been cancelled")); } @@ -816,8 +836,14 @@ pub fn absurd_cancel_task( let task_state = row .get::(0) .map_err(|err| Error::new_message(format!("failed to read task state: {:?}", err)))?; - - if task_state == "completed" || task_state == "failed" || task_state == "cancelled" { + let task_state = task_state + .parse::() + .map_err(|err| Error::new_message(format!("invalid task state: {}", err)))?; + + if task_state == TaskState::Completed + || task_state == TaskState::Failed + || task_state == TaskState::Cancelled + { return Ok(()); } diff --git a/absurd-sqlite-extension/src/types.rs b/absurd-sqlite-extension/src/types.rs new file mode 100644 index 0000000..aad88ce --- /dev/null +++ b/absurd-sqlite-extension/src/types.rs @@ -0,0 +1,179 @@ +//! Public types for the Absurd SQLite extension. +//! +//! This module defines the core types that can be used by consumers of the library, +//! including task and run state enums. + +use std::fmt; +use std::str::FromStr; + +/// Represents the state of a task in the Absurd system. +/// +/// Tasks transition through various states during their lifecycle: +/// - `Pending`: Task is waiting to be executed +/// - `Running`: Task is currently being executed +/// - `Sleeping`: Task is suspended, waiting for an event or timeout +/// - `Completed`: Task has successfully completed +/// - `Failed`: Task execution failed +/// - `Cancelled`: Task was cancelled +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum TaskState { + Pending, + Running, + Sleeping, + Completed, + Failed, + Cancelled, +} + +impl fmt::Display for TaskState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let s = match self { + TaskState::Pending => "pending", + TaskState::Running => "running", + TaskState::Sleeping => "sleeping", + TaskState::Completed => "completed", + TaskState::Failed => "failed", + TaskState::Cancelled => "cancelled", + }; + write!(f, "{}", s) + } +} + +impl FromStr for TaskState { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "pending" => Ok(TaskState::Pending), + "running" => Ok(TaskState::Running), + "sleeping" => Ok(TaskState::Sleeping), + "completed" => Ok(TaskState::Completed), + "failed" => Ok(TaskState::Failed), + "cancelled" => Ok(TaskState::Cancelled), + _ => Err(format!("Invalid task state: {}", s)), + } + } +} + +/// Represents the state of a run (task attempt) in the Absurd system. +/// +/// Runs transition through various states during their execution: +/// - `Pending`: Run is waiting to be claimed and executed +/// - `Running`: Run is currently being executed +/// - `Sleeping`: Run is suspended, waiting for an event or timeout +/// - `Completed`: Run has successfully completed +/// - `Failed`: Run execution failed +/// - `Cancelled`: Run was cancelled +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum RunState { + Pending, + Running, + Sleeping, + Completed, + Failed, + Cancelled, +} + +impl fmt::Display for RunState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let s = match self { + RunState::Pending => "pending", + RunState::Running => "running", + RunState::Sleeping => "sleeping", + RunState::Completed => "completed", + RunState::Failed => "failed", + RunState::Cancelled => "cancelled", + }; + write!(f, "{}", s) + } +} + +impl FromStr for RunState { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "pending" => Ok(RunState::Pending), + "running" => Ok(RunState::Running), + "sleeping" => Ok(RunState::Sleeping), + "completed" => Ok(RunState::Completed), + "failed" => Ok(RunState::Failed), + "cancelled" => Ok(RunState::Cancelled), + _ => Err(format!("Invalid run state: {}", s)), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_task_state_display() { + assert_eq!(TaskState::Pending.to_string(), "pending"); + assert_eq!(TaskState::Running.to_string(), "running"); + assert_eq!(TaskState::Sleeping.to_string(), "sleeping"); + assert_eq!(TaskState::Completed.to_string(), "completed"); + assert_eq!(TaskState::Failed.to_string(), "failed"); + assert_eq!(TaskState::Cancelled.to_string(), "cancelled"); + } + + #[test] + fn test_task_state_from_str() { + assert_eq!("pending".parse::().unwrap(), TaskState::Pending); + assert_eq!("running".parse::().unwrap(), TaskState::Running); + assert_eq!( + "sleeping".parse::().unwrap(), + TaskState::Sleeping + ); + assert_eq!( + "completed".parse::().unwrap(), + TaskState::Completed + ); + assert_eq!("failed".parse::().unwrap(), TaskState::Failed); + assert_eq!( + "cancelled".parse::().unwrap(), + TaskState::Cancelled + ); + assert!("invalid".parse::().is_err()); + } + + #[test] + fn test_run_state_display() { + assert_eq!(RunState::Pending.to_string(), "pending"); + assert_eq!(RunState::Running.to_string(), "running"); + assert_eq!(RunState::Sleeping.to_string(), "sleeping"); + assert_eq!(RunState::Completed.to_string(), "completed"); + assert_eq!(RunState::Failed.to_string(), "failed"); + assert_eq!(RunState::Cancelled.to_string(), "cancelled"); + } + + #[test] + fn test_run_state_from_str() { + assert_eq!("pending".parse::().unwrap(), RunState::Pending); + assert_eq!("running".parse::().unwrap(), RunState::Running); + assert_eq!("sleeping".parse::().unwrap(), RunState::Sleeping); + assert_eq!( + "completed".parse::().unwrap(), + RunState::Completed + ); + assert_eq!("failed".parse::().unwrap(), RunState::Failed); + assert_eq!( + "cancelled".parse::().unwrap(), + RunState::Cancelled + ); + assert!("invalid".parse::().is_err()); + } + + #[test] + fn test_task_state_equality() { + assert_eq!(TaskState::Pending, TaskState::Pending); + assert_ne!(TaskState::Pending, TaskState::Running); + } + + #[test] + fn test_run_state_equality() { + assert_eq!(RunState::Pending, RunState::Pending); + assert_ne!(RunState::Pending, RunState::Running); + } +}