From bcaaff25e147b482ee5b9749482d7db8bd209595 Mon Sep 17 00:00:00 2001 From: Lindsay Snider Date: Wed, 19 Nov 2014 16:57:01 -0500 Subject: [PATCH 01/17] clean up documentation and type hinting --- fork_daemon.php | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/fork_daemon.php b/fork_daemon.php index af1692c..ecb0db0 100644 --- a/fork_daemon.php +++ b/fork_daemon.php @@ -99,7 +99,7 @@ class fork_daemon /** * Function the parent invokes before forking a child * @access protected - * @var integer $parent_function_prefork + * @var string[] $parent_function_prefork */ protected $parent_function_prefork = ''; @@ -530,7 +530,7 @@ public function register_child_run($function_name, $bucket = self::DEFAULT_BUCKE * @param array names of functions to be called. * @return bool true if the callback was successfully registered, false if it failed */ - public function register_parent_prefork($function_names) + public function register_parent_prefork(array $function_names) { $this->parent_function_prefork = $function_names; return true; @@ -615,7 +615,7 @@ public function register_child_exit($function_name, $bucket = self::DEFAULT_BUCK } /** - * Allows the app to set the call back function for when a child process is killed to exceeding its max runtime + * Allows the app to set the call back function for when a child process is killed for exceeding its max runtime * @access public * @param string name of function to be called. * @param int $bucket the bucket to use @@ -961,7 +961,7 @@ public function received_exit_request($requested = null) * @param int $bucket the bucket to use * @param bool $sort_queue true to sort the work unit queue */ - public function addwork(array $new_work_units, $identifier = '', $bucket = self::DEFAULT_BUCKET, $sort_queue = false) + public function addwork($new_work_units, $identifier = '', $bucket = self::DEFAULT_BUCKET, $sort_queue = false) { // ensure bucket is setup before we try to add data to it if (! array_key_exists($bucket, $this->work_units)) From 883d6f579e1249053c4e698405fd039e04585600 Mon Sep 17 00:00:00 2001 From: Kevin Bonner Date: Mon, 31 Oct 2016 10:19:32 -0400 Subject: [PATCH 02/17] add dispatch function --- fork_daemon.php | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/fork_daemon.php b/fork_daemon.php index ecb0db0..7a90370 100644 --- a/fork_daemon.php +++ b/fork_daemon.php @@ -1503,6 +1503,15 @@ public function has_result($bucket = self::DEFAULT_BUCKET) return (! empty($this->results[$bucket])); } + /** + * Run signals sent to the process + */ + public function dispatch_signals() + { + // Run signal handlers + pcntl_signal_dispatch(); + } + /** * Checks if any changed child sockets are in the bucket. * From b5c171010927e81da514a51120fcb6bebdd34268 Mon Sep 17 00:00:00 2001 From: Kevin Bonner Date: Mon, 31 Oct 2016 18:39:27 -0400 Subject: [PATCH 03/17] make dispatch_signals static --- fork_daemon.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fork_daemon.php b/fork_daemon.php index 7a90370..668e0dd 100644 --- a/fork_daemon.php +++ b/fork_daemon.php @@ -1506,7 +1506,7 @@ public function has_result($bucket = self::DEFAULT_BUCKET) /** * Run signals sent to the process */ - public function dispatch_signals() + static public function dispatch_signals() { // Run signal handlers pcntl_signal_dispatch(); From a40020c501ad1e0094a9c8a9b1e0fddd7dba420c Mon Sep 17 00:00:00 2001 From: Lindsay Snider Date: Thu, 8 Dec 2016 17:06:01 +0000 Subject: [PATCH 04/17] add ability for child to send message to parent before child exit --- fork_daemon.php | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/fork_daemon.php b/fork_daemon.php index 668e0dd..6102f83 100644 --- a/fork_daemon.php +++ b/fork_daemon.php @@ -185,6 +185,13 @@ class fork_daemon */ protected $exit_request_status = false; + /** + * In the child process, stores the socket to get results back to the parent + * + * @var null + */ + protected $child_socket_to_parent = null; + /**************** SERVER CONTROLS ****************/ /** * Upper limit on the number of children started. @@ -1506,12 +1513,21 @@ public function has_result($bucket = self::DEFAULT_BUCKET) /** * Run signals sent to the process */ - static public function dispatch_signals() + public static function dispatch_signals() { // Run signal handlers pcntl_signal_dispatch(); } + /** + * Send a message from the child to the parent + * @param $result + */ + public function child_send_result_to_parent($result) + { + self::socket_send($this->child_socket_to_parent, $result); + } + /** * Checks if any changed child sockets are in the bucket. * @@ -1764,6 +1780,9 @@ protected function fork_work_unit($work_unit, $identifier = '', $bucket = self:: $this->forked_children = null; $this->results = null; + // save the socket from child to parent to support $this->child_send_result_to_parent() + $this->child_socket_to_parent = $socket_parent; + // set child properties $this->child_bucket = $bucket; From 33e855e6c7773b396f7c98b9613e9533a0ee3eb0 Mon Sep 17 00:00:00 2001 From: Lindsay Snider Date: Thu, 16 Feb 2017 19:47:02 +0000 Subject: [PATCH 05/17] avoid sending null child result to parent --- fork_daemon.php | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fork_daemon.php b/fork_daemon.php index 6102f83..260c99d 100644 --- a/fork_daemon.php +++ b/fork_daemon.php @@ -1799,7 +1799,8 @@ protected function fork_work_unit($work_unit, $identifier = '', $bucket = self:: $result = $this->invoke_callback($this->child_function_run[$bucket], $work_unit, false); // send the result to the parent - self::socket_send($socket_parent, $result); + if (is_null($result)) + self::socket_send($socket_parent, $result); // delay the child's exit slightly to avoid race conditions usleep(500); From 4a90976c836e8fd8b1f34cf0cf0b6b737c49f22a Mon Sep 17 00:00:00 2001 From: Lindsay Snider Date: Thu, 16 Feb 2017 19:49:54 +0000 Subject: [PATCH 06/17] ensure IPC communication not interrupted by signals --- fork_daemon.php | 96 ++++++++++++++++++++++++++----------------------- 1 file changed, 52 insertions(+), 44 deletions(-) diff --git a/fork_daemon.php b/fork_daemon.php index 260c99d..74c4de4 100644 --- a/fork_daemon.php +++ b/fork_daemon.php @@ -1933,30 +1933,34 @@ protected function ipc_init() */ protected function socket_send($socket, $message) { - $serialized_message = @serialize($message); - if ($serialized_message == false) - { - $this->log('socket_send failed to serialize message', self::LOG_LEVEL_CRIT); - return false; - } - - $header = pack('N', strlen($serialized_message)); - $data = $header . $serialized_message; - $bytes_left = strlen($data); - while ($bytes_left > 0) + // do not process signals while we are sending an IPC message + declare(ticks = 0) { - $bytes_sent = @socket_write($socket, $data); - if ($bytes_sent === false) + $serialized_message = @serialize($message); + if ($serialized_message == false) { - $this->log('socket_send error: ' . socket_strerror(socket_last_error()), self::LOG_LEVEL_CRIT); + $this->log('socket_send failed to serialize message', self::LOG_LEVEL_CRIT); return false; } - $bytes_left -= $bytes_sent; - $data = substr($data, $bytes_sent); - } + $header = pack('N', strlen($serialized_message)); + $data = $header . $serialized_message; + $bytes_left = strlen($data); + while ($bytes_left > 0) + { + $bytes_sent = @socket_write($socket, $data); + if ($bytes_sent === false) + { + $this->log('socket_send error: ' . socket_strerror(socket_last_error()), self::LOG_LEVEL_CRIT); + return false; + } - return true; + $bytes_left -= $bytes_sent; + $data = substr($data, $bytes_sent); + } + + return true; + } } /** @@ -1967,40 +1971,44 @@ protected function socket_send($socket, $message) */ protected function socket_receive($socket) { - // initially read to the length of the header size, then - // expand to read more - $bytes_total = self::SOCKET_HEADER_SIZE; - $bytes_read = 0; - $have_header = false; - $socket_message = ''; - while ($bytes_read < $bytes_total) + // do not process signals while we are receiving an IPC message + declare(ticks = 0) { - $read = @socket_read($socket, $bytes_total - $bytes_read); - if ($read === false) + // initially read to the length of the header size, then + // expand to read more + $bytes_total = self::SOCKET_HEADER_SIZE; + $bytes_read = 0; + $have_header = false; + $socket_message = ''; + while ($bytes_read < $bytes_total) { - $this->log('socket_receive error: ' . socket_strerror(socket_last_error()), self::LOG_LEVEL_CRIT); - return false; - } + $read = @socket_read($socket, $bytes_total - $bytes_read); + if ($read === false) + { + $this->log('socket_receive error: ' . socket_strerror(socket_last_error()), self::LOG_LEVEL_CRIT); + return false; + } - // blank socket_read means done - if ($read == '') - break; + // blank socket_read means done + if ($read == '') + break; - $bytes_read += strlen($read); - $socket_message .= $read; + $bytes_read += strlen($read); + $socket_message .= $read; - if (!$have_header && $bytes_read >= self::SOCKET_HEADER_SIZE) - { - $have_header = true; - list($bytes_total) = array_values(unpack('N', $socket_message)); - $bytes_read = 0; - $socket_message = ''; + if (!$have_header && $bytes_read >= self::SOCKET_HEADER_SIZE) + { + $have_header = true; + list($bytes_total) = array_values(unpack('N', $socket_message)); + $bytes_read = 0; + $socket_message = ''; + } } - } - $message = @unserialize($socket_message); + $message = @unserialize($socket_message); - return $message; + return $message; + } } /** From 32251263734af9ff4291ab0c5ede0eab2d5d6ea4 Mon Sep 17 00:00:00 2001 From: Lindsay Snider Date: Thu, 16 Feb 2017 20:15:03 +0000 Subject: [PATCH 07/17] code relies on parent_function_prefork being an array --- fork_daemon.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fork_daemon.php b/fork_daemon.php index 74c4de4..677e35d 100644 --- a/fork_daemon.php +++ b/fork_daemon.php @@ -101,7 +101,7 @@ class fork_daemon * @access protected * @var string[] $parent_function_prefork */ - protected $parent_function_prefork = ''; + protected $parent_function_prefork = array(); /** * Function the parent invokes when a child is spawned From b0cd390c43bcd976ec63510442d9a8fb122195d4 Mon Sep 17 00:00:00 2001 From: Lindsay Snider Date: Thu, 16 Feb 2017 20:15:18 +0000 Subject: [PATCH 08/17] cleanup docs --- fork_daemon.php | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/fork_daemon.php b/fork_daemon.php index 677e35d..1d60aa8 100644 --- a/fork_daemon.php +++ b/fork_daemon.php @@ -187,8 +187,7 @@ class fork_daemon /** * In the child process, stores the socket to get results back to the parent - * - * @var null + * @var socket */ protected $child_socket_to_parent = null; From ec1803223ce92ad2308656ea17880efe64e153a1 Mon Sep 17 00:00:00 2001 From: Lindsay Snider Date: Thu, 16 Feb 2017 20:15:41 +0000 Subject: [PATCH 09/17] removed unused method --- fork_daemon.php | 9 --------- 1 file changed, 9 deletions(-) diff --git a/fork_daemon.php b/fork_daemon.php index 1d60aa8..12b3273 100644 --- a/fork_daemon.php +++ b/fork_daemon.php @@ -1509,15 +1509,6 @@ public function has_result($bucket = self::DEFAULT_BUCKET) return (! empty($this->results[$bucket])); } - /** - * Run signals sent to the process - */ - public static function dispatch_signals() - { - // Run signal handlers - pcntl_signal_dispatch(); - } - /** * Send a message from the child to the parent * @param $result From f245b50012bc1bc85302ee262cd7a63aa0f970e6 Mon Sep 17 00:00:00 2001 From: Kevin Bonner Date: Thu, 16 Feb 2017 15:49:30 -0500 Subject: [PATCH 10/17] cleanup codeclimate issues --- fork_daemon.php | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/fork_daemon.php b/fork_daemon.php index 12b3273..18d087f 100644 --- a/fork_daemon.php +++ b/fork_daemon.php @@ -962,8 +962,8 @@ public function received_exit_request($requested = null) /** * Add work to the group of work to be processed * - * @param mixed array of items to be handed back to child in chunks - * @param string a unique identifier for this work + * @param array $new_work_units mixed array of items to be handed back to child in chunks + * @param string $identifier a unique identifier for this work * @param int $bucket the bucket to use * @param bool $sort_queue true to sort the work unit queue */ @@ -1511,7 +1511,7 @@ public function has_result($bucket = self::DEFAULT_BUCKET) /** * Send a message from the child to the parent - * @param $result + * @param string $result */ public function child_send_result_to_parent($result) { @@ -1790,7 +1790,9 @@ protected function fork_work_unit($work_unit, $identifier = '', $bucket = self:: // send the result to the parent if (is_null($result)) + { self::socket_send($socket_parent, $result); + } // delay the child's exit slightly to avoid race conditions usleep(500); From dcb4f2e9ef3a613bc5d00423055727ed4a861a54 Mon Sep 17 00:00:00 2001 From: Lindsay Snider Date: Fri, 17 Feb 2017 22:44:46 +0000 Subject: [PATCH 11/17] fix undefined index issue, remove a fix causing data loss --- fork_daemon.php | 73 +++++++++++++++++++++++-------------------------- 1 file changed, 34 insertions(+), 39 deletions(-) diff --git a/fork_daemon.php b/fork_daemon.php index 18d087f..eb4059d 100644 --- a/fork_daemon.php +++ b/fork_daemon.php @@ -1562,60 +1562,55 @@ protected function get_changed_sockets($bucket = self::DEFAULT_BUCKET, $timeout */ protected function fetch_results($blocking = true, $timeout = 0, $bucket = self::DEFAULT_BUCKET) { - $start = microtime(true); - $results = array(); - - // loop while there is pending children and pending sockets; this - // will break early on timeouts and when not blocking. - do + declare(ticks = 0) { - $ready_sockets = $this->get_changed_sockets($bucket, $timeout); - if (is_array($ready_sockets)) + $start = microtime(true); + $results = array(); + + // loop while there is pending children and pending sockets; this + // will break early on timeouts and when not blocking. + do { - foreach ($ready_sockets as $pid => $socket) + $ready_sockets = $this->get_changed_sockets($bucket, $timeout); + if (is_array($ready_sockets)) { - // Ensure PID is still on forked_children -- may have been removed if a SIGCHILD occurred. The hope - // is that this fixes BNBS-23987. - if (!isset($this->forked_children[$pid])) + foreach ($ready_sockets as $pid => $socket) { - unset($ready_sockets[$pid]); - continue; + $result = $this->socket_receive($socket); + if ($result !== false && (! is_null($result))) + { + $this->forked_children[$pid]['last_active'] = $start; + $results[$pid] = $result; + } } + } - $result = $this->socket_receive($socket); - if ($result !== false && (! is_null($result))) + // clean up forked children that have stopped and did not have recently + // active sockets. + foreach ($this->forked_children as $pid => &$child) + { + if (isset($child['last_active']) && ($child['last_active'] < $start) && ($child['status'] == self::STOPPED)) { - $this->forked_children[$pid]['last_active'] = $start; - $results[$pid] = $result; + // close the socket from the parent + unset($this->forked_children[$pid]); } } - } + unset($child); - // clean up forked children that have stopped and did not have recently - // active sockets. - foreach ($this->forked_children as $pid => &$child) - { - if (isset($child['last_active']) && ($child['last_active'] < $start) && ($child['status'] == self::STOPPED)) + // check if timed out + if ($timeout && (microtime(true) - $start > $timeout)) + return $results; + + // return null if not blocking and we haven't seen results + if (! $blocking) { - // close the socket from the parent - unset($this->forked_children[$pid]); + return $results; } } - unset($child); - - // check if timed out - if ($timeout && (microtime(true) - $start > $timeout)) - return $results; + while (count($this->forked_children) > 0); - // return null if not blocking and we haven't seen results - if (! $blocking) - { - return $results; - } + return $results; } - while (count($this->forked_children) > 0); - - return $results; } /** From f9e84e51c82babfd90dca48ef9143398de3fb711 Mon Sep 17 00:00:00 2001 From: Lindsay Snider Date: Mon, 30 Mar 2020 13:55:36 -0400 Subject: [PATCH 12/17] deprecated braces moved to brackets --- fork_daemon.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fork_daemon.php b/fork_daemon.php index eb4059d..492bef7 100644 --- a/fork_daemon.php +++ b/fork_daemon.php @@ -1834,7 +1834,7 @@ protected function kill_maxtime_violators() $this->log('Force kill ' . $pid . ' has run too long', self::LOG_LEVEL_INFO); // notify app that child process timed out - $this->invoke_callback($this->child_function_timeout{$pid_info['bucket']}, array($pid, $pid_info['identifier']), true); + $this->invoke_callback($this->child_function_timeout[$pid_info['bucket']], array($pid, $pid_info['identifier']), true); $this->safe_kill($pid, SIGKILL); // its probably stuck on something, kill it immediately. sleep(3); // give the child time to die From 57447a50d5a1dffcaea020ef17457530f8259e20 Mon Sep 17 00:00:00 2001 From: Lindsay Snider Date: Tue, 7 Jul 2020 16:58:48 -0400 Subject: [PATCH 13/17] convert from declare ticks to pcntl_async_signals (#3) * convert from declare ticks to pcntl_async_signals --- composer.json | 2 +- fork_daemon.php | 287 +++++++++++++++++++++++++----------------------- 2 files changed, 149 insertions(+), 140 deletions(-) diff --git a/composer.json b/composer.json index 2413537..6d3fc4c 100644 --- a/composer.json +++ b/composer.json @@ -6,7 +6,7 @@ "homepage": "https://github.com/barracudanetworks/forkdaemon-php", "license": "MIT", "require": { - "php": ">=5.3.0" + "php": ">=7.1" }, "autoload": { "classmap": [ "fork_daemon.php" ] diff --git a/fork_daemon.php b/fork_daemon.php index 492bef7..1eddb9d 100644 --- a/fork_daemon.php +++ b/fork_daemon.php @@ -725,7 +725,7 @@ public function __construct() self::$parent_pid = getmypid(); // install signal handlers - declare(ticks = 1); + pcntl_async_signals(true); pcntl_signal(SIGHUP, array(&$this, 'signal_handler_sighup')); pcntl_signal(SIGCHLD, array(&$this, 'signal_handler_sigchild')); pcntl_signal(SIGTERM, array(&$this, 'signal_handler_sigint')); @@ -740,9 +740,6 @@ public function __construct() pcntl_signal(SIGQUIT, SIG_IGN); pcntl_signal(SIGTRAP, SIG_IGN); pcntl_signal(SIGSYS, SIG_IGN); - - // add barracuda specific prefork functions (doesn't hurt anything) - $this->parent_function_prefork = array('db_clear_connection_cache', 'memcache_clear_connection_cache'); } /** @@ -803,60 +800,61 @@ public function signal_handler_sighup($signal_number) */ public function signal_handler_sigchild($signal_number) { - // do not allow signals to interrupt this - declare(ticks = 0) + // reap all child zombie processes + if (self::$parent_pid == getmypid()) { - // reap all child zombie processes - if (self::$parent_pid == getmypid()) - { - $status = ''; + // do not allow signals to interrupt this + pcntl_async_signals(false); + + $status = ''; - do + do + { + // get child pid that exited + $child_pid = pcntl_waitpid(0, $status, WNOHANG); + if ($child_pid > 0) { - // get child pid that exited - $child_pid = pcntl_waitpid(0, $status, WNOHANG); - if ($child_pid > 0) + // child exited + if (!isset($this->forked_children[$child_pid])) { - // child exited - $identifier = false; - if (!isset($this->forked_children[$child_pid])) - { - $this->log("Cannot find $child_pid in array! (This may be a subprocess not in our fork)", self::LOG_LEVEL_INFO); - continue; - } - - $child = $this->forked_children[$child_pid]; - $identifier = $child['identifier']; + $this->log("Cannot find $child_pid in array! (This may be a subprocess not in our fork)", self::LOG_LEVEL_INFO); + continue; + } - // call exit function if and only if its declared */ - if ($child['status'] == self::WORKER) - $this->invoke_callback($this->parent_function_child_exited[ $this->forked_children[$child_pid]['bucket'] ], array($child_pid, $this->forked_children[$child_pid]['identifier']), true); + $child = $this->forked_children[$child_pid]; + $identifier = $child['identifier']; - // stop the child pid - $this->forked_children[$child_pid]['status'] = self::STOPPED; - $this->forked_children_count--; + // call exit function if and only if its declared */ + if ($child['status'] == self::WORKER) + $this->invoke_callback($this->parent_function_child_exited[ $this->forked_children[$child_pid]['bucket'] ], array($child_pid, $this->forked_children[$child_pid]['identifier']), true); - // respawn helper processes - if ($child['status'] == self::HELPER && $child['respawn'] === true) - { - $this->log('Helper process ' . $child_pid . ' died, respawning', self::LOG_LEVEL_INFO); - $this->helper_process_spawn($child['function'], $child['arguments'], $child['identifier'], true); - } + // stop the child pid + $this->forked_children[$child_pid]['status'] = self::STOPPED; + $this->forked_children_count--; - // Poll for results from any children - $this->post_results($child['bucket']); - } - elseif ($child_pid < 0) + // respawn helper processes + if ($child['status'] == self::HELPER && $child['respawn'] === true) { - // ignore acceptable error 'No child processes' given we force this signal to run potentially when no children exist - if (pcntl_get_last_error() == 10) continue; - - // pcntl_wait got an error - $this->log('pcntl_waitpid failed with error ' . pcntl_get_last_error() . ':' . pcntl_strerror((pcntl_get_last_error())), self::LOG_LEVEL_DEBUG); + $this->log('Helper process ' . $child_pid . ' died, respawning', self::LOG_LEVEL_INFO); + $this->helper_process_spawn($child['function'], $child['arguments'], $child['identifier'], true); } + + // Poll for results from any children + $this->post_results($child['bucket']); + } + elseif ($child_pid < 0) + { + // ignore acceptable error 'No child processes' given we force this signal to run potentially when no children exist + if (pcntl_get_last_error() == 10) continue; + + // pcntl_wait got an error + $this->log('pcntl_waitpid failed with error ' . pcntl_get_last_error() . ':' . pcntl_strerror((pcntl_get_last_error())), self::LOG_LEVEL_DEBUG); } - while ($child_pid > 0); } + while ($child_pid > 0); + + // turn signals back on + pcntl_async_signals(true); } } @@ -1242,7 +1240,7 @@ public function helper_process_spawn($function_name, $arguments = array(), $iden list($socket_child, $socket_parent) = $this->ipc_init(); // do not process signals while we are forking - declare(ticks = 0); + pcntl_async_signals(false); $pid = pcntl_fork(); if ($pid == -1) @@ -1258,7 +1256,8 @@ public function helper_process_spawn($function_name, $arguments = array(), $iden // set child properties $this->child_bucket = self::DEFAULT_BUCKET; - declare(ticks = 1); + // turn signals back on + pcntl_async_signals(true); // close our socket (we only need the one to the parent) socket_close($socket_child); @@ -1277,7 +1276,9 @@ public function helper_process_spawn($function_name, $arguments = array(), $iden * Parent process */ - declare(ticks = 1); + // turn signals back on + pcntl_async_signals(true); + $this->log('Spawned new helper process with pid ' . $pid, self::LOG_LEVEL_INFO); // close our socket (we only need the one to the child) @@ -1562,55 +1563,60 @@ protected function get_changed_sockets($bucket = self::DEFAULT_BUCKET, $timeout */ protected function fetch_results($blocking = true, $timeout = 0, $bucket = self::DEFAULT_BUCKET) { - declare(ticks = 0) - { - $start = microtime(true); - $results = array(); + // turn signals off while processing results + pcntl_async_signals(false); - // loop while there is pending children and pending sockets; this - // will break early on timeouts and when not blocking. - do + $start = microtime(true); + $results = array(); + + // loop while there is pending children and pending sockets; this + // will break early on timeouts and when not blocking. + do + { + $ready_sockets = $this->get_changed_sockets($bucket, $timeout); + if (is_array($ready_sockets)) { - $ready_sockets = $this->get_changed_sockets($bucket, $timeout); - if (is_array($ready_sockets)) + foreach ($ready_sockets as $pid => $socket) { - foreach ($ready_sockets as $pid => $socket) + $result = $this->socket_receive($socket); + if ($result !== false && (! is_null($result))) { - $result = $this->socket_receive($socket); - if ($result !== false && (! is_null($result))) - { - $this->forked_children[$pid]['last_active'] = $start; - $results[$pid] = $result; - } + $this->forked_children[$pid]['last_active'] = $start; + $results[$pid] = $result; } } + } - // clean up forked children that have stopped and did not have recently - // active sockets. - foreach ($this->forked_children as $pid => &$child) + // clean up forked children that have stopped and did not have recently + // active sockets. + foreach ($this->forked_children as $pid => &$child) + { + if (isset($child['last_active']) && ($child['last_active'] < $start) && ($child['status'] == self::STOPPED)) { - if (isset($child['last_active']) && ($child['last_active'] < $start) && ($child['status'] == self::STOPPED)) - { - // close the socket from the parent - unset($this->forked_children[$pid]); - } + // close the socket from the parent + unset($this->forked_children[$pid]); } - unset($child); - - // check if timed out - if ($timeout && (microtime(true) - $start > $timeout)) - return $results; + } + unset($child); - // return null if not blocking and we haven't seen results - if (! $blocking) - { - return $results; - } + // check if timed out + if ($timeout && (microtime(true) - $start > $timeout)) + { + pcntl_async_signals(true); + return $results; } - while (count($this->forked_children) > 0); - return $results; + // return null if not blocking and we haven't seen results + if (! $blocking) + { + pcntl_async_signals(true); + return $results; + } } + while (count($this->forked_children) > 0); + + pcntl_async_signals(true); + return $results; } /** @@ -1711,7 +1717,7 @@ protected function fork_work_unit($work_unit, $identifier = '', $bucket = self:: list($socket_child, $socket_parent) = $this->ipc_init(); // turn off signals temporarily to prevent a SIGCHLD from interupting the parent before $this->forked_children is updated - declare(ticks = 0); + pcntl_async_signals(false); // spoon! $pid = pcntl_fork(); @@ -1743,7 +1749,7 @@ protected function fork_work_unit($work_unit, $identifier = '', $bucket = self:: $this->forked_children_count++; // turn back on signals now that $this->forked_children has been updated - declare(ticks = 1); + pcntl_async_signals(true); // close our socket (we only need the one to the child) socket_close($socket_parent); @@ -1772,7 +1778,7 @@ protected function fork_work_unit($work_unit, $identifier = '', $bucket = self:: $this->child_bucket = $bucket; // turn signals on for the child - declare(ticks = 1); + pcntl_async_signals(true); // close our socket (we only need the one to the parent) socket_close($socket_child); @@ -1921,33 +1927,35 @@ protected function ipc_init() protected function socket_send($socket, $message) { // do not process signals while we are sending an IPC message - declare(ticks = 0) + pcntl_async_signals(false); + + $serialized_message = @serialize($message); + if ($serialized_message == false) { - $serialized_message = @serialize($message); - if ($serialized_message == false) - { - $this->log('socket_send failed to serialize message', self::LOG_LEVEL_CRIT); - return false; - } + $this->log('socket_send failed to serialize message', self::LOG_LEVEL_CRIT); + pcntl_async_signals(true); + return false; + } - $header = pack('N', strlen($serialized_message)); - $data = $header . $serialized_message; - $bytes_left = strlen($data); - while ($bytes_left > 0) + $header = pack('N', strlen($serialized_message)); + $data = $header . $serialized_message; + $bytes_left = strlen($data); + while ($bytes_left > 0) + { + $bytes_sent = @socket_write($socket, $data); + if ($bytes_sent === false) { - $bytes_sent = @socket_write($socket, $data); - if ($bytes_sent === false) - { - $this->log('socket_send error: ' . socket_strerror(socket_last_error()), self::LOG_LEVEL_CRIT); - return false; - } - - $bytes_left -= $bytes_sent; - $data = substr($data, $bytes_sent); + $this->log('socket_send error: ' . socket_strerror(socket_last_error()), self::LOG_LEVEL_CRIT); + pcntl_async_signals(true); + return false; } - return true; + $bytes_left -= $bytes_sent; + $data = substr($data, $bytes_sent); } + + pcntl_async_signals(true); + return true; } /** @@ -1959,43 +1967,44 @@ protected function socket_send($socket, $message) protected function socket_receive($socket) { // do not process signals while we are receiving an IPC message - declare(ticks = 0) - { - // initially read to the length of the header size, then - // expand to read more - $bytes_total = self::SOCKET_HEADER_SIZE; - $bytes_read = 0; - $have_header = false; - $socket_message = ''; - while ($bytes_read < $bytes_total) + pcntl_async_signals(false); + + // initially read to the length of the header size, then + // expand to read more + $bytes_total = self::SOCKET_HEADER_SIZE; + $bytes_read = 0; + $have_header = false; + $socket_message = ''; + while ($bytes_read < $bytes_total) + { + $read = @socket_read($socket, $bytes_total - $bytes_read); + if ($read === false) { - $read = @socket_read($socket, $bytes_total - $bytes_read); - if ($read === false) - { - $this->log('socket_receive error: ' . socket_strerror(socket_last_error()), self::LOG_LEVEL_CRIT); - return false; - } + $this->log('socket_receive error: ' . socket_strerror(socket_last_error()), self::LOG_LEVEL_CRIT); + pcntl_async_signals(true); + return false; + } - // blank socket_read means done - if ($read == '') - break; + // blank socket_read means done + if ($read == '') + break; - $bytes_read += strlen($read); - $socket_message .= $read; + $bytes_read += strlen($read); + $socket_message .= $read; - if (!$have_header && $bytes_read >= self::SOCKET_HEADER_SIZE) - { - $have_header = true; - list($bytes_total) = array_values(unpack('N', $socket_message)); - $bytes_read = 0; - $socket_message = ''; - } + if (!$have_header && $bytes_read >= self::SOCKET_HEADER_SIZE) + { + $have_header = true; + list($bytes_total) = array_values(unpack('N', $socket_message)); + $bytes_read = 0; + $socket_message = ''; } + } - $message = @unserialize($socket_message); + $message = @unserialize($socket_message); - return $message; - } + pcntl_async_signals(true); + return $message; } /** From 6e2fe42fdab6dd2f56541e56ab59135084ca3de3 Mon Sep 17 00:00:00 2001 From: Lindsay Snider Date: Mon, 20 Jul 2020 12:50:33 -0400 Subject: [PATCH 14/17] save IPC sockets for helper children --- fork_daemon.php | 3 +++ 1 file changed, 3 insertions(+) diff --git a/fork_daemon.php b/fork_daemon.php index 1eddb9d..6748e15 100644 --- a/fork_daemon.php +++ b/fork_daemon.php @@ -1256,6 +1256,9 @@ public function helper_process_spawn($function_name, $arguments = array(), $iden // set child properties $this->child_bucket = self::DEFAULT_BUCKET; + // save the socket from child to parent to support $this->child_send_result_to_parent() + $this->child_socket_to_parent = $socket_parent; + // turn signals back on pcntl_async_signals(true); From 57d76acb339eac10c3b8e07c576b5c50cd4bdfde Mon Sep 17 00:00:00 2001 From: Lindsay Snider Date: Mon, 20 Jul 2020 12:51:21 -0400 Subject: [PATCH 15/17] fixed bug where helper children were not calling srand --- fork_daemon.php | 3 +++ 1 file changed, 3 insertions(+) diff --git a/fork_daemon.php b/fork_daemon.php index 6748e15..4b2e229 100644 --- a/fork_daemon.php +++ b/fork_daemon.php @@ -1265,6 +1265,9 @@ public function helper_process_spawn($function_name, $arguments = array(), $iden // close our socket (we only need the one to the parent) socket_close($socket_child); + // re-seed the random generator to prevent clone from parent + srand(); + // execute the function $result = call_user_func_array($function_name, $arguments); From 76d996441fce1ed81d193d33e8808ad4d00eb4ac Mon Sep 17 00:00:00 2001 From: Lindsay Snider Date: Mon, 20 Jul 2020 12:52:02 -0400 Subject: [PATCH 16/17] fixed bug where helper children were not honoring respawn parameter --- fork_daemon.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fork_daemon.php b/fork_daemon.php index 4b2e229..8a00cc9 100644 --- a/fork_daemon.php +++ b/fork_daemon.php @@ -1296,7 +1296,7 @@ public function helper_process_spawn($function_name, $arguments = array(), $iden 'identifier' => $identifier, 'status' => self::HELPER, 'bucket' => self::DEFAULT_BUCKET, - 'respawn' => true, + 'respawn' => $respawn, 'function' => $function_name, 'arguments' => $arguments, 'socket' => $socket_child, From e15676783b9fd852de391c1237ecb000e5a7c7af Mon Sep 17 00:00:00 2001 From: Lindsay Snider Date: Mon, 20 Jul 2020 12:52:51 -0400 Subject: [PATCH 17/17] post results now includes children identifier in callback --- fork_daemon.php | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/fork_daemon.php b/fork_daemon.php index 8a00cc9..5e9c264 100644 --- a/fork_daemon.php +++ b/fork_daemon.php @@ -1631,6 +1631,8 @@ protected function fetch_results($blocking = true, $timeout = 0, $bucket = self: * post any results that are available, so call while children are running * to check and post more results. * + * The callback will run for every child that return results + * * NOTE: This should be polled to update results. * * @param type $bucket The bucket to post the results in @@ -1647,8 +1649,11 @@ protected function post_results($bucket = self::DEFAULT_BUCKET) if (! empty($this->parent_function_results[$bucket])) { - if ($this->invoke_callback($this->parent_function_results[$bucket], array($results), true) === false) - return false; + foreach ($results as $pid => $result) + { + if ($this->invoke_callback($this->parent_function_results[$bucket], array($result, $this->forked_children[$pid]['identifier']), true) === false) + return false; + } } elseif ($this->store_result === true) {