Merge pull request #780 from merk/pr/770

Continued work on ProgressBar
This commit is contained in:
Tim Nagel 2015-01-25 19:54:53 +11:00
commit d44525f6f3
5 changed files with 116 additions and 61 deletions

View file

@ -21,3 +21,7 @@ https://github.com/FriendsOfSymfony/FOSElasticaBundle/compare/v3.0.4...v3.1.0
`dynamic_date_formats` for types. #753
* New event `POST_TRANSFORM` which allows developers to add custom properties to
Elastica Documents for indexing.
* When available, the `fos:elastica:populate` command will now use the
ProgressBar helper instead of outputting strings. You can use verbosity
controls on the command to output additional information like memory
usage, runtime and estimated time.

View file

@ -11,6 +11,7 @@ use FOS\ElasticaBundle\IndexManager;
use FOS\ElasticaBundle\Provider\ProviderRegistry;
use FOS\ElasticaBundle\Resetter;
use FOS\ElasticaBundle\Provider\ProviderInterface;
use Symfony\Component\Console\Helper\ProgressBar;
/**
* Populate the search index
@ -99,6 +100,81 @@ class PopulateCommand extends ContainerAwareCommand
}
}
/**
* @param ProviderInterface $provider
* @param OutputInterface $output
* @param string $input
* @param string $type
* @param array $options
*/
private function doPopulateType(ProviderInterface $provider, OutputInterface $output, $input, $type, $options)
{
$loggerClosure = $this->getLoggerClosure($output, $input, $type);
$provider->populate($loggerClosure, $options);
}
/**
* Builds a loggerClosure to be called from inside the Provider to update the command
* line.
*
* @param OutputInterface $output
* @param string $index
* @param string $type
* @return callable
*/
private function getLoggerClosure(OutputInterface $output, $index, $type)
{
if (!class_exists('Symfony\Component\Console\Helper\ProgressBar')) {
$lastStep = null;
$current = 0;
return function ($increment, $totalObjects) use ($output, $index, $type, &$lastStep, &$current) {
if ($increment > $totalObjects) {
$increment = $totalObjects;
}
$currentTime = microtime(true);
$timeDifference = $currentTime - $lastStep;
$objectsPerSecond = $lastStep ? ($increment / $timeDifference) : $increment;
$lastStep = $currentTime;
$current += $increment;
$percent = 100 * $current / $totalObjects;
$output->writeln(sprintf(
'<info>Populating</info> <comment>%s/%s</comment> %0.1f%% (%d/%d), %d objects/s (RAM: current=%uMo peak=%uMo)',
$index,
$type,
$percent,
$current,
$totalObjects,
$objectsPerSecond,
round(memory_get_usage() / (1024 * 1024)),
round(memory_get_peak_usage() / (1024 * 1024))
));
};
}
ProgressBar::setFormatDefinition('normal', " %current%/%max% [%bar%] %percent:3s%%\n%message%");
ProgressBar::setFormatDefinition('verbose', " %current%/%max% [%bar%] %percent:3s%% %elapsed:6s%\n%message%");
ProgressBar::setFormatDefinition('very_verbose', " %current%/%max% [%bar%] %percent:3s%% %elapsed:6s%/%estimated:-6s%\n%message%");
ProgressBar::setFormatDefinition('debug', " %current%/%max% [%bar%] %percent:3s%% %elapsed:6s%/%estimated:-6s% %memory:6s%\n%message%");
$progress = null;
return function ($increment, $totalObjects) use (&$progress, $output, $index, $type) {
if (null === $progress) {
$progress = new ProgressBar($output, $totalObjects);
}
$progress->setMessage(sprintf('<info>Populating</info> <comment>%s/%s</comment>', $index, $type));
$progress->advance($increment);
if ($progress->getProgress() >= $progress->getMaxSteps()) {
$progress->finish();
}
};
}
/**
* Recreates an index, populates its types, and refreshes the index.
*
@ -118,16 +194,10 @@ class PopulateCommand extends ContainerAwareCommand
$providers = $this->providerRegistry->getIndexProviders($index);
foreach ($providers as $type => $provider) {
$loggerClosure = function($message) use ($output, $index, $type) {
$output->writeln(sprintf('<info>Populating</info> %s/%s, %s', $index, $type, $message));
};
$provider->populate($loggerClosure, $options);
$this->doPopulateType($provider, $output, $index, $type, $options);
}
$output->writeln(sprintf('<info>Refreshing</info> <comment>%s</comment>', $index));
$this->resetter->postPopulate($index);
$this->indexManager->getIndex($index)->refresh();
$this->refreshIndex($output, $index);
}
/**
@ -146,12 +216,24 @@ class PopulateCommand extends ContainerAwareCommand
$this->resetter->resetIndexType($index, $type);
}
$loggerClosure = function($message) use ($output, $index, $type) {
$output->writeln(sprintf('<info>Populating</info> %s/%s, %s', $index, $type, $message));
};
$provider = $this->providerRegistry->getProvider($index, $type);
$provider->populate($loggerClosure, $options);
$this->doPopulateType($provider, $output, $index, $type, $options);
$this->refreshIndex($output, $index, false);
}
/**
* Refreshes an index.
*
* @param OutputInterface $output
* @param string $index
* @param bool $postPopulate
*/
private function refreshIndex(OutputInterface $output, $index, $postPopulate = true)
{
if ($postPopulate) {
$this->resetter->postPopulate($index);
}
$output->writeln(sprintf('<info>Refreshing</info> <comment>%s</comment>', $index));
$this->indexManager->getIndex($index)->refresh();

View file

@ -39,7 +39,7 @@ abstract class AbstractProvider extends BaseAbstractProvider
}
/**
* @see FOS\ElasticaBundle\Provider\ProviderInterface::populate()
* {@inheritDoc}
*/
public function populate(\Closure $loggerClosure = null, array $options = array())
{
@ -56,34 +56,19 @@ abstract class AbstractProvider extends BaseAbstractProvider
$manager = $this->managerRegistry->getManagerForClass($this->objectClass);
for (; $offset < $nbObjects; $offset += $batchSize) {
if ($loggerClosure) {
$stepStartTime = microtime(true);
}
$objects = $this->fetchSlice($queryBuilder, $batchSize, $offset);
if ($loggerClosure) {
$stepNbObjects = count($objects);
}
$objects = array_filter($objects, array($this, 'isObjectIndexable'));
if (!$objects) {
if ($loggerClosure) {
$loggerClosure('<info>Entire batch was filtered away, skipping...</info>');
}
if ($this->options['clear_object_manager']) {
$manager->clear();
}
continue;
}
if (!$ignoreErrors) {
$this->objectPersister->insertMany($objects);
} else {
try {
if ($objects) {
if (!$ignoreErrors) {
$this->objectPersister->insertMany($objects);
} catch(BulkResponseException $e) {
if ($loggerClosure) {
$loggerClosure(sprintf('<error>%s</error>',$e->getMessage()));
} else {
try {
$this->objectPersister->insertMany($objects);
} catch(BulkResponseException $e) {
if ($loggerClosure) {
$loggerClosure(sprintf('<error>%s</error>',$e->getMessage()));
}
}
}
}
@ -95,11 +80,7 @@ abstract class AbstractProvider extends BaseAbstractProvider
usleep($sleep);
if ($loggerClosure) {
$stepCount = $stepNbObjects + $offset;
$percentComplete = 100 * $stepCount / $nbObjects;
$timeDifference = microtime(true) - $stepStartTime;
$objectsPerSecond = $timeDifference ? ($stepNbObjects / $timeDifference) : $stepNbObjects;
$loggerClosure(sprintf('%0.1f%% (%d/%d), %d objects/s %s', $percentComplete, $stepCount, $nbObjects, $objectsPerSecond, $this->getMemoryUsage()));
$loggerClosure($batchSize, $nbObjects);
}
}

View file

@ -12,7 +12,7 @@ use FOS\ElasticaBundle\Provider\AbstractProvider;
class Provider extends AbstractProvider
{
/**
* @see FOS\ElasticaBundle\Provider\ProviderInterface::populate()
* {@inheritDoc}
*/
public function populate(\Closure $loggerClosure = null, array $options = array())
{
@ -23,34 +23,21 @@ class Provider extends AbstractProvider
$batchSize = isset($options['batch-size']) ? intval($options['batch-size']) : $this->options['batch_size'];
for (; $offset < $nbObjects; $offset += $batchSize) {
if ($loggerClosure) {
$stepStartTime = microtime(true);
}
$objects = $queryClass::create()
->limit($batchSize)
->offset($offset)
->find()
->getArrayCopy();
if ($loggerClosure) {
$stepNbObjects = count($objects);
}
$objects = array_filter($objects, array($this, 'isObjectIndexable'));
if (!$objects) {
$loggerClosure('<info>Entire batch was filtered away, skipping...</info>');
continue;
if ($objects) {
$this->objectPersister->insertMany($objects);
}
$this->objectPersister->insertMany($objects);
usleep($sleep);
if ($loggerClosure) {
$stepCount = $stepNbObjects + $offset;
$percentComplete = 100 * $stepCount / $nbObjects;
$objectsPerSecond = $stepNbObjects / (microtime(true) - $stepStartTime);
$loggerClosure(sprintf('%0.1f%% (%d/%d), %d objects/s %s', $percentComplete, $stepCount, $nbObjects, $objectsPerSecond, $this->getMemoryUsage()));
$loggerClosure($batchSize, $nbObjects);
}
}
}

View file

@ -70,6 +70,7 @@ abstract class AbstractProvider implements ProviderInterface
/**
* Get string with RAM usage information (current and peak)
*
* @deprecated To be removed in 4.0
* @return string
*/
protected function getMemoryUsage()