diff --git a/README.md b/README.md index 34ab4bf..61e7cd4 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,6 @@ php-thread ========== -Simple implementation of threading in PHP using pnctl +* Thread.php: Simple implementation of threading in PHP using pnctl +* ThreadQueue.php: Enqueue tasks to run them in parallel easily + diff --git a/Thread.php b/Thread.php index 890df98..7606616 100644 --- a/Thread.php +++ b/Thread.php @@ -34,11 +34,8 @@ class Thread * @var array */ private $_errors = array( - Thread::FUNCTION_NOT_CALLABLE => - 'You must specify a valid function name that can be called '. - 'from the current scope.', - Thread::COULD_NOT_FORK => - 'pcntl_fork() returned a status of -1. No new process was created' + Thread::FUNCTION_NOT_CALLABLE => 'You must specify a valid function name that can be called from the current scope.', + Thread::COULD_NOT_FORK => 'pcntl_fork() returned a status of -1. No new process was created' ); /** @@ -55,6 +52,17 @@ class Thread * @var integer */ private $_pid; + + + /** + * Exits with error + * + * @return void + */ + + private function fatalError($errorCode){ + throw new Exception( $this->getError($errorCode) ); + } /** * Checks if threading is supported by the current PHP configuration @@ -84,6 +92,7 @@ class Thread */ public function __construct( $runnable = null ) { + if(!Thread::isAvailable() )throw new Exception("Threads not supported"); if ( $runnable !== null ) { $this->setRunnable($runnable); } @@ -101,10 +110,7 @@ class Thread if ( self::isRunnableOk($runnable) ) { $this->runnable = $runnable; } else { - throw new Exception( - $this->getError(Thread::FUNCTION_NOT_CALLABLE), - Thread::FUNCTION_NOT_CALLABLE - ); + $this->fatalError(Thread::FUNCTION_NOT_CALLABLE); } } @@ -120,8 +126,7 @@ class Thread /** * Checks if the callback is ok (the function/method - * actually exists and is runnable from the current - * context) + * is runnable from the current context) * * can be called statically * @@ -131,7 +136,7 @@ class Thread */ public static function isRunnableOk( $runnable ) { - return ( function_exists($runnable) && is_callable($runnable) ); + return ( is_callable($runnable) ); } /** @@ -166,10 +171,7 @@ class Thread { $pid = @pcntl_fork(); if ( $pid == -1 ) { - throw new Exception( - $this->getError(Thread::COULD_NOT_FORK), - Thread::COULD_NOT_FORK - ); + $this->fatalError(Thread::COULD_NOT_FORK); } if ( $pid ) { // parent @@ -252,4 +254,4 @@ class Thread } } } -- See more at: http://blog.motane.lu/2009/01/02/multithreading-in-php/#sthash.JevFlLcL.dpuf + diff --git a/ThreadQueue.php b/ThreadQueue.php new file mode 100644 index 0000000..8a41675 --- /dev/null +++ b/ThreadQueue.php @@ -0,0 +1,115 @@ +callable = $callable; + $this->queueSize = $queueSize; + } + + + /** + * Add a new job + * + * @param mixed $argument parameter to pass to $callable + * @return int queue size + */ + + public function add($argument){ + $this->jobs[] = $argument; + return $this->tick(); + } + + + /** + * Removes closed threads from queue + * + */ + + private function cleanup(){ + foreach($this->threads as $i => $szal) + if(!$szal->isAlive()) + unset($this->threads[$i]); + return count($this->threads); + } + + + /** + * Starts new threads if needed + * + * @return int queue size + */ + + public function tick(){ + $this->cleanup(); + + if( (count($this->threads) < $this->queueSize) && count($this->jobs) ){ + $this->threads[] = $szal = new Thread($this->callable); + $szal->start( array_shift($this->jobs) ); + } + + usleep(ThreadQueue::TICK_DELAY); + return $this->queueSize(); + } + + + /** + * returns queue size with waiting jobs + * + * @return int + */ + + public function queueSize(){ + return count($this->jobs); + } + + + /** + * returns thread instances + * + * @return array of Thread + */ + public function threads(){ + return $this->threads; + } + + + /** + * Removes all remaining jobs (empty queue) + * + * @return int number of removed jobs + */ + public function flush(){ + $size = $this->queueSize(); + $this->jobs = array(); + return $size; + } + +} + diff --git a/example1.php b/example1.php new file mode 100644 index 0000000..5e9adaa --- /dev/null +++ b/example1.php @@ -0,0 +1,49 @@ +add("one"); +$TQ->add("two"); +$TQ->add("three"); +$TQ->add("four"); +$TQ->add("five"); + + + + +// wait until all threads exit + +while( count( $TQ->threads() ) ){ // there are existing processes in the background? + sleep(1); // optional + + echo "waiting for all jobs done...\n"; + $TQ->tick(); // mandatory! + +} + +echo "all process finished.\n"; + diff --git a/example2.php b/example2.php new file mode 100644 index 0000000..72e33ab --- /dev/null +++ b/example2.php @@ -0,0 +1,68 @@ +add("one"); +$TQ->add("two"); +$TQ->add("three"); +$TQ->add("four"); +$TQ->add("five"); +$TQ->add("six"); + +// Oops! We changed our mind, let's remove awaiting jobs. +// Existing threads will run, but jobs not started will be removed. +$TQ->flush(); + + +// let's add jobs again. +$TQ->add("seven"); +$TQ->add("eight"); +$TQ->add("nine"); +$TQ->add("ten"); +$TQ->add("eleven"); +$TQ->add("twelve"); + + + + +// wait until all threads exit + +while( $numberOfThreads = count($TQ->threads()) ){ + usleep(500000); // optional + + echo "waiting for all ($numberOfThreads) jobs done...\n"; + $TQ->tick(); // mandatory! + + // ha-ha! we can change the number of parallel executions realtime. + $TQ->queueSize = 4; + +} + +echo "all process finished.\n"; +