From 67c0b7950571b02c5627641e6e97c7b1f14332e7 Mon Sep 17 00:00:00 2001 From: Tim Nagel Date: Thu, 22 Jan 2015 11:18:51 +1100 Subject: [PATCH] Tidy up ProgressBar use, move most calculations for loggerClosure into PopulateCommand rather than in AbstractProvider --- CHANGELOG-3.1.md | 4 ++ Command/PopulateCommand.php | 127 ++++++++++++++++++++++++++-------- Doctrine/AbstractProvider.php | 55 ++++----------- Propel/Provider.php | 28 ++------ Provider/AbstractProvider.php | 1 + 5 files changed, 120 insertions(+), 95 deletions(-) diff --git a/CHANGELOG-3.1.md b/CHANGELOG-3.1.md index 12fbb05..fe42514 100644 --- a/CHANGELOG-3.1.md +++ b/CHANGELOG-3.1.md @@ -21,3 +21,7 @@ https://github.com/FriendsOfSymfony/FOSElasticaBundle/compare/v3.0.4...v3.1.0 `dynamic_date_formats` for types. #753 * New event `POST_TRANSFORM` which allows developers to add custom properties to Elastica Documents for indexing. + * When available, the `fos:elastica:populate` command will now use the + ProgressBar helper instead of outputting strings. You can use verbosity + controls on the command to output additional information like memory + usage, runtime and estimated time. diff --git a/Command/PopulateCommand.php b/Command/PopulateCommand.php index 6f27e5b..ac37cab 100644 --- a/Command/PopulateCommand.php +++ b/Command/PopulateCommand.php @@ -100,6 +100,81 @@ class PopulateCommand extends ContainerAwareCommand } } + /** + * @param ProviderInterface $provider + * @param OutputInterface $output + * @param string $input + * @param string $type + * @param array $options + */ + private function doPopulateType(ProviderInterface $provider, OutputInterface $output, $input, $type, $options) + { + $loggerClosure = $this->getLoggerClosure($output, $input, $type); + + $provider->populate($loggerClosure, $options); + } + + /** + * Builds a loggerClosure to be called from inside the Provider to update the command + * line. + * + * @param OutputInterface $output + * @param string $index + * @param string $type + * @return callable + */ + private function getLoggerClosure(OutputInterface $output, $index, $type) + { + if (!class_exists('Symfony\Component\Console\Helper\ProgressBar')) { + $lastStep = null; + $current = 0; + + return function ($increment, $totalObjects) use ($output, $index, $type, &$lastStep, &$current) { + if ($increment > $totalObjects) { + $increment = $totalObjects; + } + + $currentTime = microtime(true); + $timeDifference = $currentTime - $lastStep; + $objectsPerSecond = $lastStep ? ($increment / $timeDifference) : $increment; + $lastStep = $currentTime; + $current += $increment; + $percent = 100 * $current / $totalObjects; + + $output->writeln(sprintf( + 'Populating %s/%s %0.1f%% (%d/%d), %d objects/s (RAM: current=%uMo peak=%uMo)', + $index, + $type, + $percent, + $current, + $totalObjects, + $objectsPerSecond, + round(memory_get_usage() / (1024 * 1024)), + round(memory_get_peak_usage() / (1024 * 1024)) + )); + }; + } + + ProgressBar::setFormatDefinition('normal', " %current%/%max% [%bar%] %percent:3s%%\n%message%"); + ProgressBar::setFormatDefinition('verbose', " %current%/%max% [%bar%] %percent:3s%% %elapsed:6s%\n%message%"); + ProgressBar::setFormatDefinition('very_verbose', " %current%/%max% [%bar%] %percent:3s%% %elapsed:6s%/%estimated:-6s%\n%message%"); + ProgressBar::setFormatDefinition('debug', " %current%/%max% [%bar%] %percent:3s%% %elapsed:6s%/%estimated:-6s% %memory:6s%\n%message%"); + $progress = null; + + return function ($increment, $totalObjects) use (&$progress, $output, $index, $type) { + if (null === $progress) { + $progress = new ProgressBar($output, $totalObjects); + } + + $progress->setMessage(sprintf('Populating %s/%s', $index, $type)); + $progress->advance($increment); + + if ($progress->getProgress() >= $progress->getMaxSteps()) { + $progress->finish(); + } + }; + } + /** * Recreates an index, populates its types, and refreshes the index. * @@ -110,39 +185,19 @@ class PopulateCommand extends ContainerAwareCommand */ private function populateIndex(OutputInterface $output, $index, $reset, $options) { - - /** @var $providers ProviderInterface[] */ - $providers = $this->providerRegistry->getIndexProviders($index); - if ($reset) { $output->writeln(sprintf('Resetting %s', $index)); $this->resetter->resetIndex($index, true); } + /** @var $providers ProviderInterface[] */ + $providers = $this->providerRegistry->getIndexProviders($index); + foreach ($providers as $type => $provider) { - if (class_exists('Symfony\Component\Console\Helper\ProgressBar')) { - $output->writeln(sprintf('Populating %s/%s', $index, $type)); - $progressBar = new ProgressBar($output, $provider->getTotalObjects()); - $progressBar->setFormat('debug'); - $progressBar->start(); - $loggerClosure = function($number) use ($progressBar) { - $progressBar->advance($number); - }; - } else { - $loggerClosure = function($message) use ($output, $index, $type) { - $output->writeln(sprintf('Populating %s/%s, %s', $index, $type, $message)); - }; - } - - $options['progress-bar'] = true; - $provider->populate($loggerClosure, $options); - - if (isset($progressBar)) $progressBar->finish(); + $this->doPopulateType($provider, $output, $index, $type, $options); } - $output->writeln(sprintf('Refreshing %s', $index)); - $this->resetter->postPopulate($index); - $this->indexManager->getIndex($index)->refresh(); + $this->refreshIndex($output, $index); } /** @@ -161,12 +216,24 @@ class PopulateCommand extends ContainerAwareCommand $this->resetter->resetIndexType($index, $type); } - $loggerClosure = function($message) use ($output, $index, $type) { - $output->writeln(sprintf('Populating %s/%s, %s', $index, $type, $message)); - }; - $provider = $this->providerRegistry->getProvider($index, $type); - $provider->populate($loggerClosure, $options); + $this->doPopulateType($provider, $output, $index, $type, $options); + + $this->refreshIndex($output, $index, false); + } + + /** + * Refreshes an index. + * + * @param OutputInterface $output + * @param string $index + * @param bool $postPopulate + */ + private function refreshIndex(OutputInterface $output, $index, $postPopulate = true) + { + if ($postPopulate) { + $this->resetter->postPopulate($index); + } $output->writeln(sprintf('Refreshing %s', $index)); $this->indexManager->getIndex($index)->refresh(); diff --git a/Doctrine/AbstractProvider.php b/Doctrine/AbstractProvider.php index 9901fef..80d0716 100644 --- a/Doctrine/AbstractProvider.php +++ b/Doctrine/AbstractProvider.php @@ -7,7 +7,6 @@ use Elastica\Exception\Bulk\ResponseException as BulkResponseException; use FOS\ElasticaBundle\Persister\ObjectPersisterInterface; use FOS\ElasticaBundle\Provider\AbstractProvider as BaseAbstractProvider; use FOS\ElasticaBundle\Provider\IndexableInterface; -use Symfony\Component\Console\Helper\ProgressBar; abstract class AbstractProvider extends BaseAbstractProvider { @@ -40,7 +39,7 @@ abstract class AbstractProvider extends BaseAbstractProvider } /** - * @see FOS\ElasticaBundle\Provider\ProviderInterface::populate() + * {@inheritDoc} */ public function populate(\Closure $loggerClosure = null, array $options = array()) { @@ -54,38 +53,22 @@ abstract class AbstractProvider extends BaseAbstractProvider $sleep = isset($options['sleep']) ? intval($options['sleep']) : 0; $batchSize = isset($options['batch-size']) ? intval($options['batch-size']) : $this->options['batch_size']; $ignoreErrors = isset($options['ignore-errors']) ? $options['ignore-errors'] : $this->options['ignore_errors']; - $progressBar = isset($options['progress-bar']) ? boolval($options['progress-bar']) : false; $manager = $this->managerRegistry->getManagerForClass($this->objectClass); for (; $offset < $nbObjects; $offset += $batchSize) { - if ($loggerClosure) { - $stepStartTime = microtime(true); - } $objects = $this->fetchSlice($queryBuilder, $batchSize, $offset); - if ($loggerClosure) { - $stepNbObjects = count($objects); - } $objects = array_filter($objects, array($this, 'isObjectIndexable')); - if (!$objects) { - if ($loggerClosure) { - $loggerClosure('Entire batch was filtered away, skipping...'); - } - if ($this->options['clear_object_manager']) { - $manager->clear(); - } - - continue; - } - - if (!$ignoreErrors) { - $this->objectPersister->insertMany($objects); - } else { - try { + if ($objects) { + if (!$ignoreErrors) { $this->objectPersister->insertMany($objects); - } catch(BulkResponseException $e) { - if ($loggerClosure) { - $loggerClosure(sprintf('%s',$e->getMessage())); + } else { + try { + $this->objectPersister->insertMany($objects); + } catch(BulkResponseException $e) { + if ($loggerClosure) { + $loggerClosure(sprintf('%s',$e->getMessage())); + } } } } @@ -96,14 +79,8 @@ abstract class AbstractProvider extends BaseAbstractProvider usleep($sleep); - if ($loggerClosure && !$progressBar) { - $stepCount = $stepNbObjects + $offset; - $percentComplete = 100 * $stepCount / $nbObjects; - $timeDifference = microtime(true) - $stepStartTime; - $objectsPerSecond = $timeDifference ? ($stepNbObjects / $timeDifference) : $stepNbObjects; - $loggerClosure(sprintf('%0.1f%% (%d/%d), %d objects/s %s', $percentComplete, $stepCount, $nbObjects, $objectsPerSecond, $this->getMemoryUsage())); - } else if ($loggerClosure && $progressBar) { - $loggerClosure($stepNbObjects); + if ($loggerClosure) { + $loggerClosure($batchSize, $nbObjects); } } @@ -112,14 +89,6 @@ abstract class AbstractProvider extends BaseAbstractProvider } } - /** - * @return int|mixed - */ - public function getTotalObjects() - { - return $this->countObjects($this->createQueryBuilder()); - } - /** * Counts objects that would be indexed using the query builder. * diff --git a/Propel/Provider.php b/Propel/Provider.php index f4966da..a3af1bd 100644 --- a/Propel/Provider.php +++ b/Propel/Provider.php @@ -12,7 +12,7 @@ use FOS\ElasticaBundle\Provider\AbstractProvider; class Provider extends AbstractProvider { /** - * @see FOS\ElasticaBundle\Provider\ProviderInterface::populate() + * {@inheritDoc} */ public function populate(\Closure $loggerClosure = null, array $options = array()) { @@ -20,40 +20,24 @@ class Provider extends AbstractProvider $nbObjects = $queryClass::create()->count(); $offset = isset($options['offset']) ? intval($options['offset']) : 0; $sleep = isset($options['sleep']) ? intval($options['sleep']) : 0; - $progressBar = isset($options['progress-bar']) ? boolval($options['progress-bar']) : false; $batchSize = isset($options['batch-size']) ? intval($options['batch-size']) : $this->options['batch_size']; for (; $offset < $nbObjects; $offset += $batchSize) { - if ($loggerClosure) { - $stepStartTime = microtime(true); - } - $objects = $queryClass::create() ->limit($batchSize) ->offset($offset) ->find() ->getArrayCopy(); - if ($loggerClosure) { - $stepNbObjects = count($objects); - } + $objects = array_filter($objects, array($this, 'isObjectIndexable')); - if (!$objects) { - $loggerClosure('Entire batch was filtered away, skipping...'); - - continue; + if ($objects) { + $this->objectPersister->insertMany($objects); } - $this->objectPersister->insertMany($objects); - usleep($sleep); - if ($loggerClosure && !$progressBar) { - $stepCount = $stepNbObjects + $offset; - $percentComplete = 100 * $stepCount / $nbObjects; - $objectsPerSecond = $stepNbObjects / (microtime(true) - $stepStartTime); - $loggerClosure(sprintf('%0.1f%% (%d/%d), %d objects/s %s', $percentComplete, $stepCount, $nbObjects, $objectsPerSecond, $this->getMemoryUsage())); - } else if ($loggerClosure && $progressBar) { - $loggerClosure($stepNbObjects); + if ($loggerClosure) { + $loggerClosure($batchSize, $nbObjects); } } } diff --git a/Provider/AbstractProvider.php b/Provider/AbstractProvider.php index 82ea914..842518d 100644 --- a/Provider/AbstractProvider.php +++ b/Provider/AbstractProvider.php @@ -70,6 +70,7 @@ abstract class AbstractProvider implements ProviderInterface /** * Get string with RAM usage information (current and peak) * + * @deprecated To be removed in 4.0 * @return string */ protected function getMemoryUsage()