<?php | |
require dirname(__FILE__).'/Process_Manager.php';

// Baseline benchmark: deliver a batch of simulated emails one after another
// in a single process and report how long the whole batch took.

// The batch is 50 copies of the same canned message
$template = array(
    'to'      => 'test@test.com',
    'subject' => 'This is a test',
    'body'    => 'Hello, world of multi-processing!',
);
$batch = array_fill(0, 50, $template);

// Stand-in for a real mail call: block for a random 500-1000 ms to mimic
// network latency, then log which process handled which message.
$deliver = function($message_id, $message)
{
    $latency_ms = rand(500, 1000);
    usleep($latency_ms * 1000); // usleep() takes microseconds
    printf("Process %d: sent message %d (%d ms)\n", posix_getpid(), $message_id, $latency_ms);
};

// Begin timing
$started_at = microtime(TRUE);

// Deliver every message in order; array_fill(0, ...) yields keys 0..n-1,
// so the loop index doubles as the message id.
for ($id = 0, $total = count($batch); $id < $total; $id++)
{
    $deliver($id, $batch[$id]);
}

// Report the elapsed wall-clock time (%d prints whole seconds only)
$elapsed = microtime(TRUE) - $started_at;
printf("\nDone! Sent %d messages in %d seconds\n\n", count($batch), $elapsed);

exit;
<?php | |
require dirname(__FILE__).'/Process_Manager.php';

$manager = new Process_Manager();

// Ticks let PHP dispatch pending signals to the handlers registered below
// while the statements in this file execute.
declare(ticks = 1);

// Route termination and child-exit signals to the process manager
$on_signal = array($manager, 'signal_handler');
foreach (array(SIGTERM, SIGINT, SIGCHLD) as $signo)
{
    pcntl_signal($signo, $on_signal);
}

// The batch is 50 copies of the same canned message
$template = array(
    'to'      => 'test@test.com',
    'subject' => 'This is a test',
    'body'    => 'Hello, world of multi-processing!',
);
$batch = array_fill(0, 50, $template);

// Stand-in for a real mail call: block for a random 500-1000 ms to mimic
// network latency, then log which process handled which message.
// The parameter names must keep matching the keys of the argument array
// built in the loop below.
$deliver = function($message_id, $message)
{
    $latency_ms = rand(500, 1000);
    usleep($latency_ms * 1000); // usleep() takes microseconds
    printf("Process %d: sent message %d (%d ms)\n", posix_getpid(), $message_id, $latency_ms);
};

// Begin timing
$started_at = microtime(TRUE);

// Hand each message to its own child process
foreach ($batch as $message_id => $message)
{
    // Inside the child, fork_child() runs the callback and then exits,
    // so only the parent ever continues past this call.
    $manager->fork_child($deliver, array(
        'message_id' => $message_id,
        'message'    => $message,
    ));

    // Throttle: hold off forking while 5 or more children are still tracked
    while (count($manager) >= 5)
    {
        usleep(500000); // 500 ms
    }
}

// Everything has been dispatched; block until the manager tracks no children
echo "The queue is empty, waiting for all processes to finish\n";
while (count($manager) > 0)
{
    usleep(500000); // 500 ms
}

// Report the elapsed wall-clock time (%d prints whole seconds only)
$elapsed = microtime(TRUE) - $started_at;
printf("\nDone! Sent %d messages in %d seconds\n\n", count($batch), $elapsed);

exit;
<?php | |
/**
 * Forks child processes to run callbacks and keeps track of which children
 * have not been reaped yet.
 *
 * The object is Countable: count($manager) is the number of tracked children.
 * The owner is expected to register signal_handler() for SIGINT, SIGTERM and
 * SIGCHLD so that children are reaped and the count goes down as they exit.
 */
class Process_Manager implements Countable
{
    /** @var array Map of child PID => TRUE for every forked child not yet reaped */
    protected $processes = array();

    /** @var bool Set to TRUE inside a forked child; stays FALSE in the parent */
    protected $is_child = FALSE;

    /**
     * Number of child processes currently tracked.
     *
     * The attribute line below is an ordinary comment before PHP 8; on
     * PHP 8.1+ it silences the deprecation for Countable::count() being
     * declared without an int return type.
     *
     * @return int
     */
    #[\ReturnTypeWillChange]
    public function count()
    {
        return count($this->processes);
    }

    /**
     * Signal handler for SIGINT, SIGTERM and SIGCHLD.
     *
     * SIGINT/SIGTERM: kill every tracked child, then exit with status 0.
     * SIGCHLD: reap terminated children.
     * Does nothing when running inside a forked child.
     *
     * @param int $signal Signal number being delivered
     * @return void
     * @throws Exception If reaping fails (see reap_child())
     */
    public function signal_handler($signal)
    {
        // Children inherit the handler registration; only the parent acts
        if ($this->is_child)
        {
            return;
        }

        switch ($signal)
        {
            case SIGINT:
            case SIGTERM:
                echo "\nUser terminated the application\n";
                // Kill all child processes before terminating the parent
                $this->kill_all();
                exit(0);

            case SIGCHLD:
                $this->reap_child();
                break;
        }
    }

    /**
     * Send SIGKILL to every tracked child process.
     *
     * The children are neither waited for nor removed from the tracking list.
     *
     * @return void
     */
    public function kill_all()
    {
        foreach (array_keys($this->processes) as $pid)
        {
            posix_kill($pid, SIGKILL);
        }
    }

    /**
     * Fork a child process that runs $callback and then exits.
     *
     * In the child this method never returns: the callback is invoked, the
     * parent is sent SIGCHLD, and the child exits.
     *
     * @param callable $callback Function to run in the child
     * @param array    $data     Arguments for the callback, passed through
     *                           call_user_func_array()
     * @return int PID of the new child (parent process only)
     * @throws Exception If the fork fails
     */
    public function fork_child($callback, $data)
    {
        $pid = pcntl_fork();

        if ($pid === -1)
        {
            // fork can fail for several reasons (memory, process limits, ...),
            // so report the actual errno text instead of guessing
            throw new Exception(
                "Unable to fork child process: " . pcntl_strerror(pcntl_get_last_error())
            );
        }

        if ($pid === 0)
        {
            // Child process
            $this->is_child = TRUE;
            call_user_func_array($callback, $data);
            // Explicitly notify the parent that this child is finishing
            posix_kill(posix_getppid(), SIGCHLD);
            exit;
        }

        // Parent process, fork succeeded
        $this->processes[$pid] = TRUE;
        return $pid;
    }

    /**
     * Reap every child that has already terminated and stop tracking it.
     *
     * Never blocks. All terminated children are collected in one call because
     * pending SIGCHLD signals are not queued: several exits may be delivered
     * as a single signal, and reaping only one child per signal would leave
     * the others tracked forever. Having no children left to wait for is not
     * treated as an error.
     *
     * @return void
     * @throws Exception If pcntl_wait() fails for a reason other than there
     *                   being no child processes
     */
    public function reap_child()
    {
        while (TRUE)
        {
            $pid = pcntl_wait($status, WNOHANG);

            if ($pid > 0)
            {
                // A child was collected; forget it and look for more
                unset($this->processes[$pid]);
                continue;
            }

            // -1 with ECHILD only means there is nothing left to wait for
            if ($pid < 0 && pcntl_get_last_error() !== PCNTL_ECHILD)
            {
                throw new Exception(
                    "pcntl_wait() failed: " . pcntl_strerror(pcntl_get_last_error())
                );
            }

            // 0 (children exist, none has exited yet) or ECHILD: done
            return;
        }
    }
}