Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion absurd-sqlite-extension/src/checkpoint.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::sql;
use crate::types::TaskState;
use crate::validate;
use serde_json::Value as JsonValue;
use sqlite3ext_sys::sqlite3;
Expand Down Expand Up @@ -93,8 +94,11 @@ pub fn absurd_set_task_checkpoint_state(
let task_state = row
.get::<String>(1)
.map_err(|err| Error::new_message(format!("failed to read task state: {:?}", err)))?;
let task_state = task_state
.parse::<TaskState>()
.map_err(|err| Error::new_message(format!("invalid task state: {}", err)))?;

if task_state == "cancelled" {
if task_state == TaskState::Cancelled {
return Err(Error::new_message("Task has been cancelled"));
}

Expand Down
28 changes: 21 additions & 7 deletions absurd-sqlite-extension/src/claim.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::retry;
use crate::sql;
use crate::types::{RunState, TaskState};
use crate::validate;
use serde_json::Value as JsonValue;
use sqlite3ext_sys::sqlite3;
Expand Down Expand Up @@ -241,7 +242,7 @@ fn expire_claims(db: *mut sqlite3, queue_name: &str, now: i64) -> Result<()> {
Some(max_attempts)
};
let allow_retry = max_attempts_opt.is_none_or(|max| next_attempt <= max);
let mut task_state = "failed";
let mut task_state = TaskState::Failed;
let mut last_attempt_run = run_id.clone();
let mut cancelled_at = "";
let mut recorded_attempt = attempt;
Expand All @@ -261,17 +262,18 @@ fn expire_claims(db: *mut sqlite3, queue_name: &str, now: i64) -> Result<()> {
};

if cancel_task {
task_state = "cancelled";
task_state = TaskState::Cancelled;
cancelled_at = &now_value;
} else {
let new_run_id = Uuid::now_v7().to_string();
let next_available_value = next_available.to_string();
let next_attempt_value = next_attempt.to_string();
let run_state = if next_available > now {
"sleeping"
RunState::Sleeping
} else {
"pending"
RunState::Pending
};
let run_state_str = run_state.to_string();
sql::exec_with_bind_text(
db,
"insert into absurd_runs (
Expand Down Expand Up @@ -303,17 +305,29 @@ fn expire_claims(db: *mut sqlite3, queue_name: &str, now: i64) -> Result<()> {
&new_run_id,
&task_id,
&next_attempt_value,
run_state,
&run_state_str,
&next_available_value,
],
)?;
task_state = run_state;
task_state = match run_state {
RunState::Sleeping => TaskState::Sleeping,
RunState::Pending => TaskState::Pending,
// These states are impossible here since run_state is derived from
// the conditional above which only produces Sleeping or Pending
RunState::Running
| RunState::Completed
| RunState::Failed
| RunState::Cancelled => {
unreachable!("run_state can only be Sleeping or Pending in this context")
}
};
last_attempt_run = new_run_id;
recorded_attempt = next_attempt;
}
}

let attempt_value = recorded_attempt.to_string();
let task_state_str = task_state.to_string();
sql::exec_with_bind_text(
db,
"update absurd_tasks
Expand All @@ -327,7 +341,7 @@ fn expire_claims(db: *mut sqlite3, queue_name: &str, now: i64) -> Result<()> {
where queue_name = ?5
and task_id = ?6",
&[
task_state,
&task_state_str,
&attempt_value,
&last_attempt_run,
cancelled_at,
Expand Down
12 changes: 10 additions & 2 deletions absurd-sqlite-extension/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::sql;
use crate::types::{RunState, TaskState};
use crate::validate;
use serde_json::Value as JsonValue;
use sqlite3ext_sys::sqlite3;
Expand Down Expand Up @@ -146,8 +147,11 @@ fn await_event_impl(
let task_state = run_row
.get::<String>(3)
.map_err(|err| Error::new_message(format!("failed to read task state: {:?}", err)))?;
let task_state = task_state
.parse::<TaskState>()
.map_err(|err| Error::new_message(format!("invalid task state: {}", err)))?;

if task_state == "cancelled" {
if task_state == TaskState::Cancelled {
return Err(Error::new_message("Task has been cancelled"));
}

Expand Down Expand Up @@ -191,7 +195,11 @@ fn await_event_impl(
}
}

if run_state != "running" {
let run_state = run_state
.parse::<RunState>()
.map_err(|err| Error::new_message(format!("invalid run state: {}", err)))?;

if run_state != RunState::Running {
return Err(Error::new_message(
"Run must be running to await absurd_events",
));
Expand Down
4 changes: 4 additions & 0 deletions absurd-sqlite-extension/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@ mod run;
mod settings;
mod spawn;
mod sql;
pub mod types;
mod validate;

// Re-export public types for convenience
pub use types::{RunState, TaskState};

/// SQL: absurd_version()
/// Usage: return extension version and git commit.
/// Section: Meta
Expand Down
48 changes: 37 additions & 11 deletions absurd-sqlite-extension/src/run.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::retry;
use crate::sql;
use crate::types::{RunState, TaskState};
use crate::validate;
use chrono::DateTime;
use serde_json::Value as JsonValue;
Expand Down Expand Up @@ -179,7 +180,7 @@ fn fail_run_impl(
Some(max_attempts)
};
let allow_retry = max_attempts_opt.is_none_or(|max| next_attempt <= max);
let mut task_state = "failed";
let mut task_state = TaskState::Failed;
let mut last_attempt_run = run_id.to_string();
let mut cancelled_at = "";
let mut recorded_attempt = attempt;
Expand All @@ -202,17 +203,18 @@ fn fail_run_impl(
};

if cancel_task {
task_state = "cancelled";
task_state = TaskState::Cancelled;
cancelled_at = &now_value;
} else {
let new_run_id = Uuid::now_v7().to_string();
let next_available_value = next_available.to_string();
let next_attempt_value = next_attempt.to_string();
let run_state = if next_available > now {
"sleeping"
RunState::Sleeping
} else {
"pending"
RunState::Pending
};
let run_state_str = run_state.to_string();
sql::exec_with_bind_text(
db,
"insert into absurd_runs (
Expand Down Expand Up @@ -244,17 +246,29 @@ fn fail_run_impl(
&new_run_id,
&task_id,
&next_attempt_value,
run_state,
&run_state_str,
&next_available_value,
],
)?;
task_state = run_state;
task_state = match run_state {
RunState::Sleeping => TaskState::Sleeping,
RunState::Pending => TaskState::Pending,
// These states are impossible here since run_state is derived from
// the conditional above which only produces Sleeping or Pending
RunState::Running
| RunState::Completed
| RunState::Failed
| RunState::Cancelled => {
unreachable!("run_state can only be Sleeping or Pending in this context")
}
};
last_attempt_run = new_run_id;
recorded_attempt = next_attempt;
}
}

let attempt_value = recorded_attempt.to_string();
let task_state_str = task_state.to_string();
sql::exec_with_bind_text(
db,
"update absurd_tasks
Expand All @@ -268,7 +282,7 @@ fn fail_run_impl(
where queue_name = ?5
and task_id = ?6",
&[
task_state,
&task_state_str,
&attempt_value,
&last_attempt_run,
cancelled_at,
Expand Down Expand Up @@ -360,8 +374,11 @@ pub fn absurd_complete_run(
let run_state = row
.get::<String>(1)
.map_err(|err| Error::new_message(format!("failed to read state: {:?}", err)))?;
let run_state = run_state
.parse::<RunState>()
.map_err(|err| Error::new_message(format!("invalid run state: {}", err)))?;

if run_state != "running" {
if run_state != RunState::Running {
return Err(Error::new_message("run is not currently running"));
}

Expand Down Expand Up @@ -486,8 +503,11 @@ pub fn absurd_extend_claim(
let task_state = row
.get::<String>(0)
.map_err(|err| Error::new_message(format!("failed to read state: {:?}", err)))?;
let task_state = task_state
.parse::<TaskState>()
.map_err(|err| Error::new_message(format!("invalid task state: {}", err)))?;

if task_state == "cancelled" {
if task_state == TaskState::Cancelled {
return Err(Error::new_message("Task has been cancelled"));
}

Expand Down Expand Up @@ -816,8 +836,14 @@ pub fn absurd_cancel_task(
let task_state = row
.get::<String>(0)
.map_err(|err| Error::new_message(format!("failed to read task state: {:?}", err)))?;

if task_state == "completed" || task_state == "failed" || task_state == "cancelled" {
let task_state = task_state
.parse::<TaskState>()
.map_err(|err| Error::new_message(format!("invalid task state: {}", err)))?;

if task_state == TaskState::Completed
|| task_state == TaskState::Failed
|| task_state == TaskState::Cancelled
{
return Ok(());
}

Expand Down
Loading