Tidy up ProgressBar use, move most calculations for loggerClosure

into PopulateCommand rather than in AbstractProvider
This commit is contained in:
Tim Nagel 2015-01-22 11:18:51 +11:00
parent 6bb2def21e
commit 67c0b79505
5 changed files with 120 additions and 95 deletions

View file

@ -21,3 +21,7 @@ https://github.com/FriendsOfSymfony/FOSElasticaBundle/compare/v3.0.4...v3.1.0
`dynamic_date_formats` for types. #753
* New event `POST_TRANSFORM` which allows developers to add custom properties to
Elastica Documents for indexing.
* When available, the `fos:elastica:populate` command will now use the
ProgressBar helper instead of outputting strings. You can use verbosity
controls on the command to output additional information like memory
usage, runtime and estimated time.

View file

@ -100,6 +100,81 @@ class PopulateCommand extends ContainerAwareCommand
}
}
/**
* @param ProviderInterface $provider
* @param OutputInterface $output
* @param string $input
* @param string $type
* @param array $options
*/
private function doPopulateType(ProviderInterface $provider, OutputInterface $output, $input, $type, $options)
{
$loggerClosure = $this->getLoggerClosure($output, $input, $type);
$provider->populate($loggerClosure, $options);
}
/**
* Builds a loggerClosure to be called from inside the Provider to update the command
* line.
*
* @param OutputInterface $output
* @param string $index
* @param string $type
* @return callable
*/
private function getLoggerClosure(OutputInterface $output, $index, $type)
{
if (!class_exists('Symfony\Component\Console\Helper\ProgressBar')) {
$lastStep = null;
$current = 0;
return function ($increment, $totalObjects) use ($output, $index, $type, &$lastStep, &$current) {
if ($increment > $totalObjects) {
$increment = $totalObjects;
}
$currentTime = microtime(true);
$timeDifference = $currentTime - $lastStep;
$objectsPerSecond = $lastStep ? ($increment / $timeDifference) : $increment;
$lastStep = $currentTime;
$current += $increment;
$percent = 100 * $current / $totalObjects;
$output->writeln(sprintf(
'<info>Populating</info> <comment>%s/%s</comment> %0.1f%% (%d/%d), %d objects/s (RAM: current=%uMo peak=%uMo)',
$index,
$type,
$percent,
$current,
$totalObjects,
$objectsPerSecond,
round(memory_get_usage() / (1024 * 1024)),
round(memory_get_peak_usage() / (1024 * 1024))
));
};
}
ProgressBar::setFormatDefinition('normal', " %current%/%max% [%bar%] %percent:3s%%\n%message%");
ProgressBar::setFormatDefinition('verbose', " %current%/%max% [%bar%] %percent:3s%% %elapsed:6s%\n%message%");
ProgressBar::setFormatDefinition('very_verbose', " %current%/%max% [%bar%] %percent:3s%% %elapsed:6s%/%estimated:-6s%\n%message%");
ProgressBar::setFormatDefinition('debug', " %current%/%max% [%bar%] %percent:3s%% %elapsed:6s%/%estimated:-6s% %memory:6s%\n%message%");
$progress = null;
return function ($increment, $totalObjects) use (&$progress, $output, $index, $type) {
if (null === $progress) {
$progress = new ProgressBar($output, $totalObjects);
}
$progress->setMessage(sprintf('<info>Populating</info> <comment>%s/%s</comment>', $index, $type));
$progress->advance($increment);
if ($progress->getProgress() >= $progress->getMaxSteps()) {
$progress->finish();
}
};
}
/**
* Recreates an index, populates its types, and refreshes the index.
*
@ -110,39 +185,19 @@ class PopulateCommand extends ContainerAwareCommand
*/
private function populateIndex(OutputInterface $output, $index, $reset, $options)
{
/** @var $providers ProviderInterface[] */
$providers = $this->providerRegistry->getIndexProviders($index);
if ($reset) {
$output->writeln(sprintf('<info>Resetting</info> <comment>%s</comment>', $index));
$this->resetter->resetIndex($index, true);
}
/** @var $providers ProviderInterface[] */
$providers = $this->providerRegistry->getIndexProviders($index);
foreach ($providers as $type => $provider) {
if (class_exists('Symfony\Component\Console\Helper\ProgressBar')) {
$output->writeln(sprintf('<info>Populating</info> %s/%s', $index, $type));
$progressBar = new ProgressBar($output, $provider->getTotalObjects());
$progressBar->setFormat('debug');
$progressBar->start();
$loggerClosure = function($number) use ($progressBar) {
$progressBar->advance($number);
};
} else {
$loggerClosure = function($message) use ($output, $index, $type) {
$output->writeln(sprintf('<info>Populating</info> %s/%s, %s', $index, $type, $message));
};
}
$options['progress-bar'] = true;
$provider->populate($loggerClosure, $options);
if (isset($progressBar)) $progressBar->finish();
$this->doPopulateType($provider, $output, $index, $type, $options);
}
$output->writeln(sprintf('<info>Refreshing</info> <comment>%s</comment>', $index));
$this->resetter->postPopulate($index);
$this->indexManager->getIndex($index)->refresh();
$this->refreshIndex($output, $index);
}
/**
@ -161,12 +216,24 @@ class PopulateCommand extends ContainerAwareCommand
$this->resetter->resetIndexType($index, $type);
}
$loggerClosure = function($message) use ($output, $index, $type) {
$output->writeln(sprintf('<info>Populating</info> %s/%s, %s', $index, $type, $message));
};
$provider = $this->providerRegistry->getProvider($index, $type);
$provider->populate($loggerClosure, $options);
$this->doPopulateType($provider, $output, $index, $type, $options);
$this->refreshIndex($output, $index, false);
}
/**
* Refreshes an index.
*
* @param OutputInterface $output
* @param string $index
* @param bool $postPopulate
*/
private function refreshIndex(OutputInterface $output, $index, $postPopulate = true)
{
if ($postPopulate) {
$this->resetter->postPopulate($index);
}
$output->writeln(sprintf('<info>Refreshing</info> <comment>%s</comment>', $index));
$this->indexManager->getIndex($index)->refresh();

View file

@ -7,7 +7,6 @@ use Elastica\Exception\Bulk\ResponseException as BulkResponseException;
use FOS\ElasticaBundle\Persister\ObjectPersisterInterface;
use FOS\ElasticaBundle\Provider\AbstractProvider as BaseAbstractProvider;
use FOS\ElasticaBundle\Provider\IndexableInterface;
use Symfony\Component\Console\Helper\ProgressBar;
abstract class AbstractProvider extends BaseAbstractProvider
{
@ -40,7 +39,7 @@ abstract class AbstractProvider extends BaseAbstractProvider
}
/**
* @see FOS\ElasticaBundle\Provider\ProviderInterface::populate()
* {@inheritDoc}
*/
public function populate(\Closure $loggerClosure = null, array $options = array())
{
@ -54,38 +53,22 @@ abstract class AbstractProvider extends BaseAbstractProvider
$sleep = isset($options['sleep']) ? intval($options['sleep']) : 0;
$batchSize = isset($options['batch-size']) ? intval($options['batch-size']) : $this->options['batch_size'];
$ignoreErrors = isset($options['ignore-errors']) ? $options['ignore-errors'] : $this->options['ignore_errors'];
$progressBar = isset($options['progress-bar']) ? boolval($options['progress-bar']) : false;
$manager = $this->managerRegistry->getManagerForClass($this->objectClass);
for (; $offset < $nbObjects; $offset += $batchSize) {
if ($loggerClosure) {
$stepStartTime = microtime(true);
}
$objects = $this->fetchSlice($queryBuilder, $batchSize, $offset);
if ($loggerClosure) {
$stepNbObjects = count($objects);
}
$objects = array_filter($objects, array($this, 'isObjectIndexable'));
if (!$objects) {
if ($loggerClosure) {
$loggerClosure('<info>Entire batch was filtered away, skipping...</info>');
}
if ($this->options['clear_object_manager']) {
$manager->clear();
}
continue;
}
if (!$ignoreErrors) {
$this->objectPersister->insertMany($objects);
} else {
try {
if ($objects) {
if (!$ignoreErrors) {
$this->objectPersister->insertMany($objects);
} catch(BulkResponseException $e) {
if ($loggerClosure) {
$loggerClosure(sprintf('<error>%s</error>',$e->getMessage()));
} else {
try {
$this->objectPersister->insertMany($objects);
} catch(BulkResponseException $e) {
if ($loggerClosure) {
$loggerClosure(sprintf('<error>%s</error>',$e->getMessage()));
}
}
}
}
@ -96,14 +79,8 @@ abstract class AbstractProvider extends BaseAbstractProvider
usleep($sleep);
if ($loggerClosure && !$progressBar) {
$stepCount = $stepNbObjects + $offset;
$percentComplete = 100 * $stepCount / $nbObjects;
$timeDifference = microtime(true) - $stepStartTime;
$objectsPerSecond = $timeDifference ? ($stepNbObjects / $timeDifference) : $stepNbObjects;
$loggerClosure(sprintf('%0.1f%% (%d/%d), %d objects/s %s', $percentComplete, $stepCount, $nbObjects, $objectsPerSecond, $this->getMemoryUsage()));
} else if ($loggerClosure && $progressBar) {
$loggerClosure($stepNbObjects);
if ($loggerClosure) {
$loggerClosure($batchSize, $nbObjects);
}
}
@ -112,14 +89,6 @@ abstract class AbstractProvider extends BaseAbstractProvider
}
}
/**
* @return int|mixed
*/
public function getTotalObjects()
{
return $this->countObjects($this->createQueryBuilder());
}
/**
* Counts objects that would be indexed using the query builder.
*

View file

@ -12,7 +12,7 @@ use FOS\ElasticaBundle\Provider\AbstractProvider;
class Provider extends AbstractProvider
{
/**
* @see FOS\ElasticaBundle\Provider\ProviderInterface::populate()
* {@inheritDoc}
*/
public function populate(\Closure $loggerClosure = null, array $options = array())
{
@ -20,40 +20,24 @@ class Provider extends AbstractProvider
$nbObjects = $queryClass::create()->count();
$offset = isset($options['offset']) ? intval($options['offset']) : 0;
$sleep = isset($options['sleep']) ? intval($options['sleep']) : 0;
$progressBar = isset($options['progress-bar']) ? boolval($options['progress-bar']) : false;
$batchSize = isset($options['batch-size']) ? intval($options['batch-size']) : $this->options['batch_size'];
for (; $offset < $nbObjects; $offset += $batchSize) {
if ($loggerClosure) {
$stepStartTime = microtime(true);
}
$objects = $queryClass::create()
->limit($batchSize)
->offset($offset)
->find()
->getArrayCopy();
if ($loggerClosure) {
$stepNbObjects = count($objects);
}
$objects = array_filter($objects, array($this, 'isObjectIndexable'));
if (!$objects) {
$loggerClosure('<info>Entire batch was filtered away, skipping...</info>');
continue;
if ($objects) {
$this->objectPersister->insertMany($objects);
}
$this->objectPersister->insertMany($objects);
usleep($sleep);
if ($loggerClosure && !$progressBar) {
$stepCount = $stepNbObjects + $offset;
$percentComplete = 100 * $stepCount / $nbObjects;
$objectsPerSecond = $stepNbObjects / (microtime(true) - $stepStartTime);
$loggerClosure(sprintf('%0.1f%% (%d/%d), %d objects/s %s', $percentComplete, $stepCount, $nbObjects, $objectsPerSecond, $this->getMemoryUsage()));
} else if ($loggerClosure && $progressBar) {
$loggerClosure($stepNbObjects);
if ($loggerClosure) {
$loggerClosure($batchSize, $nbObjects);
}
}
}

View file

@ -70,6 +70,7 @@ abstract class AbstractProvider implements ProviderInterface
/**
* Get string with RAM usage information (current and peak)
*
* @deprecated To be removed in 4.0
* @return string
*/
protected function getMemoryUsage()