2015-04-10 09:24:12 +02:00
|
|
|
<?php /*
|
|
|
|
|
|
|
|
Adds parallel tasks to a queue.
|
|
|
|
|
|
|
|
tick() must be called regularly, that arranges addition of new jobs and removal of closed threads.
|
|
|
|
|
|
|
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
require_once("Thread.php");
|
|
|
|
|
|
|
|
class ThreadQueue {
|
|
|
|
|
|
|
|
const DEFAULT_QUEUE_SIZE = 2; // default number of parallel tasks
|
|
|
|
const TICK_DELAY = 10000; // delay after tick, in millisecs
|
|
|
|
|
|
|
|
private $callable; // the function name to call. can be a method of a static class as well
|
|
|
|
private $threads = array(); // Thread instances
|
|
|
|
private $jobs = array(); // parameters to pass to $callable
|
|
|
|
public $queueSize; // number of parallel tasks. public, to make it variable run-time.
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Constructor
|
|
|
|
*
|
|
|
|
* @param string $callable function name
|
|
|
|
* @param integer $queueSize number of parallel tasks
|
|
|
|
*/
|
|
|
|
|
|
|
|
public function __construct($callable, $queueSize = ThreadQueue::DEFAULT_QUEUE_SIZE ){
|
|
|
|
if(!is_callable($callable))throw new Exception("$callable is not callable.");
|
|
|
|
$this->callable = $callable;
|
|
|
|
$this->queueSize = $queueSize;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Add a new job
|
|
|
|
*
|
|
|
|
* @param mixed $argument parameter to pass to $callable
|
|
|
|
* @return int queue size
|
|
|
|
*/
|
|
|
|
|
|
|
|
public function add($argument){
|
|
|
|
$this->jobs[] = $argument;
|
|
|
|
return $this->tick();
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Removes closed threads from queue
|
|
|
|
*
|
|
|
|
*/
|
|
|
|
|
|
|
|
private function cleanup(){
|
|
|
|
foreach($this->threads as $i => $szal)
|
|
|
|
if(!$szal->isAlive())
|
|
|
|
unset($this->threads[$i]);
|
|
|
|
return count($this->threads);
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Starts new threads if needed
|
|
|
|
*
|
|
|
|
* @return int queue size
|
|
|
|
*/
|
|
|
|
|
|
|
|
public function tick(){
|
|
|
|
$this->cleanup();
|
|
|
|
|
|
|
|
if( (count($this->threads) < $this->queueSize) && count($this->jobs) ){
|
2015-04-10 09:53:11 +02:00
|
|
|
$this->threads[] = $szal = new Thread($this->callable);
|
2015-04-10 09:24:12 +02:00
|
|
|
$szal->start( array_shift($this->jobs) );
|
|
|
|
}
|
|
|
|
|
|
|
|
usleep(ThreadQueue::TICK_DELAY);
|
2015-04-10 09:53:11 +02:00
|
|
|
return $this->queueSize();
|
2015-04-10 09:24:12 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
2015-04-10 09:53:11 +02:00
|
|
|
* returns queue size with waiting jobs
|
2015-04-10 09:24:12 +02:00
|
|
|
*
|
|
|
|
* @return int
|
|
|
|
*/
|
|
|
|
|
2015-04-10 09:53:11 +02:00
|
|
|
public function queueSize(){
|
2015-04-10 09:24:12 +02:00
|
|
|
return count($this->jobs);
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
* returns thread instances
|
|
|
|
*
|
|
|
|
* @return array of Thread
|
|
|
|
*/
|
|
|
|
public function threads(){
|
|
|
|
return $this->threads;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Removes all remaining jobs (empty queue)
|
|
|
|
*
|
|
|
|
* @return int number of removed jobs
|
|
|
|
*/
|
|
|
|
public function flush(){
|
2015-04-10 09:53:11 +02:00
|
|
|
$size = $this->queueSize();
|
2015-04-10 09:24:12 +02:00
|
|
|
$this->jobs = array();
|
|
|
|
return $size;
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|