From 2becb1d28ac51dc92993c211dbdf17a681a8143e Mon Sep 17 00:00:00 2001 From: ZsBT Date: Fri, 10 Apr 2015 09:24:12 +0200 Subject: [PATCH] Enqueue tasks to run them in parallel --- ThreadQueue.php | 115 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 ThreadQueue.php diff --git a/ThreadQueue.php b/ThreadQueue.php new file mode 100644 index 0000000..648067c --- /dev/null +++ b/ThreadQueue.php @@ -0,0 +1,115 @@ +callable = $callable; + $this->queueSize = $queueSize; + } + + + /** + * Add a new job + * + * @param mixed $argument parameter to pass to $callable + * @return int queue size + */ + + public function add($argument){ + $this->jobs[] = $argument; + return $this->tick(); + } + + + /** + * Removes closed threads from queue + * + */ + + private function cleanup(){ + foreach($this->threads as $i => $szal) + if(!$szal->isAlive()) + unset($this->threads[$i]); + return count($this->threads); + } + + + /** + * Starts new threads if needed + * + * @return int queue size + */ + + public function tick(){ + $this->cleanup(); + + if( (count($this->threads) < $this->queueSize) && count($this->jobs) ){ + $this->threads[] = $szal = new zsThread($this->callable); + $szal->start( array_shift($this->jobs) ); + } + + usleep(ThreadQueue::TICK_DELAY); + return $this->size(); + } + + + /** + * returns queue size + * + * @return int + */ + + public function size(){ + return count($this->jobs); + } + + + /** + * returns thread instances + * + * @return array of Thread + */ + public function threads(){ + return $this->threads; + } + + + /** + * Removes all remaining jobs (empty queue) + * + * @return int number of removed jobs + */ + public function flush(){ + $size = $this->size(); + $this->jobs = array(); + return $size; + } + +} +