Provider refactoring
This commit is contained in:
parent
d4f01e8d2e
commit
3bb2f384ba
|
@ -7,6 +7,7 @@ use Elastica\Exception\Bulk\ResponseException as BulkResponseException;
|
|||
use FOS\ElasticaBundle\Persister\ObjectPersisterInterface;
|
||||
use FOS\ElasticaBundle\Provider\AbstractProvider as BaseAbstractProvider;
|
||||
use FOS\ElasticaBundle\Provider\IndexableInterface;
|
||||
use Symfony\Component\OptionsResolver\OptionsResolver;
|
||||
|
||||
abstract class AbstractProvider extends BaseAbstractProvider
|
||||
{
|
||||
|
@ -26,7 +27,7 @@ abstract class AbstractProvider extends BaseAbstractProvider
|
|||
* @param ObjectPersisterInterface $objectPersister
|
||||
* @param IndexableInterface $indexable
|
||||
* @param string $objectClass
|
||||
* @param array $options
|
||||
* @param array $baseOptions
|
||||
* @param ManagerRegistry $managerRegistry
|
||||
* @param SliceFetcherInterface $sliceFetcher
|
||||
*/
|
||||
|
@ -34,71 +35,106 @@ abstract class AbstractProvider extends BaseAbstractProvider
|
|||
ObjectPersisterInterface $objectPersister,
|
||||
IndexableInterface $indexable,
|
||||
$objectClass,
|
||||
array $options,
|
||||
array $baseOptions,
|
||||
ManagerRegistry $managerRegistry,
|
||||
SliceFetcherInterface $sliceFetcher = null
|
||||
) {
|
||||
parent::__construct($objectPersister, $indexable, $objectClass, array_merge(array(
|
||||
'clear_object_manager' => true,
|
||||
'debug_logging' => false,
|
||||
'ignore_errors' => false,
|
||||
'query_builder_method' => 'createQueryBuilder',
|
||||
), $options));
|
||||
parent::__construct($objectPersister, $indexable, $objectClass, $baseOptions);
|
||||
|
||||
$this->managerRegistry = $managerRegistry;
|
||||
$this->sliceFetcher = $sliceFetcher;
|
||||
}
|
||||
|
||||
/**
|
||||
* Counts objects that would be indexed using the query builder.
|
||||
*
|
||||
* @param object $queryBuilder
|
||||
*
|
||||
* @return integer
|
||||
*/
|
||||
abstract protected function countObjects($queryBuilder);
|
||||
|
||||
/**
|
||||
* Creates the query builder, which will be used to fetch objects to index.
|
||||
*
|
||||
* @param string $method
|
||||
*
|
||||
* @return object
|
||||
*/
|
||||
abstract protected function createQueryBuilder($method);
|
||||
|
||||
/**
|
||||
* Fetches a slice of objects using the query builder.
|
||||
*
|
||||
* @param object $queryBuilder
|
||||
* @param integer $limit
|
||||
* @param integer $offset
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
abstract protected function fetchSlice($queryBuilder, $limit, $offset);
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
public function populate(\Closure $loggerClosure = null, array $options = array())
|
||||
protected function doPopulate($options, \Closure $loggerClosure = null)
|
||||
{
|
||||
if (!$this->options['debug_logging']) {
|
||||
$logger = $this->disableLogging();
|
||||
}
|
||||
|
||||
$queryBuilder = $this->createQueryBuilder();
|
||||
$nbObjects = $this->countObjects($queryBuilder);
|
||||
$offset = isset($options['offset']) ? intval($options['offset']) : 0;
|
||||
$sleep = isset($options['sleep']) ? intval($options['sleep']) : 0;
|
||||
$batchSize = isset($options['batch-size']) ? intval($options['batch-size']) : $this->options['batch_size'];
|
||||
$ignoreErrors = isset($options['ignore-errors']) ? $options['ignore-errors'] : $this->options['ignore_errors'];
|
||||
$manager = $this->managerRegistry->getManagerForClass($this->objectClass);
|
||||
|
||||
$objects = array();
|
||||
for (; $offset < $nbObjects; $offset += $batchSize) {
|
||||
$objects = $this->getSlice($queryBuilder, $batchSize, $offset, $objects);
|
||||
$objects = array_filter($objects, array($this, 'isObjectIndexable'));
|
||||
$queryBuilder = $this->createQueryBuilder($options['query_builder_method']);
|
||||
$nbObjects = $this->countObjects($queryBuilder);
|
||||
$offset = $options['offset'];
|
||||
|
||||
if (!empty($objects)) {
|
||||
if (!$ignoreErrors) {
|
||||
$objects = array();
|
||||
for (; $offset < $nbObjects; $offset += $options['batch_size']) {
|
||||
try {
|
||||
$objects = $this->getSlice($queryBuilder, $options['batch_size'], $offset, $objects);
|
||||
$objects = $this->filterObjects($options, $objects);
|
||||
|
||||
if (!empty($objects)) {
|
||||
$this->objectPersister->insertMany($objects);
|
||||
} else {
|
||||
try {
|
||||
$this->objectPersister->insertMany($objects);
|
||||
} catch (BulkResponseException $e) {
|
||||
if ($loggerClosure) {
|
||||
$loggerClosure($batchSize, $nbObjects, sprintf('<error>%s</error>', $e->getMessage()));
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (BulkResponseException $e) {
|
||||
if (!$options['ignore_errors']) {
|
||||
throw $e;
|
||||
}
|
||||
|
||||
if (null !== $loggerClosure) {
|
||||
$loggerClosure(
|
||||
$options['batch_size'],
|
||||
$nbObjects,
|
||||
sprintf('<error>%s</error>', $e->getMessage())
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if ($this->options['clear_object_manager']) {
|
||||
if ($options['clear_object_manager']) {
|
||||
$manager->clear();
|
||||
}
|
||||
|
||||
usleep($sleep);
|
||||
usleep($options['sleep']);
|
||||
|
||||
if ($loggerClosure) {
|
||||
$loggerClosure($batchSize, $nbObjects);
|
||||
if (null !== $loggerClosure) {
|
||||
$loggerClosure($options['batch_size'], $nbObjects);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!$this->options['debug_logging']) {
|
||||
$this->enableLogging($logger);
|
||||
}
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
protected function configureOptions()
|
||||
{
|
||||
parent::configureOptions();
|
||||
|
||||
$this->resolver->setDefaults(array(
|
||||
'clear_object_manager' => true,
|
||||
'debug_logging' => false,
|
||||
'ignore_errors' => false,
|
||||
'offset' => 0,
|
||||
'query_builder_method' => 'createQueryBuilder',
|
||||
'sleep' => 0
|
||||
));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -131,47 +167,4 @@ abstract class AbstractProvider extends BaseAbstractProvider
|
|||
$identifierFieldNames
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Counts objects that would be indexed using the query builder.
|
||||
*
|
||||
* @param object $queryBuilder
|
||||
*
|
||||
* @return integer
|
||||
*/
|
||||
abstract protected function countObjects($queryBuilder);
|
||||
|
||||
/**
|
||||
* Disables logging and returns the logger that was previously set.
|
||||
*
|
||||
* @return mixed
|
||||
*/
|
||||
abstract protected function disableLogging();
|
||||
|
||||
/**
|
||||
* Reenables the logger with the previously returned logger from disableLogging();.
|
||||
*
|
||||
* @param mixed $logger
|
||||
*
|
||||
* @return mixed
|
||||
*/
|
||||
abstract protected function enableLogging($logger);
|
||||
|
||||
/**
|
||||
* Fetches a slice of objects using the query builder.
|
||||
*
|
||||
* @param object $queryBuilder
|
||||
* @param integer $limit
|
||||
* @param integer $offset
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
abstract protected function fetchSlice($queryBuilder, $limit, $offset);
|
||||
|
||||
/**
|
||||
* Creates the query builder, which will be used to fetch objects to index.
|
||||
*
|
||||
* @return object
|
||||
*/
|
||||
abstract protected function createQueryBuilder();
|
||||
}
|
||||
|
|
|
@ -44,7 +44,7 @@ class Provider extends AbstractProvider
|
|||
}
|
||||
|
||||
/**
|
||||
* @see FOS\ElasticaBundle\Doctrine\AbstractProvider::countObjects()
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
protected function countObjects($queryBuilder)
|
||||
{
|
||||
|
@ -58,7 +58,7 @@ class Provider extends AbstractProvider
|
|||
}
|
||||
|
||||
/**
|
||||
* @see FOS\ElasticaBundle\Doctrine\AbstractProvider::fetchSlice()
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
protected function fetchSlice($queryBuilder, $limit, $offset)
|
||||
{
|
||||
|
@ -75,13 +75,13 @@ class Provider extends AbstractProvider
|
|||
}
|
||||
|
||||
/**
|
||||
* @see FOS\ElasticaBundle\Doctrine\AbstractProvider::createQueryBuilder()
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
protected function createQueryBuilder()
|
||||
protected function createQueryBuilder($method)
|
||||
{
|
||||
return $this->managerRegistry
|
||||
->getManagerForClass($this->objectClass)
|
||||
->getRepository($this->objectClass)
|
||||
->{$this->options['query_builder_method']}();
|
||||
->{$method}();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -46,7 +46,7 @@ class Provider extends AbstractProvider
|
|||
}
|
||||
|
||||
/**
|
||||
* @see FOS\ElasticaBundle\Doctrine\AbstractProvider::countObjects()
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
protected function countObjects($queryBuilder)
|
||||
{
|
||||
|
@ -69,7 +69,9 @@ class Provider extends AbstractProvider
|
|||
}
|
||||
|
||||
/**
|
||||
* @see FOS\ElasticaBundle\Doctrine\AbstractProvider::fetchSlice()
|
||||
* This method should remain in sync with SliceFetcher::fetch until it is deprecated and removed.
|
||||
*
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
protected function fetchSlice($queryBuilder, $limit, $offset)
|
||||
{
|
||||
|
@ -103,14 +105,14 @@ class Provider extends AbstractProvider
|
|||
}
|
||||
|
||||
/**
|
||||
* @see FOS\ElasticaBundle\Doctrine\AbstractProvider::createQueryBuilder()
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
protected function createQueryBuilder()
|
||||
protected function createQueryBuilder($method)
|
||||
{
|
||||
return $this->managerRegistry
|
||||
->getManagerForClass($this->objectClass)
|
||||
->getRepository($this->objectClass)
|
||||
// ORM query builders require an alias argument
|
||||
->{$this->options['query_builder_method']}(static::ENTITY_ALIAS);
|
||||
->{$method}(static::ENTITY_ALIAS);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,6 +14,9 @@ use FOS\ElasticaBundle\Doctrine\SliceFetcherInterface;
|
|||
class SliceFetcher implements SliceFetcherInterface
|
||||
{
|
||||
/**
|
||||
* This method should remain in sync with Provider::fetchSlice until that method is deprecated and
|
||||
* removed.
|
||||
*
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function fetch($queryBuilder, $limit, $offset, array $previousSlice, array $identifierFieldNames)
|
||||
|
|
|
@ -14,31 +14,43 @@ class Provider extends AbstractProvider
|
|||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
public function populate(\Closure $loggerClosure = null, array $options = array())
|
||||
public function doPopulate($options, \Closure $loggerClosure = null)
|
||||
{
|
||||
$queryClass = $this->objectClass.'Query';
|
||||
$nbObjects = $queryClass::create()->count();
|
||||
$offset = isset($options['offset']) ? intval($options['offset']) : 0;
|
||||
$sleep = isset($options['sleep']) ? intval($options['sleep']) : 0;
|
||||
$batchSize = isset($options['batch-size']) ? intval($options['batch-size']) : $this->options['batch_size'];
|
||||
|
||||
for (; $offset < $nbObjects; $offset += $batchSize) {
|
||||
$offset = $options['offset'];
|
||||
|
||||
for (; $offset < $nbObjects; $offset += $options['batch_size']) {
|
||||
$objects = $queryClass::create()
|
||||
->limit($batchSize)
|
||||
->limit($options['batch_size'])
|
||||
->offset($offset)
|
||||
->find()
|
||||
->getArrayCopy();
|
||||
|
||||
$objects = array_filter($objects, array($this, 'isObjectIndexable'));
|
||||
$objects = $this->filterObjects($options, $objects);
|
||||
if (!empty($objects)) {
|
||||
$this->objectPersister->insertMany($objects);
|
||||
}
|
||||
|
||||
usleep($sleep);
|
||||
usleep($options['sleep']);
|
||||
|
||||
if ($loggerClosure) {
|
||||
$loggerClosure($batchSize, $nbObjects);
|
||||
$loggerClosure($options['batch_size'], $nbObjects);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
protected function disableLogging()
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
protected function enableLogging($logger)
|
||||
{
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
namespace FOS\ElasticaBundle\Provider;
|
||||
|
||||
use FOS\ElasticaBundle\Persister\ObjectPersisterInterface;
|
||||
use Symfony\Component\OptionsResolver\OptionsResolver;
|
||||
|
||||
/**
|
||||
* AbstractProvider.
|
||||
|
@ -10,9 +11,9 @@ use FOS\ElasticaBundle\Persister\ObjectPersisterInterface;
|
|||
abstract class AbstractProvider implements ProviderInterface
|
||||
{
|
||||
/**
|
||||
* @var ObjectPersisterInterface
|
||||
* @var array
|
||||
*/
|
||||
protected $objectPersister;
|
||||
protected $baseOptions;
|
||||
|
||||
/**
|
||||
* @var string
|
||||
|
@ -20,9 +21,14 @@ abstract class AbstractProvider implements ProviderInterface
|
|||
protected $objectClass;
|
||||
|
||||
/**
|
||||
* @var array
|
||||
* @var ObjectPersisterInterface
|
||||
*/
|
||||
protected $options;
|
||||
protected $objectPersister;
|
||||
|
||||
/**
|
||||
* @var OptionsResolver
|
||||
*/
|
||||
protected $resolver;
|
||||
|
||||
/**
|
||||
* @var IndexableInterface
|
||||
|
@ -35,26 +41,114 @@ abstract class AbstractProvider implements ProviderInterface
|
|||
* @param ObjectPersisterInterface $objectPersister
|
||||
* @param IndexableInterface $indexable
|
||||
* @param string $objectClass
|
||||
* @param array $options
|
||||
* @param array $baseOptions
|
||||
*/
|
||||
public function __construct(
|
||||
ObjectPersisterInterface $objectPersister,
|
||||
IndexableInterface $indexable,
|
||||
$objectClass,
|
||||
array $options = array()
|
||||
array $baseOptions = array()
|
||||
) {
|
||||
$this->baseOptions = $baseOptions;
|
||||
$this->indexable = $indexable;
|
||||
$this->objectClass = $objectClass;
|
||||
$this->objectPersister = $objectPersister;
|
||||
$this->resolver = new OptionsResolver();
|
||||
$this->configureOptions();
|
||||
}
|
||||
|
||||
$this->options = array_merge(array(
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
public function populate(\Closure $loggerClosure = null, array $options = array())
|
||||
{
|
||||
$options = $this->resolveOptions($options);
|
||||
|
||||
$logger = !$options['debug_logging'] ?
|
||||
$this->disableLogging() :
|
||||
null;
|
||||
|
||||
$this->doPopulate($options, $loggerClosure);
|
||||
|
||||
if (null !== $logger) {
|
||||
$this->enableLogging($logger);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Disables logging and returns the logger that was previously set.
|
||||
*
|
||||
* @return mixed
|
||||
*/
|
||||
abstract protected function disableLogging();
|
||||
|
||||
/**
|
||||
* Perform actual population.
|
||||
*
|
||||
* @param array $options
|
||||
* @param \Closure $loggerClosure
|
||||
*/
|
||||
abstract protected function doPopulate($options, \Closure $loggerClosure = null);
|
||||
|
||||
/**
|
||||
* Reenables the logger with the previously returned logger from disableLogging();.
|
||||
*
|
||||
* @param mixed $logger
|
||||
*
|
||||
* @return mixed
|
||||
*/
|
||||
abstract protected function enableLogging($logger);
|
||||
|
||||
/**
|
||||
* Configures the option resolver.
|
||||
*/
|
||||
protected function configureOptions()
|
||||
{
|
||||
$this->resolver->setDefaults(array(
|
||||
'batch_size' => 100,
|
||||
), $options);
|
||||
'skip_indexable_check' => false,
|
||||
));
|
||||
|
||||
$this->resolver->setRequired(array(
|
||||
'indexName',
|
||||
'typeName',
|
||||
));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Filters objects away if they are not indexable.
|
||||
*
|
||||
* @param array $options
|
||||
* @param array $objects
|
||||
* @return array
|
||||
*/
|
||||
protected function filterObjects(array $options, array $objects)
|
||||
{
|
||||
if ($options['skip_indexable_check']) {
|
||||
return $objects;
|
||||
}
|
||||
|
||||
$index = $options['indexName'];
|
||||
$type = $options['typeName'];
|
||||
|
||||
$return = array();
|
||||
foreach ($objects as $object) {
|
||||
if (!$this->indexable->isObjectIndexable($index, $type, $object)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$return[] = $object;
|
||||
}
|
||||
|
||||
return $return;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if a given object should be indexed or not.
|
||||
*
|
||||
* @deprecated To be removed in 4.0
|
||||
*
|
||||
* @param object $object
|
||||
*
|
||||
* @return bool
|
||||
|
@ -62,8 +156,8 @@ abstract class AbstractProvider implements ProviderInterface
|
|||
protected function isObjectIndexable($object)
|
||||
{
|
||||
return $this->indexable->isObjectIndexable(
|
||||
$this->options['indexName'],
|
||||
$this->options['typeName'],
|
||||
$this->baseOptions['indexName'],
|
||||
$this->baseOptions['typeName'],
|
||||
$object
|
||||
);
|
||||
}
|
||||
|
@ -82,4 +176,17 @@ abstract class AbstractProvider implements ProviderInterface
|
|||
|
||||
return sprintf('(RAM : current=%uMo peak=%uMo)', $memory, $memoryMax);
|
||||
}
|
||||
|
||||
/**
|
||||
* Merges the base options provided by the class with options passed to the populate
|
||||
* method and runs them through the resolver.
|
||||
*
|
||||
* @param array $options
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
protected function resolveOptions(array $options)
|
||||
{
|
||||
return $this->resolver->resolve(array_merge($this->baseOptions, $options));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -252,7 +252,7 @@ class AbstractProviderTest extends \PHPUnit_Framework_TestCase
|
|||
|
||||
$this->setExpectedException('Elastica\Exception\Bulk\ResponseException');
|
||||
|
||||
$provider->populate(null, array('ignore-errors' => false));
|
||||
$provider->populate(null, array('ignore_errors' => false));
|
||||
}
|
||||
|
||||
public function testPopulateRunsIndexCallable()
|
||||
|
@ -280,7 +280,7 @@ class AbstractProviderTest extends \PHPUnit_Framework_TestCase
|
|||
|
||||
$this->objectPersister->expects($this->once())
|
||||
->method('insertMany')
|
||||
->with(array(1 => 2));
|
||||
->with(array(2));
|
||||
|
||||
$provider->populate();
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue