<?php
require dirname(__FILE__).'/Process_Manager.php';
// Create a batch of test messages to send
$email = array(
'to' => 'test@test.com',
'subject' => 'This is a test',
'body' => 'Hello, world of multi-processing!'
);
$queue = array_fill(0, 50, $email);
// Create a function simulate sending an email message
$sender = function($message_id, $message)
{
// Pretend to send it, we'll assume a normal latency of 500-1000ms
$ms = rand(500, 1000);
usleep($ms * 1000);
printf("Process %d: sent message %d (%d ms)\n", posix_getpid(), $message_id, $ms);
};
// Start the timer
$start_time = microtime(TRUE);
// Send the emails
foreach ($queue as $message_id => $message)
{
$sender($message_id, $message);
}
// Stop the timer
$runtime = microtime(TRUE) - $start_time;
printf("\nDone! Sent %d messages in %d seconds\n\n", count($queue), $runtime);
exit;
view raw demo1.php hosted with ❤ by GitHub
<?php
require dirname(__FILE__).'/Process_Manager.php';
$pm = new Process_Manager();
declare(ticks = 1);
// Setup our signal handlers
$callback = array($pm, 'signal_handler');
pcntl_signal(SIGTERM, $callback);
pcntl_signal(SIGINT, $callback);
pcntl_signal(SIGCHLD, $callback);
// Create a batch of test messages to send
$email = array(
'to' => 'test@test.com',
'subject' => 'This is a test',
'body' => 'Hello, world of multi-processing!'
);
$queue = array_fill(0, 50, $email);
// Create a function simulate sending an email message
$sender = function($message_id, $message)
{
// Pretend to send it, we'll assume a normal latency of 500-1000ms
$ms = rand(500, 1000);
usleep($ms * 1000);
printf("Process %d: sent message %d (%d ms)\n", posix_getpid(), $message_id, $ms);
};
// Start the timer
$start_time = microtime(TRUE);
// Fork processes to send the emails
foreach ($queue as $message_id => $message)
{
$args = array(
'message_id' => $message_id,
'message' => $message,
);
// Execution will not proceed past this line
// except for in the parent process.
$pm->fork_child($sender, $args);
// Limit concurrency to 5 processes
if (count($pm) >= 5)
{
while (count($pm) >= 5)
{
usleep(500000); // sleep 500 ms
}
}
}
// Wait for all processes to end
echo "The queue is empty, waiting for all processes to finish\n";
while (count($pm) > 0)
{
usleep(500000); // sleep 500 ms
}
// Stop the timer
$runtime = microtime(TRUE) - $start_time;
printf("\nDone! Sent %d messages in %d seconds\n\n", count($queue), $runtime);
exit;
view raw demo2.php hosted with ❤ by GitHub
<?php
class Process_Manager implements Countable
{
protected $processes = array();
protected $is_child = FALSE;
public function count()
{
return count($this->processes);
}
public function signal_handler($signal)
{
// Don't do anything if we're not in the parent process
if ($this->is_child)
{
return;
}
switch ($signal)
{
case SIGINT:
case SIGTERM:
echo "\nUser terminated the application\n";
// Kill all child processes before terminating the parent
$this->kill_all();
exit(0);
case SIGCHLD:
// Reap a child process
//echo "SIGCHLD received\n";
$this->reap_child();
}
}
public function kill_all()
{
foreach ($this->processes as $pid => $is_running)
{
posix_kill($pid, SIGKILL);
}
}
public function fork_child($callback, $data)
{
$pid = pcntl_fork();
switch($pid)
{
case 0:
// Child process
$this->is_child = TRUE;
call_user_func_array($callback, $data);
posix_kill(posix_getppid(), SIGCHLD);
exit;
case -1:
// Parent process, fork failed
throw new Exception("Out of memory!");
default:
// Parent process, fork succeeded
$this->processes[$pid] = TRUE;
return $pid;
}
}
public function reap_child()
{
// Check if any child process has terminated,
// and if so remove it from memory
$pid = pcntl_wait($status, WNOHANG);
if ($pid < 0)
{
throw new Exception("Out of memory");
}
elseif ($pid > 0)
{
unset($this->processes[$pid]);
}
}
}
view raw Process_Manager.php hosted with ❤ by GitHub