diff --git a/data-machine-code.php b/data-machine-code.php index 6fda7ec..29d6963 100644 --- a/data-machine-code.php +++ b/data-machine-code.php @@ -93,6 +93,14 @@ function datamachine_code_bootstrap() { new \DataMachineCode\Handlers\GitHub\GitHubPullRequestPublish(); new \DataMachineCode\Handlers\GitHub\GitHubUpsert(); + // Register the generic CLI transport runtime for agents/dispatch-message. + // Only wires up when the agents-api substrate is loaded — its + // register_dispatch_message_handler() helper is the canonical signal + // that the dispatch filter contract is present on this install. + if ( function_exists( 'AgentsAPI\\AI\\Channels\\register_dispatch_message_handler' ) ) { + \DataMachineCode\Channels\CliChannelTransport::register(); + } + // Register ability categories on the correct hook (must happen during wp_abilities_api_categories_init). add_action( 'wp_abilities_api_categories_init', 'datamachine_code_register_ability_categories' ); } diff --git a/inc/Channels/CliChannelRegistry.php b/inc/Channels/CliChannelRegistry.php new file mode 100644 index 0000000..6549aeb --- /dev/null +++ b/inc/Channels/CliChannelRegistry.php @@ -0,0 +1,255 @@ +, + * detach?: bool, + * timeout?: int, + * env?: array, + * cwd?: string|null, + * } + */ +class CliChannelRegistry { + + /** + * Filter and option key used for the channel registry. + * + * @var string + */ + public const REGISTRY_KEY = 'datamachine_code_cli_channels'; + + /** + * Return the full registered channel map. + * + * Invalid entries are silently dropped so a malformed entry can never + * cascade into the transport. Validation is intentionally minimal: the + * transport itself does not care what command it runs as long as the + * shape is right; site admins own the policy of what commands they + * register. + * + * @since 0.43.0 + * + * @return array> Channel name => config map. + */ + public static function get_channels(): array { + $option_value = array(); + if ( function_exists( 'get_option' ) ) { + $raw = get_option( self::REGISTRY_KEY, array() ); + if ( is_array( $raw ) ) { + $option_value = $raw; + } + } + + $channels = $option_value; + if ( function_exists( 'apply_filters' ) ) { + /** + * Filter the CLI channel registry map. + * + * Consumers register channel configurations here. Each entry must + * be a valid config array — see {@see CliChannelRegistry::normalize_entry()}. + * + * @since 0.43.0 + * + * @param array> $channels Existing registry. + */ + $filtered = apply_filters( self::REGISTRY_KEY, $channels ); + if ( is_array( $filtered ) ) { + $channels = $filtered; + } + } + + $valid = array(); + foreach ( $channels as $name => $config ) { + if ( ! is_string( $name ) || '' === $name ) { + continue; + } + if ( ! is_array( $config ) ) { + continue; + } + $normalized = self::normalize_entry( $config ); + if ( null === $normalized ) { + continue; + } + $valid[ $name ] = $normalized; + } + + return $valid; + } + + /** + * Look up a single channel by name. + * + * @since 0.43.0 + * + * @param string $channel Channel identifier. + * @return array|null Normalized config, or null if unknown / invalid. + */ + public static function lookup( string $channel ): ?array { + if ( '' === $channel ) { + return null; + } + + $channels = self::get_channels(); + if ( ! isset( $channels[ $channel ] ) ) { + return null; + } + + return $channels[ $channel ]; + } + + /** + * Validate and normalize a single channel config entry. + * + * Returns the normalized array (with defaults applied) on success, or + * null when the entry is malformed enough that the transport could not + * reasonably execute it. The shape requirements are intentionally + * narrow: + * + * - `command` must be a non-empty string. + * - `args` must be an array of strings (empty allowed). + * - `detach` defaults to true. + * - `timeout` defaults to 30 seconds and is only meaningful when + * `detach` is false. + * - `env` defaults to an empty array. + * - `cwd` defaults to null. + * + * @since 0.43.0 + * + * @param array $config Raw config entry. + * @return array|null Normalized config or null if invalid. + */ + public static function normalize_entry( array $config ): ?array { + $command = $config['command'] ?? null; + if ( ! is_string( $command ) || '' === trim( $command ) ) { + return null; + } + + $args = $config['args'] ?? array(); + if ( ! is_array( $args ) ) { + return null; + } + $normalized_args = array(); + foreach ( $args as $arg ) { + if ( ! is_string( $arg ) ) { + return null; + } + $normalized_args[] = $arg; + } + + $detach = $config['detach'] ?? true; + if ( ! is_bool( $detach ) ) { + $detach = (bool) $detach; + } + + $timeout = $config['timeout'] ?? 30; + if ( ! is_int( $timeout ) || $timeout < 0 ) { + $timeout = 30; + } + + $env = $config['env'] ?? array(); + if ( ! is_array( $env ) ) { + $env = array(); + } + $normalized_env = array(); + foreach ( $env as $env_key => $env_value ) { + if ( ! is_string( $env_key ) || '' === $env_key ) { + continue; + } + if ( ! is_scalar( $env_value ) ) { + continue; + } + $normalized_env[ $env_key ] = (string) $env_value; + } + + $cwd = $config['cwd'] ?? null; + if ( null !== $cwd && ( ! is_string( $cwd ) || '' === $cwd ) ) { + $cwd = null; + } + + return array( + 'command' => $command, + 'args' => $normalized_args, + 'detach' => $detach, + 'timeout' => $timeout, + 'env' => $normalized_env, + 'cwd' => $cwd, + ); + } + + /** + * Substitute canonical tokens into an args array. + * + * Tokens are replaced inside each string argument via simple string + * replacement. The args list is then passed to `proc_open` as an array + * — there is no shell interpolation step, so a `{message}` containing + * shell metacharacters is delivered to the child process as a single + * argv entry, untouched. + * + * Recognized tokens: `{recipient}`, `{message}`, `{conversation_id}`, + * `{channel}`. + * + * Unknown tokens are left as-is. Missing input keys substitute the + * empty string. + * + * @since 0.43.0 + * + * @param array $args Template args. + * @param array $input Canonical dispatch-message input. + * @return array Args with tokens substituted. + */ + public static function substitute_tokens( array $args, array $input ): array { + $replacements = array( + '{recipient}' => self::stringify( $input['recipient'] ?? '' ), + '{message}' => self::stringify( $input['message'] ?? '' ), + '{conversation_id}' => self::stringify( $input['conversation_id'] ?? '' ), + '{channel}' => self::stringify( $input['channel'] ?? '' ), + ); + + $result = array(); + foreach ( $args as $arg ) { + $result[] = strtr( $arg, $replacements ); + } + return $result; + } + + /** + * Convert a value to a string for token substitution. + * + * @param mixed $value Source value. + * @return string Stringified value. + */ + private static function stringify( $value ): string { + if ( null === $value ) { + return ''; + } + if ( is_scalar( $value ) ) { + return (string) $value; + } + return ''; + } +} diff --git a/inc/Channels/CliChannelTransport.php b/inc/Channels/CliChannelTransport.php new file mode 100644 index 0000000..4659238 --- /dev/null +++ b/inc/Channels/CliChannelTransport.php @@ -0,0 +1,455 @@ + $input Canonical dispatch-message input. + * @return callable|null + */ + public static function maybe_claim( $existing, $input ) { + if ( null !== $existing && is_callable( $existing ) ) { + return $existing; + } + + if ( ! is_array( $input ) ) { + return $existing; + } + + if ( ! Environment::has_shell() ) { + self::log_debug( 'cli_transport_declined', array( 'reason' => 'no_shell' ) ); + return $existing; + } + + $channel = isset( $input['channel'] ) && is_string( $input['channel'] ) ? $input['channel'] : ''; + if ( '' === $channel ) { + return $existing; + } + + $config = CliChannelRegistry::lookup( $channel ); + if ( null === $config ) { + self::log_debug( + 'cli_transport_declined', + array( + 'reason' => 'unknown_channel', + 'channel' => $channel, + ) + ); + return $existing; + } + + return array( self::class, 'execute' ); + } + + /** + * Execute a registered CLI dispatch. + * + * Returns the canonical output shape on success, or `WP_Error` on + * failure. The substrate fires `agents_dispatch_message_failed` for + * the latter. + * + * @since 0.43.0 + * + * @param array $input Canonical dispatch-message input. + * @return array|WP_Error + */ + public static function execute( array $input ) { + $channel = isset( $input['channel'] ) && is_string( $input['channel'] ) ? $input['channel'] : ''; + if ( '' === $channel ) { + return new WP_Error( + 'datamachine_code_cli_dispatch_invalid_input', + 'agents/dispatch-message input is missing a channel identifier.' + ); + } + + $config = CliChannelRegistry::lookup( $channel ); + if ( null === $config ) { + return new WP_Error( + 'datamachine_code_cli_dispatch_unknown_channel', + sprintf( 'No CLI channel registered for "%s".', $channel ) + ); + } + + $recipient = isset( $input['recipient'] ) && is_scalar( $input['recipient'] ) ? (string) $input['recipient'] : ''; + + $command_args = CliChannelRegistry::substitute_tokens( $config['args'], $input ); + array_unshift( $command_args, $config['command'] ); + + $detach = (bool) ( $config['detach'] ?? true ); + $timeout = isset( $config['timeout'] ) && is_int( $config['timeout'] ) ? $config['timeout'] : self::DEFAULT_TIMEOUT_SECONDS; + $cwd = isset( $config['cwd'] ) && is_string( $config['cwd'] ) && '' !== $config['cwd'] ? $config['cwd'] : null; + $env = self::build_env_map( isset( $config['env'] ) && is_array( $config['env'] ) ? $config['env'] : array() ); + + if ( $detach ) { + return self::dispatch_detached( $channel, $recipient, $command_args, $cwd, $env ); + } + + return self::dispatch_sync( $channel, $recipient, $command_args, $cwd, $env, $timeout ); + } + + /** + * Fire-and-forget dispatch. + * + * @param string $channel Channel id. + * @param string $recipient Recipient id. + * @param array $argv Command + args. + * @param string|null $cwd Working directory. + * @param array|null $env Environment map. + * @return array|WP_Error + */ + private static function dispatch_detached( string $channel, string $recipient, array $argv, ?string $cwd, ?array $env ) { + $descriptors = array( + 0 => array( 'file', '/dev/null', 'r' ), + 1 => array( 'file', '/dev/null', 'w' ), + 2 => array( 'file', '/dev/null', 'w' ), + ); + + $started_at = microtime( true ); + + $process = self::open_process( $argv, $descriptors, $cwd, $env, true ); + if ( $process instanceof WP_Error ) { + return $process; + } + + $pid = null; + $status = proc_get_status( $process ); + if ( is_array( $status ) && isset( $status['pid'] ) ) { + $pid = (int) $status['pid']; + } + + // Release the handle without waiting. The child keeps running in + // its own session because proc_open was given start_new_session. + proc_close( $process ); + + $duration_ms = (int) round( ( microtime( true ) - $started_at ) * 1000 ); + + return array( + 'sent' => true, + 'channel' => $channel, + 'recipient' => $recipient, + 'message_id' => null !== $pid ? (string) $pid : null, + 'metadata' => array( + 'mode' => 'detached', + 'pid' => $pid, + 'duration_ms' => $duration_ms, + ), + ); + } + + /** + * Synchronous dispatch with stdout/stderr capture and timeout. + * + * @param string $channel Channel id. + * @param string $recipient Recipient id. + * @param array $argv Command + args. + * @param string|null $cwd Working directory. + * @param array|null $env Environment map. + * @param int $timeout Timeout in seconds. + * @return array|WP_Error + */ + private static function dispatch_sync( string $channel, string $recipient, array $argv, ?string $cwd, ?array $env, int $timeout ) { + $descriptors = array( + 0 => array( 'pipe', 'r' ), + 1 => array( 'pipe', 'w' ), + 2 => array( 'pipe', 'w' ), + ); + + $pipes = array(); + $started_at = microtime( true ); + + $process = self::open_process( $argv, $descriptors, $cwd, $env, false, $pipes ); + if ( $process instanceof WP_Error ) { + return $process; + } + + // Close stdin so the child doesn't block on read. + if ( isset( $pipes[0] ) && is_resource( $pipes[0] ) ) { + fclose( $pipes[0] ); + } + + // Non-blocking reads on stdout/stderr so we can enforce the timeout. + if ( isset( $pipes[1] ) && is_resource( $pipes[1] ) ) { + stream_set_blocking( $pipes[1], false ); + } + if ( isset( $pipes[2] ) && is_resource( $pipes[2] ) ) { + stream_set_blocking( $pipes[2], false ); + } + + $stdout = ''; + $stderr = ''; + $timed_out = false; + $deadline = $started_at + max( 1, $timeout ); + + while ( true ) { + $status = proc_get_status( $process ); + + if ( isset( $pipes[1] ) && is_resource( $pipes[1] ) ) { + $chunk = stream_get_contents( $pipes[1] ); + if ( is_string( $chunk ) && '' !== $chunk ) { + $stdout .= $chunk; + } + } + if ( isset( $pipes[2] ) && is_resource( $pipes[2] ) ) { + $chunk = stream_get_contents( $pipes[2] ); + if ( is_string( $chunk ) && '' !== $chunk ) { + $stderr .= $chunk; + } + } + + if ( ! is_array( $status ) || false === $status['running'] ) { + break; + } + + if ( microtime( true ) >= $deadline ) { + $timed_out = true; + proc_terminate( $process, 15 ); // SIGTERM + // Give the child a brief grace window to flush. + usleep( 100000 ); + $status = proc_get_status( $process ); + if ( is_array( $status ) && true === $status['running'] ) { + proc_terminate( $process, 9 ); // SIGKILL + } + break; + } + + usleep( 20000 ); + } + + // Drain any remaining output. + foreach ( array( 1, 2 ) as $fd ) { + if ( ! isset( $pipes[ $fd ] ) || ! is_resource( $pipes[ $fd ] ) ) { + continue; + } + $chunk = stream_get_contents( $pipes[ $fd ] ); + if ( is_string( $chunk ) && '' !== $chunk ) { + if ( 1 === $fd ) { + $stdout .= $chunk; + } else { + $stderr .= $chunk; + } + } + fclose( $pipes[ $fd ] ); + } + + $exit_code = proc_close( $process ); + $duration_ms = (int) round( ( microtime( true ) - $started_at ) * 1000 ); + + if ( $timed_out ) { + return new WP_Error( + 'datamachine_code_cli_dispatch_timeout', + sprintf( 'CLI channel "%s" exceeded the %d second timeout.', $channel, $timeout ), + array( + 'channel' => $channel, + 'recipient' => $recipient, + 'stdout' => self::truncate_output( $stdout ), + 'stderr' => self::truncate_output( $stderr ), + 'duration_ms' => $duration_ms, + ) + ); + } + + if ( 0 !== $exit_code ) { + return new WP_Error( + 'datamachine_code_cli_dispatch_nonzero_exit', + sprintf( 'CLI channel "%s" exited with code %d.', $channel, $exit_code ), + array( + 'channel' => $channel, + 'recipient' => $recipient, + 'exit_code' => $exit_code, + 'stdout' => self::truncate_output( $stdout ), + 'stderr' => self::truncate_output( $stderr ), + 'duration_ms' => $duration_ms, + ) + ); + } + + return array( + 'sent' => true, + 'channel' => $channel, + 'recipient' => $recipient, + 'message_id' => null, + 'metadata' => array( + 'mode' => 'sync', + 'exit_code' => $exit_code, + 'duration_ms' => $duration_ms, + 'stdout' => self::truncate_output( $stdout ), + 'stderr' => self::truncate_output( $stderr ), + ), + ); + } + + /** + * Open a child process. Wraps `proc_open` to handle the array-argv vs + * string-command preference and detached-session option, and to + * surface a typed failure. + * + * @param array $argv Command argv (index 0 is the program). + * @param array $descriptors Descriptor spec for proc_open. + * @param string|null $cwd Working directory. + * @param array|null $env Environment map (null inherits parent). + * @param bool $detached Whether to start a new session. + * @param array $pipes Output pipes (by reference). + * @return resource|WP_Error + */ + private static function open_process( array $argv, array $descriptors, ?string $cwd, ?array $env, bool $detached, array &$pipes = array() ) { + if ( ! function_exists( 'proc_open' ) ) { + return new WP_Error( + 'datamachine_code_cli_dispatch_no_proc_open', + 'proc_open is not available on this host.' + ); + } + + $options = array(); + if ( $detached ) { + // `start_new_session` detaches the child into its own process + // group so it survives PHP request teardown. + $options['start_new_session'] = true; + } + + // Hand argv to proc_open as an array so PHP bypasses the shell. + $process = @proc_open( $argv, $descriptors, $pipes, $cwd, $env, $options ); + + if ( ! is_resource( $process ) ) { + return new WP_Error( + 'datamachine_code_cli_dispatch_spawn_failed', + sprintf( 'Failed to spawn CLI process "%s".', $argv[0] ?? '' ) + ); + } + + return $process; + } + + /** + * Build the environment map passed to the child process. + * + * The parent's PATH is forwarded so PATH-relative commands resolve, + * but no other inherited variables leak. Configured `env` overrides + * the inherited PATH if it provides one. + * + * @param array $configured Configured env map. + * @return array + */ + private static function build_env_map( array $configured ): array { + $env = array(); + + $parent_path = getenv( 'PATH' ); + if ( is_string( $parent_path ) && '' !== $parent_path ) { + $env['PATH'] = $parent_path; + } + + foreach ( $configured as $key => $value ) { + $env[ $key ] = $value; + } + + return $env; + } + + /** + * Cap captured output so a runaway child cannot blow up the response. + * + * @param string $output Captured output. + * @return string Truncated output. + */ + private static function truncate_output( string $output ): string { + $limit = 8192; + if ( strlen( $output ) <= $limit ) { + return $output; + } + return substr( $output, 0, $limit ) . "\n[...truncated]"; + } + + /** + * Emit a debug-level log entry, if the host has logging hooks wired. + * + * @param string $event Event slug. + * @param array $context Structured context. + */ + private static function log_debug( string $event, array $context ): void { + if ( function_exists( 'do_action' ) ) { + do_action( 'datamachine_code_cli_transport_debug', $event, $context ); + } + } +} diff --git a/tests/smoke-cli-channel-transport.php b/tests/smoke-cli-channel-transport.php new file mode 100644 index 0000000..9756045 --- /dev/null +++ b/tests/smoke-cli-channel-transport.php @@ -0,0 +1,362 @@ +code; } + public function get_error_message(): string { return $this->message; } + public function get_error_data(): mixed { return $this->data; } + } + } + + if ( ! function_exists( 'is_wp_error' ) ) { + function is_wp_error( mixed $thing ): bool { return $thing instanceof \WP_Error; } + } + + // Filter / option shims. The transport reads channel config via these. + if ( ! function_exists( 'apply_filters' ) ) { + function apply_filters( string $hook, mixed $value, ...$args ): mixed { + global $datamachine_code_test_filters; + if ( ! is_array( $datamachine_code_test_filters ) ) { + return $value; + } + if ( ! isset( $datamachine_code_test_filters[ $hook ] ) ) { + return $value; + } + foreach ( $datamachine_code_test_filters[ $hook ] as $callback ) { + $value = $callback( $value, ...$args ); + } + return $value; + } + } + + if ( ! function_exists( 'add_filter' ) ) { + function add_filter( string $hook, callable $callback, int $priority = 10, int $accepted_args = 1 ): void { + global $datamachine_code_test_filters; + if ( ! is_array( $datamachine_code_test_filters ) ) { + $datamachine_code_test_filters = array(); + } + $datamachine_code_test_filters[ $hook ][] = $callback; + unset( $priority, $accepted_args ); + } + } + + if ( ! function_exists( 'do_action' ) ) { + function do_action( string $hook, ...$args ): void { + unset( $hook, $args ); + } + } + + if ( ! function_exists( 'get_option' ) ) { + function get_option( string $key, mixed $default_value = false ): mixed { + global $datamachine_code_test_options; + if ( ! is_array( $datamachine_code_test_options ) ) { + return $default_value; + } + return $datamachine_code_test_options[ $key ] ?? $default_value; + } + } + + require __DIR__ . '/../inc/Environment.php'; + require __DIR__ . '/../inc/Channels/CliChannelRegistry.php'; + require __DIR__ . '/../inc/Channels/CliChannelTransport.php'; + + $failures = array(); + $assert = static function ( string $label, bool $condition ) use ( &$failures ): void { + if ( $condition ) { + echo " [PASS] {$label}\n"; + return; + } + $failures[] = $label; + echo " [FAIL] {$label}\n"; + }; + + echo "=== smoke-cli-channel-transport ===\n"; + + // Resolve standard stub binaries. Bail with a clear diagnostic if + // the host is missing them — the runtime needs real subprocess capability. + $echo_bin = '/bin/echo'; + $true_bin = '/bin/true'; + $false_bin = '/bin/false'; + $sleep_bin = '/bin/sleep'; + foreach ( array( $echo_bin, $true_bin, $false_bin, $sleep_bin ) as $candidate ) { + if ( ! is_executable( $candidate ) ) { + echo " [SKIP] stub binary {$candidate} not present; smoke cannot run on this host\n"; + exit( 0 ); + } + } + + // --------------------------------------------------------------- + // CliChannelRegistry: normalization + lookup + // --------------------------------------------------------------- + + global $datamachine_code_test_options, $datamachine_code_test_filters; + $datamachine_code_test_options = array(); + $datamachine_code_test_filters = array(); + + $valid_entry = array( + 'command' => $echo_bin, + 'args' => array( '--', '{recipient}', '{message}' ), + 'detach' => false, + 'timeout' => 5, + ); + $normalized = \DataMachineCode\Channels\CliChannelRegistry::normalize_entry( $valid_entry ); + $assert( 'valid entry normalizes', is_array( $normalized ) && $normalized['command'] === $echo_bin ); + $assert( 'normalized entry has args array', is_array( $normalized['args'] ?? null ) && count( $normalized['args'] ) === 3 ); + $assert( 'normalized entry preserves detach false', false === ( $normalized['detach'] ?? null ) ); + $assert( 'normalized entry preserves timeout', 5 === ( $normalized['timeout'] ?? null ) ); + + $bad_no_command = \DataMachineCode\Channels\CliChannelRegistry::normalize_entry( array( 'args' => array() ) ); + $assert( 'missing command is rejected', null === $bad_no_command ); + + $bad_args_type = \DataMachineCode\Channels\CliChannelRegistry::normalize_entry( array( + 'command' => $echo_bin, + 'args' => array( 'ok', 123 ), + ) ); + $assert( 'non-string arg is rejected', null === $bad_args_type ); + + $datamachine_code_test_options['datamachine_code_cli_channels'] = array( + 'option-channel' => array( + 'command' => $echo_bin, + 'args' => array( 'from-option' ), + ), + 'bogus-entry' => 'not-an-array', + '' => array( 'command' => $echo_bin, 'args' => array() ), + ); + + add_filter( 'datamachine_code_cli_channels', static function ( array $existing ): array { + $existing['filter-channel'] = array( + 'command' => '/bin/echo', + 'args' => array( 'from-filter' ), + ); + return $existing; + } ); + + $channels = \DataMachineCode\Channels\CliChannelRegistry::get_channels(); + $assert( 'option-defined channel is present', isset( $channels['option-channel'] ) ); + $assert( 'filter-defined channel is present', isset( $channels['filter-channel'] ) ); + $assert( 'malformed entries are dropped', ! isset( $channels['bogus-entry'] ) && ! isset( $channels[''] ) ); + + $lookup_hit = \DataMachineCode\Channels\CliChannelRegistry::lookup( 'option-channel' ); + $lookup_miss = \DataMachineCode\Channels\CliChannelRegistry::lookup( 'does-not-exist' ); + $assert( 'lookup returns config for known channel', is_array( $lookup_hit ) ); + $assert( 'lookup returns null for unknown channel', null === $lookup_miss ); + + // --------------------------------------------------------------- + // Token substitution: tokens are replaced inside args, no shell interp + // --------------------------------------------------------------- + + $substituted = \DataMachineCode\Channels\CliChannelRegistry::substitute_tokens( + array( '--to', '{recipient}', '--msg', '{message}', '--conv', '{conversation_id}', '--ch', '{channel}', 'literal' ), + array( + 'recipient' => 'user-123', + 'message' => 'hello $(rm -rf /) world', + 'conversation_id' => 'conv-abc', + 'channel' => 'fixture-channel', + ) + ); + $assert( 'recipient token substituted', $substituted[1] === 'user-123' ); + $assert( 'message token substituted verbatim (no shell interp)', $substituted[3] === 'hello $(rm -rf /) world' ); + $assert( 'conversation_id token substituted', $substituted[5] === 'conv-abc' ); + $assert( 'channel token substituted', $substituted[7] === 'fixture-channel' ); + $assert( 'literal arg untouched', $substituted[8] === 'literal' ); + + $partial = \DataMachineCode\Channels\CliChannelRegistry::substitute_tokens( + array( '--user={recipient}' ), + array( 'recipient' => 'alice' ) + ); + $assert( 'token substituted inside compound arg', $partial[0] === '--user=alice' ); + + $missing_input = \DataMachineCode\Channels\CliChannelRegistry::substitute_tokens( + array( '{recipient}', '{message}' ), + array() + ); + $assert( 'missing input substitutes empty string', $missing_input === array( '', '' ) ); + + // --------------------------------------------------------------- + // Transport claim semantics + // --------------------------------------------------------------- + + // Reset registry to a single known channel. + $datamachine_code_test_options['datamachine_code_cli_channels'] = array( + 'sync-echo' => array( + 'command' => $echo_bin, + 'args' => array( '{recipient}:{message}' ), + 'detach' => false, + 'timeout' => 5, + ), + 'sync-true' => array( + 'command' => $true_bin, + 'args' => array(), + 'detach' => false, + 'timeout' => 5, + ), + 'sync-false' => array( + 'command' => $false_bin, + 'args' => array(), + 'detach' => false, + 'timeout' => 5, + ), + 'sync-sleep' => array( + 'command' => $sleep_bin, + 'args' => array( '5' ), + 'detach' => false, + 'timeout' => 1, + ), + 'detached-true' => array( + 'command' => $true_bin, + 'args' => array(), + 'detach' => true, + ), + ); + $datamachine_code_test_filters = array(); + + $claim_known = \DataMachineCode\Channels\CliChannelTransport::maybe_claim( + null, + array( 'channel' => 'sync-true', 'recipient' => 'r', 'message' => 'm' ) + ); + $assert( 'claims registered channel', is_callable( $claim_known ) ); + + $claim_unknown = \DataMachineCode\Channels\CliChannelTransport::maybe_claim( + null, + array( 'channel' => 'nope', 'recipient' => 'r', 'message' => 'm' ) + ); + $assert( 'declines unknown channel (returns existing null)', null === $claim_unknown ); + + $existing_callable = static function () { + return new \WP_Error( 'prior', 'prior handler' ); + }; + $claim_existing = \DataMachineCode\Channels\CliChannelTransport::maybe_claim( + $existing_callable, + array( 'channel' => 'sync-true', 'recipient' => 'r', 'message' => 'm' ) + ); + $assert( 'preserves prior handler at filter chain', $claim_existing === $existing_callable ); + + $claim_empty_channel = \DataMachineCode\Channels\CliChannelTransport::maybe_claim( + null, + array( 'channel' => '', 'recipient' => 'r', 'message' => 'm' ) + ); + $assert( 'declines empty channel name', null === $claim_empty_channel ); + + // --------------------------------------------------------------- + // Sync dispatch: success path + // --------------------------------------------------------------- + + $ok = \DataMachineCode\Channels\CliChannelTransport::execute( array( + 'channel' => 'sync-echo', + 'recipient' => 'user-1', + 'message' => 'hi-there', + ) ); + $assert( 'sync success returns array', is_array( $ok ) && ! ( $ok instanceof \WP_Error ) ); + if ( is_array( $ok ) ) { + $assert( 'sync success sent=true', true === ( $ok['sent'] ?? null ) ); + $assert( 'sync success channel echoes input', 'sync-echo' === ( $ok['channel'] ?? null ) ); + $assert( 'sync success recipient echoes input', 'user-1' === ( $ok['recipient'] ?? null ) ); + $metadata = $ok['metadata'] ?? array(); + $assert( 'sync success metadata mode=sync', 'sync' === ( $metadata['mode'] ?? null ) ); + $assert( 'sync success exit_code 0', 0 === ( $metadata['exit_code'] ?? null ) ); + $assert( + 'sync success captures substituted stdout', + isset( $metadata['stdout'] ) && trim( (string) $metadata['stdout'] ) === 'user-1:hi-there' + ); + } + + $ok_true = \DataMachineCode\Channels\CliChannelTransport::execute( array( + 'channel' => 'sync-true', + 'recipient' => 'noop', + 'message' => '', + ) ); + $assert( 'sync /bin/true succeeds', is_array( $ok_true ) && true === ( $ok_true['sent'] ?? false ) ); + + // --------------------------------------------------------------- + // Sync dispatch: failure paths + // --------------------------------------------------------------- + + $fail = \DataMachineCode\Channels\CliChannelTransport::execute( array( + 'channel' => 'sync-false', + 'recipient' => 'x', + 'message' => 'x', + ) ); + $assert( 'nonzero exit returns WP_Error', $fail instanceof \WP_Error ); + if ( $fail instanceof \WP_Error ) { + $assert( + 'nonzero exit error code is machine-readable', + 'datamachine_code_cli_dispatch_nonzero_exit' === $fail->get_error_code() + ); + $data = $fail->get_error_data(); + $assert( 'nonzero exit error data carries exit_code', is_array( $data ) && 1 === ( $data['exit_code'] ?? null ) ); + } + + $timed = \DataMachineCode\Channels\CliChannelTransport::execute( array( + 'channel' => 'sync-sleep', + 'recipient' => 'x', + 'message' => 'x', + ) ); + $assert( 'timeout returns WP_Error', $timed instanceof \WP_Error ); + if ( $timed instanceof \WP_Error ) { + $assert( + 'timeout error code is machine-readable', + 'datamachine_code_cli_dispatch_timeout' === $timed->get_error_code() + ); + } + + $unknown = \DataMachineCode\Channels\CliChannelTransport::execute( array( + 'channel' => 'nope', + 'recipient' => 'x', + 'message' => 'x', + ) ); + $assert( 'unknown channel from execute returns WP_Error', $unknown instanceof \WP_Error ); + + // --------------------------------------------------------------- + // Detached dispatch + // --------------------------------------------------------------- + + $detached = \DataMachineCode\Channels\CliChannelTransport::execute( array( + 'channel' => 'detached-true', + 'recipient' => 'r', + 'message' => 'fire-and-forget', + ) ); + $assert( 'detached returns array', is_array( $detached ) && ! ( $detached instanceof \WP_Error ) ); + if ( is_array( $detached ) ) { + $assert( 'detached sent=true', true === ( $detached['sent'] ?? null ) ); + $assert( + 'detached message_id is numeric PID string', + is_string( $detached['message_id'] ?? null ) && ctype_digit( (string) $detached['message_id'] ) + ); + $metadata = $detached['metadata'] ?? array(); + $assert( 'detached metadata mode=detached', 'detached' === ( $metadata['mode'] ?? null ) ); + } + + // --------------------------------------------------------------- + // Summary + // --------------------------------------------------------------- + + if ( ! empty( $failures ) ) { + echo "\nFAIL: " . count( $failures ) . " assertion(s)\n"; + foreach ( $failures as $failure ) { + echo " - {$failure}\n"; + } + exit( 1 ); + } + + echo "\nOK\n"; + exit( 0 ); +}