Cleaning up BuildWorker #1

This commit is contained in:
Dan Cryer 2015-10-09 09:27:45 +01:00
parent 68249d2f5d
commit a008358056

View file

@ -6,6 +6,7 @@ use b8\Config;
use b8\Database; use b8\Database;
use b8\Store\Factory; use b8\Store\Factory;
use Monolog\Logger; use Monolog\Logger;
use Pheanstalk\Job;
use Pheanstalk\Pheanstalk; use Pheanstalk\Pheanstalk;
use PHPCI\Builder; use PHPCI\Builder;
use PHPCI\BuildFactory; use PHPCI\BuildFactory;
@ -49,6 +50,16 @@ class BuildWorker
*/ */
protected $queue; protected $queue;
/**
* @var \Pheanstalk\Pheanstalk
*/
protected $pheanstalk;
/**
* @var int
*/
protected $totalJobs = 0;
/** /**
* @param $host * @param $host
* @param $queue * @param $queue
@ -57,6 +68,7 @@ class BuildWorker
{ {
$this->host = $host; $this->host = $host;
$this->queue = $queue; $this->queue = $queue;
$this->pheanstalk = new Pheanstalk($this->host);
} }
/** /**
@ -80,36 +92,20 @@ class BuildWorker
*/ */
public function startWorker() public function startWorker()
{ {
$pheanstalk = new Pheanstalk($this->host); $this->pheanstalk->watch($this->queue);
$pheanstalk->watch($this->queue); $this->pheanstalk->ignore('default');
$pheanstalk->ignore('default');
$buildStore = Factory::getStore('Build'); $buildStore = Factory::getStore('Build');
$jobs = 0;
while ($this->run) { while ($this->run) {
// Get a job from the queue: // Get a job from the queue:
$job = $pheanstalk->reserve(); $job = $this->pheanstalk->reserve();
// Make sure we don't run more than maxJobs jobs on this worker: $this->checkJobLimit();
$jobs++;
if ($this->maxJobs != -1 && $this->maxJobs <= $jobs) {
$this->run = false;
}
// Get the job data and run the job: // Get the job data and run the job:
$jobData = json_decode($job->getData(), true); $jobData = json_decode($job->getData(), true);
if (empty($jobData) || !is_array($jobData)) { if (!$this->verifyJob($job, $jobData)) {
// Probably not from PHPCI.
$pheanstalk->release($job);
continue;
}
if (!array_key_exists('type', $jobData) || $jobData['type'] !== 'phpci.build') {
// Probably not from PHPCI.
$pheanstalk->delete($job);
continue; continue;
} }
@ -128,7 +124,7 @@ class BuildWorker
$build = BuildFactory::getBuildById($jobData['build_id']); $build = BuildFactory::getBuildById($jobData['build_id']);
} catch (\Exception $ex) { } catch (\Exception $ex) {
$this->logger->addWarning('Build #' . $jobData['build_id'] . ' does not exist in the database.'); $this->logger->addWarning('Build #' . $jobData['build_id'] . ' does not exist in the database.');
$pheanstalk->delete($job); $this->pheanstalk->delete($job);
} }
try { try {
@ -147,7 +143,7 @@ class BuildWorker
// If we've caught a PDO Exception, it is probably not the fault of the build, but of a failed // If we've caught a PDO Exception, it is probably not the fault of the build, but of a failed
// connection or similar. Release the job and kill the worker. // connection or similar. Release the job and kill the worker.
$this->run = false; $this->run = false;
$pheanstalk->release($job); $this->pheanstalk->release($job);
} catch (\Exception $ex) { } catch (\Exception $ex) {
$build->setStatus(Build::STATUS_FAILED); $build->setStatus(Build::STATUS_FAILED);
$build->setFinished(new \DateTime()); $build->setFinished(new \DateTime());
@ -163,7 +159,7 @@ class BuildWorker
} }
// Delete the job when we're done: // Delete the job when we're done:
$pheanstalk->delete($job); $this->pheanstalk->delete($job);
} }
} }
@ -174,4 +170,31 @@ class BuildWorker
{ {
$this->run = false; $this->run = false;
} }
protected function checkJobLimit()
{
// Make sure we don't run more than maxJobs jobs on this worker:
$this->totalJobs++;
if ($this->maxJobs != -1 && $this->maxJobs <= $this->totalJobs) {
$this->stopWorker();
}
}
protected function verifyJob(Job $job, $jobData)
{
if (empty($jobData) || !is_array($jobData)) {
// Probably not from PHPCI.
$this->pheanstalk->release($job);
return false;
}
if (!array_key_exists('type', $jobData) || $jobData['type'] !== 'phpci.build') {
// Probably not from PHPCI.
$this->pheanstalk->delete($job);
return false;
}
return true;
}
} }