Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 105 additions & 8 deletions crates/lib/backend-postgres/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use waymark_core_backend::{
ActionDone, GraphUpdate, InstanceDone, InstanceLockStatus, LockClaim, QueuedInstance,
};
use waymark_observability::obs;
use waymark_runner_state::RunnerState;
use waymark_timed_future::TimedFutureExt as _;

const INSTANCE_STATUS_QUEUED: &str = "queued";
Expand Down Expand Up @@ -829,6 +828,8 @@ impl PostgresBackend {
let graph: GraphUpdate = crate::codec::deserialize(&state_payload)
.map_err(PollQueuedInstancesError::GraphUpdateDecode)?;

let merged_state = graph.merge_onto_base_state(instance.state.as_ref());

let action_node_ids: Vec<ExecutionId> = graph
.nodes
.iter()
Expand All @@ -840,12 +841,7 @@ impl PostgresBackend {
action_node_ids_by_instance.insert(instance_id, action_node_ids);
}

instance.state = Some(RunnerState::new(
None,
Some(graph.nodes),
Some(graph.edges),
false,
));
instance.state = Some(merged_state);

Ok(instance)
})
Expand Down Expand Up @@ -1174,7 +1170,10 @@ mod tests {
use super::*;

use waymark_dag::EdgeType;
use waymark_runner_state::{ActionCallSpec, ExecutionNode, NodeStatus};
use waymark_runner_state::value_visitor::ValueExpr;
use waymark_runner_state::{
ActionCallSpec, ExecutionNode, LiteralValue, NodeStatus, RunnerState,
};

fn sample_runner_state() -> RunnerState {
RunnerState::new(None, None, None, false)
Expand Down Expand Up @@ -1214,6 +1213,29 @@ mod tests {
}
}

fn sample_assignment_node(node_id: ExecutionId, label: &str, target: &str) -> ExecutionNode {
ExecutionNode {
node_id,
node_type: "assignment".to_string(),
label: label.to_string(),
status: NodeStatus::Completed,
template_id: Some("assign".to_string()),
targets: vec![target.to_string()],
action: None,
value_expr: None,
assignments: HashMap::from([(
target.to_string(),
ValueExpr::Literal(LiteralValue {
value: serde_json::json!(4),
}),
)]),
action_attempt: 0,
started_at: None,
completed_at: Some(Utc::now()),
scheduled_at: None,
}
}

fn sample_lock_claim() -> LockClaim {
LockClaim {
lock_uuid: LockId::new_uuid_v4(),
Expand Down Expand Up @@ -1490,6 +1512,81 @@ mod tests {
);
}

#[serial(postgres)]
#[tokio::test]
async fn core_get_queued_instances_merges_sparse_runner_state_with_queued_payload_state() {
let backend = setup_backend().await;
let instance_id = InstanceId::new_uuid_v4();
let entry_node = ExecutionId::new_uuid_v4();
let queued = QueuedInstance {
workflow_version_id: Uuid::new_v4(),
schedule_id: None,
dag: None,
entry_node,
state: Some(RunnerState::new(
None,
Some(HashMap::from([(
entry_node,
sample_assignment_node(entry_node, "input x = 4", "x"),
)])),
Some(HashSet::new()),
false,
)),
action_results: HashMap::new(),
instance_id,
scheduled_at: Some(Utc::now() - Duration::seconds(1)),
};
CoreBackend::queue_instances(&backend, &[queued])
.await
.expect("queue instances");

let initial_claim = sample_lock_claim();
let _initial_batch = CoreBackend::poll_queued_instances(
&backend,
NonZeroUsize::new(1).unwrap(),
initial_claim.clone(),
)
.await
.expect("initial claim");

let action_node_id = ExecutionId::new_uuid_v4();
let mut action_node = sample_execution_node(action_node_id);
action_node.scheduled_at = None;
let graph = GraphUpdate {
instance_id,
nodes: HashMap::from([(action_node_id, action_node)]),
edges: HashSet::from([waymark_runner_state::ExecutionEdge {
source: entry_node,
target: action_node_id,
edge_type: EdgeType::StateMachine,
}]),
};
CoreBackend::save_graphs(&backend, initial_claim.clone(), &[graph])
.await
.expect("persist sparse graph");
CoreBackend::release_instance_locks(&backend, initial_claim.lock_uuid, &[instance_id])
.await
.expect("release initial lock");

let second_claim = sample_lock_claim();
let batch = CoreBackend::poll_queued_instances(
&backend,
NonZeroUsize::new(1).unwrap(),
second_claim,
)
.await
.expect("rehydrate merged instance");

let state = batch[0].state.as_ref().expect("merged runner state");
assert!(state.nodes.contains_key(&entry_node));
assert!(state.nodes.contains_key(&action_node_id));
assert!(state.edges.contains(&waymark_runner_state::ExecutionEdge {
source: entry_node,
target: action_node_id,
edge_type: EdgeType::StateMachine,
}));
}

#[serial(postgres)]
#[tokio::test]
async fn core_save_graphs_happy_path() {
Expand Down
Loading
Loading