Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
"php tests/conversation-loop-budgets-smoke.php",
"php tests/runtime-package-run-contract-smoke.php",
"php tests/run-control-normalization-smoke.php",
"php tests/canonical-run-lifecycle-smoke.php",
"php tests/channels-smoke.php",
"php tests/chat-run-control-smoke.php",
"php tests/task-execution-smoke.php",
Expand Down
72 changes: 63 additions & 9 deletions src/Runtime/class-wp-agent-conversation-loop.php
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,13 @@ public static function run( array $messages, ?callable $turn_runner = null, arra
$turns_run = $turn;
$turn_context = $context;
$turn_context['turn'] = $turn;
$interrupt = self::check_runtime_cancellation( $run_id, $lock_session_id, $turn_context, $on_event );
if ( null !== $interrupt ) {
$messages[] = $interrupt['message'];
$events[] = self::interrupt_event( $interrupt );
$interrupted = $interrupt['metadata'];
break;
}

self::emit_event( $on_event, 'turn_started', array(
'turn' => $turn,
Expand Down Expand Up @@ -238,6 +245,14 @@ public static function run( array $messages, ?callable $turn_runner = null, arra
throw $error;
}

$interrupt = self::check_runtime_cancellation( $run_id, $lock_session_id, $turn_context, $on_event );
if ( null !== $interrupt ) {
$messages[] = $interrupt['message'];
$events[] = self::interrupt_event( $interrupt );
$interrupted = $interrupt['metadata'];
break;
}

if ( isset( $result['provider_diagnostics'] ) && is_array( $result['provider_diagnostics'] ) ) {
$provider_diagnostics[] = self::normalize_assoc_array( $result['provider_diagnostics'] );
}
Expand Down Expand Up @@ -291,6 +306,14 @@ public static function run( array $messages, ?callable $turn_runner = null, arra
// When mediation is enabled, the turn runner returns tool_calls
// and the loop handles execution. Otherwise, the caller-managed path applies.
if ( null !== $tool_executor && $mediation_enabled && isset( $result['tool_calls'] ) && is_array( $result['tool_calls'] ) ) {
$interrupt = self::check_runtime_cancellation( $run_id, $lock_session_id, $turn_context, $on_event );
if ( null !== $interrupt ) {
$messages[] = $interrupt['message'];
$events[] = self::interrupt_event( $interrupt );
$interrupted = $interrupt['metadata'];
break;
}

$mediation_result = WP_Agent_Tool_Mediation_Runner::run(
$messages,
self::normalize_assoc_array( $result ),
Expand Down Expand Up @@ -321,6 +344,13 @@ public static function run( array $messages, ?callable $turn_runner = null, arra
$approval_required = $mediation_result['approval_required'] ?? null;
$runtime_tool_pending = $mediation_result['runtime_tool_pending'] ?? null;
$stalled = self::check_spin_detector( $spin_detector, $mediation_result['spin_signatures'], $turn_context, $on_event );
$interrupt = self::check_runtime_cancellation( $run_id, $lock_session_id, $turn_context, $on_event );
if ( null !== $interrupt ) {
$messages[] = $interrupt['message'];
$events[] = self::interrupt_event( $interrupt );
$interrupted = $interrupt['metadata'];
break;
}
} else {
// Caller-managed path: turn runner handles everything internally.
$result = self::normalize_conversation_result( $result );
Expand Down Expand Up @@ -379,18 +409,12 @@ public static function run( array $messages, ?callable $turn_runner = null, arra
}

$interrupt = self::check_interrupt_source( $interrupt_source, $messages, $options, $turn_context, $on_event );
if ( null === $interrupt && '' !== $run_id ) {
$interrupt_message = WP_Agent_Chat_Run_Control::cancellation_interrupt_for_run( $run_id, $lock_session_id );
if ( null !== $interrupt_message ) {
$interrupt = self::normalize_interrupt_message( $interrupt_message, $turn_context, $on_event );
}
if ( null === $interrupt ) {
$interrupt = self::check_runtime_cancellation( $run_id, $lock_session_id, $turn_context, $on_event );
}
if ( null !== $interrupt ) {
$messages[] = $interrupt['message'];
$events[] = array(
'type' => 'interrupt_received',
'metadata' => $interrupt['metadata'],
);
$events[] = self::interrupt_event( $interrupt );

if ( 'cancel' === $interrupt['action'] ) {
$interrupted = $interrupt['metadata'];
Expand Down Expand Up @@ -1342,6 +1366,36 @@ private static function check_interrupt_source( ?callable $interrupt_source, arr
return self::normalize_interrupt_message( self::normalize_assoc_array( $message ), $context, $on_event );
}

/**
* Check canonical chat run-control cancellation state.
*
* @param array<string,mixed> $context Current turn context.
* @return array{message: array<string, mixed>, metadata: array<string, mixed>, action: string}|null
*/
private static function check_runtime_cancellation( string $run_id, string $lock_session_id, array $context, ?callable $on_event ): ?array {
if ( '' === $run_id ) {
return null;
}

$interrupt_message = WP_Agent_Chat_Run_Control::cancellation_interrupt_for_run( $run_id, $lock_session_id );
if ( null === $interrupt_message ) {
return null;
}

return self::normalize_interrupt_message( $interrupt_message, $context, $on_event );
}

/**
* @param array{message: array<string, mixed>, metadata: array<string, mixed>, action: string} $interrupt Interrupt payload.
* @return array<string,mixed>
*/
private static function interrupt_event( array $interrupt ): array {
return array(
'type' => 'interrupt_received',
'metadata' => $interrupt['metadata'],
);
}

/**
* Normalize and emit a received interrupt message.
*
Expand Down
5 changes: 3 additions & 2 deletions src/Runtime/class-wp-agent-option-run-control-store.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class WP_Agent_Option_Run_Control_Store implements WP_Agent_Run_Control_Store {

/**
* @param string $store_key Store key.
* @return array{runs:array<string,array<string,mixed>>,queues:array<string,array<int,array<string,mixed>>>}
* @return array{runs:array<string,array<string,mixed>>,queues:array<string,array<int,array<string,mixed>>>,events:array<string,array<int,array<string,mixed>>>}
*/
public function get_state( string $store_key ): array {
$state = function_exists( 'get_option' ) ? get_option( $store_key, array() ) : array();
Expand All @@ -27,12 +27,13 @@ public function get_state( string $store_key ): array {
return array(
'runs' => $this->stored_runs( $state['runs'] ?? array() ),
'queues' => $this->stored_queues( $state['queues'] ?? array() ),
'events' => $this->stored_queues( $state['events'] ?? array() ),
);
}

/**
* @param string $store_key Store key.
* @param array{runs:array<string,array<string,mixed>>,queues:array<string,array<int,array<string,mixed>>>} $state State envelope.
* @param array{runs:array<string,array<string,mixed>>,queues:array<string,array<int,array<string,mixed>>>,events:array<string,array<int,array<string,mixed>>>} $state State envelope.
*/
public function save_state( string $store_key, array $state ): void {
if ( function_exists( 'update_option' ) ) {
Expand Down
52 changes: 50 additions & 2 deletions src/Runtime/class-wp-agent-run-control.php
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ public static function start_run( string $store_key, string $run_id, array $run

$state = self::state( $store_key );
$state['runs'][ $run_id ] = $run;
$state = self::record_event_in_state( $state, $run_id, 'run_started', array( 'status' => self::STATUS_RUNNING ) );
self::save_state( $store_key, $state );

return self::normalize_run( $run );
Expand All @@ -257,6 +258,7 @@ public static function save_run( string $store_key, array $run ): array {

$state = self::state( $store_key );
$state['runs'][ $run_id ] = $normalized;
$state = self::record_event_in_state( $state, $run_id, 'run_updated', array( 'status' => $normalized['status'] ) );
self::save_state( $store_key, $state );

return $normalized;
Expand Down Expand Up @@ -284,6 +286,7 @@ public static function finish_run( string $store_key, string $run_id, string $st
}

$state['runs'][ $run_id ] = $run;
$state = self::record_event_in_state( $state, $run_id, 'run_finished', array( 'status' => $run['status'] ) );
self::save_state( $store_key, $state );

return self::normalize_run( $run );
Expand Down Expand Up @@ -318,6 +321,7 @@ public static function request_cancel( string $store_key, string $run_id ): ?arr
$run['updated_at'] = self::now();

$state['runs'][ $run_id ] = $run;
$state = self::record_event_in_state( $state, $run_id, 'cancel_requested', array( 'status' => $run['status'] ) );
self::save_state( $store_key, $state );

return self::normalize_run( $run );
Expand All @@ -328,6 +332,32 @@ public static function cancel_requested( string $store_key, string $run_id ): bo
return null !== $run && self::STATUS_CANCELLING === ( $run['status'] ?? '' );
}

/**
* @return array<string,mixed>|null
*/
public static function list_events( string $store_key, string $run_id, string $cursor = '', int $limit = 100 ): ?array {
$run = self::get_run( $store_key, $run_id );
if ( null === $run ) {
return null;
}

$state = self::state( $store_key );
$events = array_values( $state['events'][ $run_id ] ?? array() );
$offset = max( 0, self::int_value( $cursor ) );
$limit = max( 1, min( 500, $limit ) );
$page = array_slice( $events, $offset, $limit );
$next = $offset + count( $page );

return array_merge(
$run,
array(
'events' => array_map( array( self::class, 'normalize_event' ), $page ),
'cursor' => $next < count( $events ) ? (string) $next : '',
'has_more' => $next < count( $events ),
)
);
}

public static function store(): WP_Agent_Run_Control_Store {
if ( null === self::$store ) {
self::$store = new WP_Agent_Option_Run_Control_Store();
Expand All @@ -344,13 +374,13 @@ public static function reset_store(): void {
self::$store = null;
}

/** @return array{runs:array<string,array<string,mixed>>,queues:array<string,array<int,array<string,mixed>>>} */
/** @return array{runs:array<string,array<string,mixed>>,queues:array<string,array<int,array<string,mixed>>>,events:array<string,array<int,array<string,mixed>>>} */
public static function state( string $store_key ): array {
return self::store()->get_state( $store_key );
}

/**
* @param array{runs:array<string,array<string,mixed>>,queues:array<string,array<int,array<string,mixed>>>} $state
* @param array{runs:array<string,array<string,mixed>>,queues:array<string,array<int,array<string,mixed>>>,events:array<string,array<int,array<string,mixed>>>} $state
*/
public static function save_state( string $store_key, array $state ): void {
self::store()->save_state( $store_key, $state );
Expand Down Expand Up @@ -382,4 +412,22 @@ public static function string_keyed_array( array $value ): array {
private static function int_value( mixed $value ): int {
return is_int( $value ) || is_float( $value ) || is_string( $value ) || is_bool( $value ) ? (int) $value : 0;
}

/**
* @param array{runs:array<string,array<string,mixed>>,queues:array<string,array<int,array<string,mixed>>>,events:array<string,array<int,array<string,mixed>>>} $state
* @param array<string,mixed> $metadata Event metadata.
* @return array{runs:array<string,array<string,mixed>>,queues:array<string,array<int,array<string,mixed>>>,events:array<string,array<int,array<string,mixed>>>}
*/
private static function record_event_in_state( array $state, string $run_id, string $type, array $metadata = array() ): array {
$events = array_values( $state['events'][ $run_id ] ?? array() );
$events[] = array(
'id' => $run_id . ':' . count( $events ),
'type' => $type,
'created_at' => self::now(),
'metadata' => self::string_keyed_array( $metadata ),
);
$state['events'][ $run_id ] = $events;

return $state;
}
}
4 changes: 2 additions & 2 deletions src/Runtime/interface-wp-agent-run-control-store.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ interface WP_Agent_Run_Control_Store {
* Read the state for a store key.
*
* @param string $store_key Store key.
* @return array{runs:array<string,array<string,mixed>>,queues:array<string,array<int,array<string,mixed>>>}
* @return array{runs:array<string,array<string,mixed>>,queues:array<string,array<int,array<string,mixed>>>,events:array<string,array<int,array<string,mixed>>>}
*/
public function get_state( string $store_key ): array;

/**
* Save the state for a store key.
*
* @param string $store_key Store key.
* @param array{runs:array<string,array<string,mixed>>,queues:array<string,array<int,array<string,mixed>>>} $state State envelope.
* @param array{runs:array<string,array<string,mixed>>,queues:array<string,array<int,array<string,mixed>>>,events:array<string,array<int,array<string,mixed>>>} $state State envelope.
*/
public function save_state( string $store_key, array $state ): void;
}
Loading