callable = $callable;
        $this->queueSize = $queueSize;
    }

    /**
     * Enqueue a job and immediately run one scheduling pass.
     *
     * @param mixed $argument value passed to the thread's start() call when the job is picked up
     * @return int number of jobs still waiting after the pass
     */
    public function add($argument){
        $this->jobs[] = $argument;
        $waiting = $this->tick();
        return $waiting;
    }

    /**
     * Drop every tracked thread that is no longer alive.
     *
     * @return int number of threads still tracked after the sweep
     */
    private function cleanup(){
        foreach ($this->threads as $slot => $worker) {
            if ($worker->isAlive()) {
                continue;
            }
            unset($this->threads[$slot]);
        }
        return count($this->threads);
    }

    /**
     * Run one scheduling pass: sweep finished threads, start at most one
     * new thread if a slot is free and a job is waiting, then pause.
     *
     * @return int number of jobs still waiting
     */
    public function tick(){
        // cleanup() returns the count of threads that are still tracked.
        $running = $this->cleanup();

        if ($running < $this->queueSize && count($this->jobs) > 0) {
            $worker = new Thread($this->callable);
            $this->threads[] = $worker;
            // The oldest waiting job is taken first (FIFO).
            $worker->start(array_shift($this->jobs));
        }

        // Pause on every pass, whether or not a thread was started.
        usleep(ThreadQueue::TICK_DELAY);

        return $this->queueSize();
    }

    /**
     * Number of jobs that are waiting and have not been handed to a thread yet.
     *
     * @return int
     */
    public function queueSize(){
        $waiting = count($this->jobs);
        return $waiting;
    }

    /**
     * Thread instances currently tracked by the queue.
     *
     * @return array of Thread
     */
    public function threads(){
        return $this->threads;
    }

    /**
     * Discard all waiting jobs; threads that were already started are not touched.
     *
     * @return int number of jobs that were discarded
     */
    public function flush(){
        $removed = $this->queueSize();
        $this->jobs = array();
        return $removed;
    }
}