Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/bin/soak-harness/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ pub struct SoakArgs {
#[arg(long, default_value_t = 4096)]
pub payload_bytes: i64,

#[arg(long, default_value_t = false, action = clap::ArgAction::SetTrue)]
pub include_payload_in_result: bool,

#[arg(long, default_value_t = 1000)]
pub issue_min_ready_queue: i64,

Expand Down
16 changes: 15 additions & 1 deletion crates/bin/soak-harness/src/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ struct WorkItem {
pub step_delays_ms: Vec<i64>,
pub step_should_fail: Vec<bool>,
pub step_payload_bytes: Vec<i64>,
pub step_include_payload: Vec<bool>,
}

fn sample_work_item(args: &crate::cli::SoakArgs, rng: &mut StdRng) -> WorkItem {
Expand All @@ -292,18 +293,21 @@ fn sample_work_item(args: &crate::cli::SoakArgs, rng: &mut StdRng) -> WorkItem {
let mut step_delays_ms = Vec::with_capacity(len);
let mut step_should_fail = Vec::with_capacity(len);
let mut step_payload_bytes = Vec::with_capacity(len);
let mut step_include_payload = Vec::with_capacity(len);

for _ in 0..len {
let (delay_ms, should_fail) = sample_step_behavior(args, rng);
step_delays_ms.push(delay_ms);
step_should_fail.push(should_fail);
step_payload_bytes.push(jitter_payload(args.payload_bytes, rng));
step_include_payload.push(args.include_payload_in_result);
}

WorkItem {
step_delays_ms,
step_should_fail,
step_payload_bytes,
step_include_payload,
}
}

Expand Down Expand Up @@ -350,15 +354,17 @@ fn build_instance(
let mut state = RunnerState::new(Some(Arc::clone(&workflow.dag)), None, None, false);
if item.step_delays_ms.len() != item.step_should_fail.len()
|| item.step_delays_ms.len() != item.step_payload_bytes.len()
|| item.step_delays_ms.len() != item.step_include_payload.len()
{
bail!("step input vectors are not aligned");
}

for (step, ((delay_ms, should_fail), payload_bytes)) in item
for (step, (((delay_ms, should_fail), payload_bytes), include_payload)) in item
.step_delays_ms
.iter()
.zip(item.step_should_fail.iter())
.zip(item.step_payload_bytes.iter())
.zip(item.step_include_payload.iter())
.enumerate()
{
let idx = step + 1;
Expand Down Expand Up @@ -386,6 +392,14 @@ fn build_instance(
Some(format!("input payload_bytes_{idx} = {payload_bytes}")),
)
.map_err(|err| anyhow!(err.0))?;
state
.record_assignment(
vec![format!("include_payload_{idx}")],
&literal_bool(*include_payload),
None,
Some(format!("input include_payload_{idx} = {include_payload}")),
)
.map_err(|err| anyhow!(err.0))?;
}

let entry_node = state
Expand Down
5 changes: 3 additions & 2 deletions crates/bin/soak-harness/src/setup_workflows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ fn workflow_source(
timeout_seconds: u32,
actions_per_workflow: NonZeroUsize,
) -> String {
let mut input_names = Vec::with_capacity(actions_per_workflow.get() * 3);
let mut input_names = Vec::with_capacity(actions_per_workflow.get() * 4);
let mut lines = Vec::with_capacity(actions_per_workflow.get() + 3);
lines.push("fn main(input: [".to_string());

Expand All @@ -70,6 +70,7 @@ fn workflow_source(
input_names.push(format!("delay_ms_{idx}"));
input_names.push(format!("should_fail_{idx}"));
input_names.push(format!("payload_bytes_{idx}"));
input_names.push(format!("include_payload_{idx}"));
}

lines[0].push_str(&input_names.join(", "));
Expand All @@ -78,7 +79,7 @@ fn workflow_source(
for step in 0..actions_per_workflow.get() {
let idx = step + 1;
lines.push(format!(
" step_{idx} = @{user_module}.simulated_action(delay_ms=delay_ms_{idx}, should_fail=should_fail_{idx}, payload_bytes=payload_bytes_{idx})[ActionTimeout -> retry: 1, backoff: 1 s][timeout: {timeout_seconds} s]"
" step_{idx} = @{user_module}.simulated_action(delay_ms=delay_ms_{idx}, should_fail=should_fail_{idx}, payload_bytes=payload_bytes_{idx}, include_payload=include_payload_{idx})[ActionTimeout -> retry: 1, backoff: 1 s][timeout: {timeout_seconds} s]"
));
}

Expand Down
9 changes: 7 additions & 2 deletions python/tests/fixtures_actions/soak_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ async def simulated_action(
delay_ms: int,
should_fail: bool,
payload_bytes: int,
include_payload: bool,
) -> dict[str, int | str | bool]:
"""Sleep for `delay_ms`, optionally fail, and return a small checksum payload."""
"""Sleep for `delay_ms`, optionally fail, and optionally include sized payload in result."""
bounded_delay_ms = max(0, delay_ms)
bounded_payload_bytes = max(0, min(payload_bytes, _MAX_PAYLOAD_BYTES))

Expand All @@ -30,9 +31,13 @@ async def simulated_action(
payload = "x" * bounded_payload_bytes
checksum = hashlib.sha1(payload.encode("utf-8")).hexdigest()[:16]

return {
result: dict[str, int | str | bool] = {
"delay_ms": bounded_delay_ms,
"payload_bytes": bounded_payload_bytes,
"checksum": checksum,
"failed": False,
}
if include_payload:
result["payload"] = payload

return result
Loading