php-thread/ThreadQueue.php

116 lines
2.4 KiB
PHP
Raw Permalink Normal View History

2015-04-10 09:24:12 +02:00
<?php /*
Adds parallel tasks to a queue.
tick() must be called regularly, that arranges addition of new jobs and removal of closed threads.
*/
require_once("Thread.php");
class ThreadQueue {
const DEFAULT_QUEUE_SIZE = 2; // default number of parallel tasks
const TICK_DELAY = 10000; // delay after tick, in millisecs
private $callable; // the function name to call. can be a method of a static class as well
private $threads = array(); // Thread instances
private $jobs = array(); // parameters to pass to $callable
public $queueSize; // number of parallel tasks. public, to make it variable run-time.
/**
* Constructor
*
* @param string $callable function name
* @param integer $queueSize number of parallel tasks
*/
public function __construct($callable, $queueSize = ThreadQueue::DEFAULT_QUEUE_SIZE ){
if(!is_callable($callable))throw new Exception("$callable is not callable.");
$this->callable = $callable;
$this->queueSize = $queueSize;
}
/**
* Add a new job
*
* @param mixed $argument parameter to pass to $callable
* @return int queue size
*/
public function add($argument){
$this->jobs[] = $argument;
return $this->tick();
}
/**
* Removes closed threads from queue
*
*/
private function cleanup(){
foreach($this->threads as $i => $szal)
if(!$szal->isAlive())
unset($this->threads[$i]);
return count($this->threads);
}
/**
* Starts new threads if needed
*
* @return int queue size
*/
public function tick(){
$this->cleanup();
if( (count($this->threads) < $this->queueSize) && count($this->jobs) ){
2015-04-10 09:53:11 +02:00
$this->threads[] = $szal = new Thread($this->callable);
2015-04-10 09:24:12 +02:00
$szal->start( array_shift($this->jobs) );
}
usleep(ThreadQueue::TICK_DELAY);
2015-04-10 09:53:11 +02:00
return $this->queueSize();
2015-04-10 09:24:12 +02:00
}
/**
2015-04-10 09:53:11 +02:00
* returns queue size with waiting jobs
2015-04-10 09:24:12 +02:00
*
* @return int
*/
2015-04-10 09:53:11 +02:00
public function queueSize(){
2015-04-10 09:24:12 +02:00
return count($this->jobs);
}
/**
* returns thread instances
*
* @return array of Thread
*/
public function threads(){
return $this->threads;
}
/**
* Removes all remaining jobs (empty queue)
*
* @return int number of removed jobs
*/
public function flush(){
2015-04-10 09:53:11 +02:00
$size = $this->queueSize();
2015-04-10 09:24:12 +02:00
$this->jobs = array();
return $size;
}
}