Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cpanfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Do not edit this file directly. To change prereqs, edit the `dist.ini` file.

requires "App::Yath::Script" => "2.000011";
requires "Atomic::Pipe" => "0.021";
requires "Atomic::Pipe" => "0.026";
requires "B" => "0";
requires "Capture::Tiny" => "0.48";
requires "Carp" => "0";
Expand Down Expand Up @@ -40,7 +40,7 @@ requires "IO::Select" => "0";
requires "IO::Uncompress::Bunzip2" => "0";
requires "IO::Uncompress::Gunzip" => "0";
requires "IPC::Cmd" => "0";
requires "IPC::Manager" => "0.000032";
requires "IPC::Manager" => "0.000034";
requires "Importer" => "0.025";
requires "JSON::PP" => "0";
requires "LWP" => "0";
Expand Down
4 changes: 2 additions & 2 deletions dist.ini
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ goto::file = 0.005
mro = 0
parent = 0.241
App::Yath::Script = 2.000011
Atomic::Pipe = 0.021
Atomic::Pipe = 0.026
B = 0
Capture::Tiny = 0.48
Carp = 0
Expand Down Expand Up @@ -114,7 +114,7 @@ IO::Select = 0
IO::Uncompress::Bunzip2 = 0
IO::Uncompress::Gunzip = 0
IPC::Cmd = 0
IPC::Manager = 0.000032
IPC::Manager = 0.000034
Importer = 0.025
JSON::PP = 0
LWP = 0
Expand Down
86 changes: 55 additions & 31 deletions lib/App/Yath2/Command/extract.pm
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ sub run {
my @rest;
for my $arg (@$args) {
if ($arg eq '--no-decompress') { $no_decompress = 1 }
else { push @rest => $arg }
else { push @rest => $arg }
}
@$args = @rest;

Expand All @@ -90,9 +90,10 @@ sub run {

my $count = 0;
for my $rel ($la->list_files) {
my $is_zst = $rel =~ /\.zst\z/;
my $is_dict = $rel eq 'zstd-dict.bin';
my $out_rel = ($no_decompress || !$is_zst || $is_dict)
my $is_zst = $rel =~ /\.zst\z/;
my $is_dict = $rel eq 'zstd-dict.bin';
my $out_rel =
($no_decompress || !$is_zst || $is_dict)
? $rel
: do { (my $r = $rel) =~ s/\.zst\z//; $r };

Expand All @@ -115,7 +116,7 @@ sub run {
# would still point at non-existent '.zst' paths. Rewrite
# the keys to match the extracted shape so consumers
# (yath replay, ad-hoc tooling) find the files.
if ($out_rel eq 'artifacts.json'
if ( $out_rel eq 'artifacts.json'
|| $out_rel =~ m{\Aruns/[^/]+/artifacts\.json\z})
{
$bytes = _rewrite_manifest_keys($bytes);
Expand All @@ -125,7 +126,7 @@ sub run {
open(my $out, '>', $abs) or die "open $abs: $!\n";
binmode $out;
print $out $bytes;
close $out or die "close $abs: $!\n";
close $out or die "close $abs: $!\n";
$count++;
}

Expand Down Expand Up @@ -157,37 +158,60 @@ sub _rewrite_manifest_keys {

# Decompress a possibly-multi-frame zstd byte string. The single
# .json.zst snapshots produce one frame; the .jsonl.zst event
# streams concatenate one-shot frame per line, so the extracted
# plaintext is the concatenation of every frame's decoded payload.
# streams concatenate one self-contained zstd frame per record.
#
# Without a dict we feed everything into a streaming Decompressor.
# With a dict the streaming binding does not accept dicts, so we
# walk frames using zstd_frame_size and decompress each with a
# reused DecompressionContext.
# Producers (the JSONL.zst writer, EventEmitter) do NOT embed an
# inter-record newline inside the compressed plaintext -- frames
# self-delimit -- so the extract command is responsible for
# reinserting exactly one newline between records when materializing
# extracted plaintext jsonl. We strip any trailing newlines from
# each frame's payload first so producers that did include one (the
# legacy plain JSONL writer through the same path, or older archives)
# still extract to a single-newline-per-record shape.
#
# Walks frames via zstd_frame_size for both the dict and no-dict
# paths; one frame is one record either way.
sub _decompress_multi_frame {
my ($bytes, $dict_bytes) = @_;

if ($dict_bytes) {
my $ddict = Compress::Zstd::DecompressionDictionary->new($dict_bytes);
my $dctx = Compress::Zstd::DecompressionContext->new;
my $out = '';
while (length $bytes) {
my $size = zstd_frame_size($bytes);
die "tar.zidx: incomplete zstd frame in extract payload\n"
unless defined $size;
my $frame = substr($bytes, 0, $size);
substr($bytes, 0, $size) = '';
my $plain = $dctx->decompress_using_dict($frame, $ddict);
die "tar.zidx: decompress_using_dict failed\n" unless defined $plain;
$out .= $plain;
}
return $out;
my $ddict;
$ddict = Compress::Zstd::DecompressionDictionary->new($dict_bytes)
if defined $dict_bytes;
my $dctx = Compress::Zstd::DecompressionContext->new if $ddict;

my @records;
while (length $bytes) {
my $size = zstd_frame_size($bytes);
die "tar.zidx: incomplete zstd frame in extract payload\n"
unless defined $size;
my $frame = substr($bytes, 0, $size);
substr($bytes, 0, $size) = '';

my $plain =
$ddict
? $dctx->decompress_using_dict($frame, $ddict)
: Compress::Zstd::decompress($frame);
die "tar.zidx: zstd decompress failed in extract payload\n"
unless defined $plain;

push @records => $plain;
}

my $d = Compress::Zstd::Decompressor->new;
$d->init;
my $out = $d->decompress($bytes);
die "tar.zidx: streaming decompress failed\n" unless defined $out;
return '' unless @records;

# Single-frame snapshots (.json.zst): return the payload as-is;
# there is no inter-record story to enforce, and a snapshot's
# body may legitimately end without a newline.
return $records[0] if @records == 1;

# Multi-frame streams (.jsonl.zst): one record per line, exactly
# one newline between records, trailing newline included so the
# file is canonical jsonl.
my $out = '';
for my $r (@records) {
$r =~ s/\n+\z//;
$out .= $r . "\n";
}
return $out;
}

Expand Down
21 changes: 20 additions & 1 deletion lib/App/Yath2/Command/test.pm
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use App::Yath2::OutputManager();
use App::Yath2::Options::Renderer();
use App::Yath2::Util::IPC qw/publish_ipc_file unlink_ipc_file/;
use Scope::Guard ();
use Test2::Util qw/IS_WIN32/;

use Getopt::Yath;
include_options(
Expand Down Expand Up @@ -113,10 +114,20 @@ sub run {

my $workdir = $settings->workspace->workdir;

my @resources = (Test2::Harness2::Resource::JobCount->new(slots => 16));

if (!IS_WIN32 && $settings->can('runner') && @{$settings->runner->preloads // []}) {
require Test2::Harness2::Resource::Preload;
push @resources => Test2::Harness2::Resource::Preload->new(
preloads => $settings->runner->preloads,
preload_early => $settings->runner->preload_early // {},
);
}

my $spawn = Test2::Harness2->spawn(
workdir => $workdir,
protocol => $settings->ipc->protocol,
resources => [Test2::Harness2::Resource::JobCount->new(slots => 16)],
resources => \@resources,
loggers => [
'Test2::Harness2::Collector::Logger::JSONL',
'Test2::Harness2::Collector::Logger::JSON',
Expand Down Expand Up @@ -204,6 +215,14 @@ sub run {
# around to accept these requests.
eval { $spawn->unsubscribe; 1 } or warn $@;

# Wait for the harness to drain any events still queued for us
# before we tell it to finish. The has_pending_messages request
# itself does not count as pending work (its response is queued
# AFTER the handler returns). Cap at 30 seconds; on timeout we
# proceed anyway because the run is over and any straggler
# events at that point are not worth blocking exit on.
eval { $spawn->wait_until_idle(30); 1 } or warn $@;

# Drop the streamer explicitly: it holds a reference to $spawn,
# which is what keeps the IPC handle (and its AtomicPipe client)
# alive. Without this, the client's pre_disconnect_hook fires
Expand Down
5 changes: 3 additions & 2 deletions lib/App/Yath2/Options/IPC.pm
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ option_group {group => 'ipc', category => 'IPC Options'} => sub {
option protocol => (
name => 'ipc-protocol',
type => 'Scalar',
description => 'IPC::Manager client driver (default MessageFiles). ' . 'Use "+Some::Class" to force a fully qualified namespace.',
description => 'IPC::Manager client driver (default ConnectionUnix). ' . 'Use "+Some::Class" to force a fully qualified namespace.',
long_examples => [
' ConnectionUnix',
' AtomicPipe',
' UnixSocket',
' JSONFile',
Expand All @@ -45,7 +46,7 @@ option_group {group => 'ipc', category => 'IPC Options'} => sub {
' SQLite',
' +Custom::Driver',
],
default => sub { 'IPC::Manager::Client::MessageFiles' },
default => sub { 'IPC::Manager::Client::ConnectionUnix' },
normalize => \&_normalize_protocol,
);

Expand Down
5 changes: 4 additions & 1 deletion lib/App/Yath2/Renderer/Default.pm
Original file line number Diff line number Diff line change
Expand Up @@ -680,8 +680,11 @@ sub render_parent {
my $meth = $params{quiet} ? 'build_quiet' : 'build_event';

my @out;
my $ph = $f->{harness} || {};
for my $sf (@{$f->{parent}->{children}}) {
$sf->{harness} ||= $f->{harness};
my $ch = $sf->{harness} //= {};
$ch->{job_id} //= $ph->{job_id} if defined $ph->{job_id};
$ch->{run_id} //= $ph->{run_id} if defined $ph->{run_id};
my $stree = $self->render_tree($sf);
push @out => @{$self->$meth($sf, $stree)};
}
Expand Down
45 changes: 27 additions & 18 deletions lib/App/Yath2/Renderer/Summary.pm
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use strict;
use warnings;

use Test2::Util::Table qw/table/;
use Test2::Harness2::Util qw/clean_path/;
use Test2::Harness2::Util qw/clean_path render_duration/;
use Test2::Harness2::Util::JSON qw/json_true json_false/;

use List::Util qw/max/;
Expand Down Expand Up @@ -91,14 +91,15 @@ sub render_summary {
my $self = shift;
my ($run_end) = @_;

my $pass = $run_end->{pass};
my $pass_count = $run_end->{pass_count} // 0;
my $fail_count = $run_end->{fail_count} // 0;
my $total = $pass_count + $fail_count;
my $wall_time = $run_end->{wall_time};
my $cpu_times = $run_end->{cpu_times};
my $cpu_total = $run_end->{cpu_total};
my $cpu_usage = $run_end->{cpu_usage};
my $pass = $run_end->{pass};
my $pass_count = $run_end->{pass_count} // 0;
my $fail_count = $run_end->{fail_count} // 0;
my $total = $pass_count + $fail_count;
my $wall_time = $run_end->{wall_time};
my $cum_job_time = $run_end->{cumulative_job_time};
my $cpu_times = $run_end->{cpu_times};
my $cpu_total = $run_end->{cpu_total};
my $cpu_usage = $run_end->{cpu_usage};

if (my $rows = $self->{+_FAILED_JOBS}) {
if (@$rows) {
Expand All @@ -114,16 +115,23 @@ sub render_summary {
my @summary = (
$fail_count ? (" Fail Count: $fail_count") : (),
" File Count: $total",
(defined $wall_time)
(defined $wall_time
? sprintf(" Run Wall Time: %s", render_duration($wall_time))
: ()),
(defined $cum_job_time || $cpu_times)
? (
sprintf(" Wall Time: %.2f seconds", $wall_time),
"Aggregate Job Stats:",
(defined $cum_job_time
? sprintf(" Cumulative Job Time: %s", render_duration($cum_job_time))
: ()),
($cpu_times
? (
sprintf(
" CPU Time: %.2f seconds (usr: %.2fs | sys: %.2fs | cusr: %.2fs | csys: %.2fs)",
$cpu_total, @{$cpu_times}[0 .. 3]
" CPU Time: %s (usr: %s | sys: %s | cusr: %s | csys: %s)",
render_duration($cpu_total),
map { render_duration($_) } @{$cpu_times}[0 .. 3]
),
sprintf(" CPU Usage: %i%%", $cpu_usage),
sprintf(" CPU Usage: %i%%", $cpu_usage),
)
: ()
),
Expand Down Expand Up @@ -165,10 +173,11 @@ sub write_summary_file {

failed => $self->{+_FAILED_JOBS},

(defined $run_end->{wall_time} ? (wall_time => $run_end->{wall_time}) : ()),
(defined $run_end->{cpu_total} ? (cpu_total => $run_end->{cpu_total}) : ()),
(defined $run_end->{cpu_usage} ? (cpu_usage => $run_end->{cpu_usage}) : ()),
(defined $run_end->{cpu_times} ? (cpu_times => $run_end->{cpu_times}) : ()),
(defined $run_end->{wall_time} ? (wall_time => $run_end->{wall_time}) : ()),
(defined $run_end->{cumulative_job_time} ? (cumulative_job_time => $run_end->{cumulative_job_time}) : ()),
(defined $run_end->{cpu_total} ? (cpu_total => $run_end->{cpu_total}) : ()),
(defined $run_end->{cpu_usage} ? (cpu_usage => $run_end->{cpu_usage}) : ()),
(defined $run_end->{cpu_times} ? (cpu_times => $run_end->{cpu_times}) : ()),
);

require Test2::Harness2::Util::File::JSON;
Expand Down
31 changes: 19 additions & 12 deletions lib/App/Yath2/Streamer/Base.pm
Original file line number Diff line number Diff line change
Expand Up @@ -328,34 +328,41 @@ sub _harness_run_end_facet {
my $results = ref($state->{results}) eq 'HASH' ? $state->{results} : {};
my $all_pass = 1;
my ($fail_count, $pass_count) = (0, 0);
my @cpu_agg = (0, 0, 0, 0);
my $have_times = 0;
my @cpu_agg = (0, 0, 0, 0);
my $have_times = 0;
my $cum_job_wall = 0;
my $have_wall = 0;

for my $jid (keys %$results) {
next unless defined $results->{$jid}{completed_at};
if ($results->{$jid}{pass}) { $pass_count++ }
else { $fail_count++; $all_pass = 0 }
if (my $t = $results->{$jid}{times}) {
if (my $t = $results->{$jid}{child_times}) {
$cpu_agg[$_] += $t->[$_] for 0 .. 3;
$have_times = 1;
}
if (defined(my $w = $results->{$jid}{child_wall})) {
$cum_job_wall += $w;
$have_wall = 1;
}
}

my $stamp = _max_completed_at($results) // time;
my $wall_time = defined $state->{created_at} ? ($stamp - $state->{created_at}) : undef;

my %timing;
$timing{wall_time} = $wall_time if defined $wall_time;
$timing{cumulative_job_time} = $cum_job_wall if $have_wall;

if ($have_times) {
my $cpu_total = $cpu_agg[0] + $cpu_agg[1] + $cpu_agg[2] + $cpu_agg[3];
%timing = (
wall_time => $wall_time,
cpu_times => \@cpu_agg,
cpu_total => $cpu_total,
cpu_usage => ($wall_time && $wall_time > 0) ? int($cpu_total / $wall_time * 100) : 0,
);
}
elsif (defined $wall_time) {
%timing = (wall_time => $wall_time);
$timing{cpu_times} = \@cpu_agg;
$timing{cpu_total} = $cpu_total;
# CPU Usage relative to Run Wall: aggregate CPU-cores worth of
# work performed by all test jobs during the run. With high
# parallelism this can exceed 100% (one core = 100%).
$timing{cpu_usage}
= ($wall_time && $wall_time > 0) ? int($cpu_total / $wall_time * 100) : 0;
}

return {
Expand Down
11 changes: 11 additions & 0 deletions lib/App/Yath2/Tester.pm
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,17 @@ sub yath {
$ENV{$_} = $env->{$_} for keys %$env;
$ENV{YATH_COLOR} = 0;
my $pid = start_process \@cmd => sub {
# When this test is itself running under an outer yath, that
# outer worker sets TMPDIR to a per-worker subdirectory like
# /tmp/yath-XXXXXXXX/tmp. The spawned inner yath places its
# IPC::Manager unix-socket route under TMPDIR. The sun_path
# budget on Linux is only 104 bytes, leaving no room for the
# 42-byte hashed peer-id under such a deep route, which makes
# the inner harness fail with "Cannot map peer id ... exceeds
# available budget". Reset TMPDIR to /tmp here in the spawned
# child so the inner yath gets a short route. See
# IPC::Manager::Client::ConnectionUnix::max_on_disk_name_length.
$ENV{TMPDIR} = '/tmp';
return unless $capture;
swap_io(\*STDOUT, $wh);
swap_io(\*STDERR, $wh);
Expand Down
Loading