Merge pull request #1 from ZsBT/master

adding queue manager for threads
This commit is contained in:
Tudor Barbu 2015-04-20 20:43:04 +02:00
commit fb313e476d
5 changed files with 254 additions and 18 deletions

View file

@ -1,4 +1,6 @@
php-thread
==========
Simple implementation of threading in PHP using pnctl
* Thread.php: Simple implementation of threading in PHP using pnctl
* ThreadQueue.php: Enqueue tasks to run them in parallel easily

View file

@ -34,11 +34,8 @@ class Thread
* @var array
*/
private $_errors = array(
Thread::FUNCTION_NOT_CALLABLE =>
'You must specify a valid function name that can be called '.
'from the current scope.',
Thread::COULD_NOT_FORK =>
'pcntl_fork() returned a status of -1. No new process was created'
Thread::FUNCTION_NOT_CALLABLE => 'You must specify a valid function name that can be called from the current scope.',
Thread::COULD_NOT_FORK => 'pcntl_fork() returned a status of -1. No new process was created'
);
/**
@ -55,6 +52,17 @@ class Thread
* @var integer
*/
private $_pid;
/**
* Exits with error
*
* @return void
*/
private function fatalError($errorCode){
throw new Exception( $this->getError($errorCode) );
}
/**
* Checks if threading is supported by the current PHP configuration
@ -84,6 +92,7 @@ class Thread
*/
public function __construct( $runnable = null )
{
if(!Thread::isAvailable() )throw new Exception("Threads not supported");
if ( $runnable !== null ) {
$this->setRunnable($runnable);
}
@ -101,10 +110,7 @@ class Thread
if ( self::isRunnableOk($runnable) ) {
$this->runnable = $runnable;
} else {
throw new Exception(
$this->getError(Thread::FUNCTION_NOT_CALLABLE),
Thread::FUNCTION_NOT_CALLABLE
);
$this->fatalError(Thread::FUNCTION_NOT_CALLABLE);
}
}
@ -120,8 +126,7 @@ class Thread
/**
* Checks if the callback is ok (the function/method
* actually exists and is runnable from the current
* context)
* is runnable from the current context)
*
* can be called statically
*
@ -131,7 +136,7 @@ class Thread
*/
public static function isRunnableOk( $runnable )
{
return ( function_exists($runnable) && is_callable($runnable) );
return ( is_callable($runnable) );
}
/**
@ -166,10 +171,7 @@ class Thread
{
$pid = @pcntl_fork();
if ( $pid == -1 ) {
throw new Exception(
$this->getError(Thread::COULD_NOT_FORK),
Thread::COULD_NOT_FORK
);
$this->fatalError(Thread::COULD_NOT_FORK);
}
if ( $pid ) {
// parent
@ -252,4 +254,4 @@ class Thread
}
}
}
- See more at: http://blog.motane.lu/2009/01/02/multithreading-in-php/#sthash.JevFlLcL.dpuf

115
ThreadQueue.php Normal file
View file

@ -0,0 +1,115 @@
<?php /*
Adds parallel tasks to a queue.
tick() must be called regularly, that arranges addition of new jobs and removal of closed threads.
*/
require_once("Thread.php");
class ThreadQueue {
const DEFAULT_QUEUE_SIZE = 2; // default number of parallel tasks
const TICK_DELAY = 10000; // delay after tick, in millisecs
private $callable; // the function name to call. can be a method of a static class as well
private $threads = array(); // Thread instances
private $jobs = array(); // parameters to pass to $callable
public $queueSize; // number of parallel tasks. public, to make it variable run-time.
/**
* Constructor
*
* @param string $callable function name
* @param integer $queueSize number of parallel tasks
*/
public function __construct($callable, $queueSize = ThreadQueue::DEFAULT_QUEUE_SIZE ){
if(!is_callable($callable))throw new Exception("$callable is not callable.");
$this->callable = $callable;
$this->queueSize = $queueSize;
}
/**
* Add a new job
*
* @param mixed $argument parameter to pass to $callable
* @return int queue size
*/
public function add($argument){
$this->jobs[] = $argument;
return $this->tick();
}
/**
* Removes closed threads from queue
*
*/
private function cleanup(){
foreach($this->threads as $i => $szal)
if(!$szal->isAlive())
unset($this->threads[$i]);
return count($this->threads);
}
/**
* Starts new threads if needed
*
* @return int queue size
*/
public function tick(){
$this->cleanup();
if( (count($this->threads) < $this->queueSize) && count($this->jobs) ){
$this->threads[] = $szal = new Thread($this->callable);
$szal->start( array_shift($this->jobs) );
}
usleep(ThreadQueue::TICK_DELAY);
return $this->queueSize();
}
/**
* returns queue size with waiting jobs
*
* @return int
*/
public function queueSize(){
return count($this->jobs);
}
/**
* returns thread instances
*
* @return array of Thread
*/
public function threads(){
return $this->threads;
}
/**
* Removes all remaining jobs (empty queue)
*
* @return int number of removed jobs
*/
public function flush(){
$size = $this->queueSize();
$this->jobs = array();
return $size;
}
}

49
example1.php Normal file
View file

@ -0,0 +1,49 @@
<?php /*
Basic usage
for beginners: it is for command-line only execution. Webservers do not support threads.
*/
require_once("ThreadQueue.php");
// it is the function that will be called several times
function parallel_task($arg){
echo "task with parameter '$arg' starts\n";
sleep( rand(2,5) ); // wait for random seconds
echo "task with parameter '$arg' ENDS\n";
}
// create a queue instance with a callable function name
$TQ = new ThreadQueue("parallel_task");
// add tasks
$TQ->add("one");
$TQ->add("two");
$TQ->add("three");
$TQ->add("four");
$TQ->add("five");
// wait until all threads exit
while( count( $TQ->threads() ) ){ // there are existing processes in the background?
sleep(1); // optional
echo "waiting for all jobs done...\n";
$TQ->tick(); // mandatory!
}
echo "all process finished.\n";

68
example2.php Normal file
View file

@ -0,0 +1,68 @@
<?php /*
Advanced usage
*/
require_once("ThreadQueue.php");
// function is a static method of a class
abstract class class1 {
static function parallel_task($arg){
echo "task with parameter '$arg' starts\n";
sleep( rand(2,5) ); // wait for random seconds
echo "task with parameter '$arg' ENDS\n";
}
}
// we want 3 jobs in parallel instead of the default 2
$TQ = new ThreadQueue("class1::parallel_task", 3);
// add tasks
$TQ->add("one");
$TQ->add("two");
$TQ->add("three");
$TQ->add("four");
$TQ->add("five");
$TQ->add("six");
// Oops! We changed our mind, let's remove awaiting jobs.
// Existing threads will run, but jobs not started will be removed.
$TQ->flush();
// let's add jobs again.
$TQ->add("seven");
$TQ->add("eight");
$TQ->add("nine");
$TQ->add("ten");
$TQ->add("eleven");
$TQ->add("twelve");
// wait until all threads exit
while( $numberOfThreads = count($TQ->threads()) ){
usleep(500000); // optional
echo "waiting for all ($numberOfThreads) jobs done...\n";
$TQ->tick(); // mandatory!
// ha-ha! we can change the number of parallel executions realtime.
$TQ->queueSize = 4;
}
echo "all process finished.\n";