| <?php | |
| require dirname(__FILE__).'/Process_Manager.php'; | |
// Baseline benchmark: deliver a batch of fake emails one after another in a
// single process and report how long the whole run took.

// Fifty copies of the same canned message make up the outgoing batch
$template = array(
    'to'      => 'test@test.com',
    'subject' => 'This is a test',
    'body'    => 'Hello, world of multi-processing!'
);
$outbox = array_fill(0, 50, $template);

// Stand-in for a real mailer: blocks for a random 500-1000 ms to imitate
// delivery latency, then reports which process handled which message
$send = function($message_id, $message) {
    $latency_ms = rand(500, 1000);
    usleep($latency_ms * 1000); // usleep() takes microseconds
    printf("Process %d: sent message %d (%d ms)\n", posix_getpid(), $message_id, $latency_ms);
};

// Remember when the run began
$began_at = microtime(true);

// Deliver every message in turn
foreach (array_keys($outbox) as $id) {
    $send($id, $outbox[$id]);
}

// Report the elapsed wall-clock time (%d prints whole seconds only)
$elapsed = microtime(true) - $began_at;
printf("\nDone! Sent %d messages in %d seconds\n\n", count($outbox), $elapsed);
exit;
| <?php | |
| require dirname(__FILE__).'/Process_Manager.php'; | |
// Parallel benchmark: deliver the same batch of fake emails, but hand each
// message to a forked worker, keeping at most five workers alive at a time.

// The manager keeps track of every worker forked below
$manager = new Process_Manager();

declare(ticks = 1);

// Route termination and child-exit signals to the manager
foreach (array(SIGTERM, SIGINT, SIGCHLD) as $signo) {
    pcntl_signal($signo, array($manager, 'signal_handler'));
}

// Fifty copies of the same canned message make up the outgoing batch
$template = array(
    'to'      => 'test@test.com',
    'subject' => 'This is a test',
    'body'    => 'Hello, world of multi-processing!'
);
$outbox = array_fill(0, 50, $template);

// Stand-in for a real mailer: blocks for a random 500-1000 ms to imitate
// delivery latency, then reports which process handled which message.
// The parameter names match the keys of the argument array built below.
$send = function($message_id, $message) {
    $latency_ms = rand(500, 1000);
    usleep($latency_ms * 1000); // usleep() takes microseconds
    printf("Process %d: sent message %d (%d ms)\n", posix_getpid(), $message_id, $latency_ms);
};

$max_workers = 5;      // concurrency ceiling
$poll_us     = 500000; // 500 ms between checks of the worker count

// Remember when the run began
$began_at = microtime(true);

// One forked worker per message
foreach ($outbox as $message_id => $message) {
    // Only the parent returns from fork_child(); the child runs the
    // callback and exits inside it.
    $manager->fork_child($send, compact('message_id', 'message'));

    // At the ceiling: idle until the manager is tracking fewer workers
    while (count($manager) >= $max_workers) {
        usleep($poll_us);
    }
}

// Everything has been dispatched; idle until no workers remain tracked
echo "The queue is empty, waiting for all processes to finish\n";
while (count($manager) > 0) {
    usleep($poll_us);
}

// Report the elapsed wall-clock time (%d prints whole seconds only)
$elapsed = microtime(true) - $began_at;
printf("\nDone! Sent %d messages in %d seconds\n\n", count($outbox), $elapsed);
exit;
| <?php | |
/**
 * Tracks child processes forked by the current (parent) process.
 *
 * Workers are started with fork_child(), which records their PID; they are
 * forgotten again once reap_child() has collected their exit status.
 * Because the class implements Countable, count($manager) yields the number
 * of children still being tracked.
 *
 * signal_handler() is intended to be registered through pcntl_signal() for
 * SIGINT, SIGTERM and SIGCHLD.
 */
class Process_Manager implements Countable
{
    /** @var array Map of child PID => TRUE for every child not yet reaped. */
    protected $processes = array();

    /** @var bool TRUE only inside a forked child process. */
    protected $is_child = FALSE;

    /**
     * Number of child processes currently being tracked.
     *
     * The attribute below silences the PHP 8.1+ return-type deprecation for
     * Countable::count(); on older PHP versions the line is a plain comment.
     *
     * @return int
     */
    #[\ReturnTypeWillChange]
    public function count()
    {
        return count($this->processes);
    }

    /**
     * Signal callback for SIGINT, SIGTERM and SIGCHLD.
     *
     * Does nothing inside a child process. In the parent, SIGINT/SIGTERM
     * kill every tracked child and terminate the script; SIGCHLD collects
     * finished children.
     *
     * @param int $signal Signal number that was delivered.
     * @return void
     */
    public function signal_handler($signal)
    {
        // Handlers are inherited across fork(); only the parent may act
        if ($this->is_child)
        {
            return;
        }

        switch ($signal)
        {
            case SIGINT:
            case SIGTERM:
                echo "\nUser terminated the application\n";
                // Kill all child processes before terminating the parent
                $this->kill_all();
                exit(0);

            case SIGCHLD:
                $this->reap_child();
                break;
        }
    }

    /**
     * Sends SIGKILL to every tracked child process.
     *
     * The PIDs stay in the tracking table; nothing is reaped here.
     *
     * @return void
     */
    public function kill_all()
    {
        foreach ($this->processes as $pid => $is_running)
        {
            posix_kill($pid, SIGKILL);
        }
    }

    /**
     * Forks a child process that runs $callback and then exits.
     *
     * Only the parent returns from this method; the child terminates inside
     * it once the callback has finished.
     *
     * @param callable $callback Work to perform in the child.
     * @param array    $data     Arguments for the callback, passed
     *                           positionally in array order (keys are
     *                           ignored).
     * @return int PID of the new child (parent process only).
     * @throws Exception If the fork fails.
     */
    public function fork_child($callback, $data)
    {
        $pid = pcntl_fork();

        if ($pid === -1)
        {
            // Still in the parent: report the real reason instead of
            // guessing (process limits are as likely as memory exhaustion)
            $errno = pcntl_get_last_error();
            throw new Exception('Unable to fork child process: '.pcntl_strerror($errno), $errno);
        }

        if ($pid === 0)
        {
            // Child process
            $this->is_child = TRUE;

            // array_values() keeps the call positional on every PHP
            // version; PHP 8+ would otherwise interpret string keys as
            // named arguments.
            call_user_func_array($callback, array_values($data));

            // No manual SIGCHLD is sent to the parent: the kernel delivers
            // one when this process exits, which is the only point at which
            // it can actually be reaped.
            exit;
        }

        // Parent process, fork succeeded
        $this->processes[$pid] = TRUE;
        return $pid;
    }

    /**
     * Collects every child that has already terminated and stops tracking it.
     *
     * Never blocks. All finished children are drained in one call because
     * several exits may be announced by a single SIGCHLD.
     *
     * @return void
     * @throws Exception If pcntl_wait() fails for a reason other than
     *                   there being no children left.
     */
    public function reap_child()
    {
        // WNOHANG: returns 0 as soon as no further child has exited yet
        while (($pid = pcntl_wait($status, WNOHANG)) > 0)
        {
            unset($this->processes[$pid]);
        }

        if ($pid < 0)
        {
            $errno = pcntl_get_last_error();
            if ($errno !== PCNTL_ECHILD)
            {
                throw new Exception('pcntl_wait() failed: '.pcntl_strerror($errno), $errno);
            }

            // ECHILD: this process has no children left at all, so any PID
            // still tracked is stale and would keep callers waiting forever
            $this->processes = array();
        }
    }
}