From a0083580561acff7082df37f2222d54e2492c8cd Mon Sep 17 00:00:00 2001 From: Dan Cryer Date: Fri, 9 Oct 2015 09:27:45 +0100 Subject: [PATCH] Cleaning up BuildWorker #1 --- PHPCI/Worker/BuildWorker.php | 71 ++++++++++++++++++++++++------------ 1 file changed, 47 insertions(+), 24 deletions(-) diff --git a/PHPCI/Worker/BuildWorker.php b/PHPCI/Worker/BuildWorker.php index a6fcc536..37d2bc20 100644 --- a/PHPCI/Worker/BuildWorker.php +++ b/PHPCI/Worker/BuildWorker.php @@ -6,6 +6,7 @@ use b8\Config; use b8\Database; use b8\Store\Factory; use Monolog\Logger; +use Pheanstalk\Job; use Pheanstalk\Pheanstalk; use PHPCI\Builder; use PHPCI\BuildFactory; @@ -49,6 +50,16 @@ class BuildWorker */ protected $queue; + /** + * @var \Pheanstalk\Pheanstalk + */ + protected $pheanstalk; + + /** + * @var int + */ + protected $totalJobs = 0; + /** * @param $host * @param $queue @@ -57,6 +68,7 @@ class BuildWorker { $this->host = $host; $this->queue = $queue; + $this->pheanstalk = new Pheanstalk($this->host); } /** @@ -80,36 +92,20 @@ class BuildWorker */ public function startWorker() { - $pheanstalk = new Pheanstalk($this->host); - $pheanstalk->watch($this->queue); - $pheanstalk->ignore('default'); + $this->pheanstalk->watch($this->queue); + $this->pheanstalk->ignore('default'); $buildStore = Factory::getStore('Build'); - $jobs = 0; - while ($this->run) { // Get a job from the queue: - $job = $pheanstalk->reserve(); + $job = $this->pheanstalk->reserve(); - // Make sure we don't run more than maxJobs jobs on this worker: - $jobs++; - - if ($this->maxJobs != -1 && $this->maxJobs <= $jobs) { - $this->run = false; - } + $this->checkJobLimit(); // Get the job data and run the job: $jobData = json_decode($job->getData(), true); - if (empty($jobData) || !is_array($jobData)) { - // Probably not from PHPCI. - $pheanstalk->release($job); - continue; - } - - if (!array_key_exists('type', $jobData) || $jobData['type'] !== 'phpci.build') { - // Probably not from PHPCI. - $pheanstalk->delete($job); + if (!$this->verifyJob($job, $jobData)) { continue; } @@ -128,7 +124,7 @@ class BuildWorker $build = BuildFactory::getBuildById($jobData['build_id']); } catch (\Exception $ex) { $this->logger->addWarning('Build #' . $jobData['build_id'] . ' does not exist in the database.'); - $pheanstalk->delete($job); + $this->pheanstalk->delete($job); } try { @@ -147,7 +143,7 @@ class BuildWorker // If we've caught a PDO Exception, it is probably not the fault of the build, but of a failed // connection or similar. Release the job and kill the worker. $this->run = false; - $pheanstalk->release($job); + $this->pheanstalk->release($job); } catch (\Exception $ex) { $build->setStatus(Build::STATUS_FAILED); $build->setFinished(new \DateTime()); @@ -163,7 +159,7 @@ class BuildWorker } // Delete the job when we're done: - $pheanstalk->delete($job); + $this->pheanstalk->delete($job); } } @@ -174,4 +170,31 @@ class BuildWorker { $this->run = false; } + + protected function checkJobLimit() + { + // Make sure we don't run more than maxJobs jobs on this worker: + $this->totalJobs++; + + if ($this->maxJobs != -1 && $this->maxJobs <= $this->totalJobs) { + $this->stopWorker(); + } + } + + protected function verifyJob(Job $job, $jobData) + { + if (empty($jobData) || !is_array($jobData)) { + // Probably not from PHPCI. + $this->pheanstalk->release($job); + return false; + } + + if (!array_key_exists('type', $jobData) || $jobData['type'] !== 'phpci.build') { + // Probably not from PHPCI. + $this->pheanstalk->delete($job); + return false; + } + + return true; + } }