[Provider] Change provider construction (possible BC break)

* Created AbstractProvider class (for all DB services), which handles the default batch_size option.
 * The logger Closure is now optional for populate().
 * Removed unused Elastica_Type argument from Provider constructors.
 * Added unit tests for Doctrine's AbstractProvider class.
 * The extra argument (ManagerRegistry) for Doctrine providers is now an appended constructor argument, so the extension no longer needs to use different replacement indexes for Propel/Doctrine providers.
This commit is contained in:
Jeremy Mikola 2012-03-09 18:51:08 -05:00
parent 89a368ae35
commit e09225eb09
12 changed files with 350 additions and 182 deletions

View file

@ -281,26 +281,14 @@ class FOQElasticaExtension extends Extension
if (isset($typeConfig['provider']['service'])) {
return $typeConfig['provider']['service'];
}
$abstractProviderId = sprintf('foq_elastica.provider.prototype.%s', $typeConfig['driver']);
$providerId = sprintf('foq_elastica.provider.%s.%s', $indexName, $typeName);
$providerDef = new DefinitionDecorator($abstractProviderId);
$providerDef = new DefinitionDecorator('foq_elastica.provider.prototype.' . $typeConfig['driver']);
$providerDef->addTag('foq_elastica.provider', array('index' => $indexName, 'type' => $typeName));
$providerDef->replaceArgument(0, $typeDef);
// Doctrine has a mandatory service as second argument
$argPos = ('propel' === $typeConfig['driver']) ? 1 : 2;
$providerDef->replaceArgument($argPos, new Reference($objectPersisterId));
$providerDef->replaceArgument($argPos + 1, $typeConfig['model']);
$options = array('batch_size' => $typeConfig['provider']['batch_size']);
if ('propel' !== $typeConfig['driver']) {
$options['query_builder_method'] = $typeConfig['provider']['query_builder_method'];
$options['clear_object_manager'] = $typeConfig['provider']['clear_object_manager'];
}
$providerDef->replaceArgument($argPos + 2, $options);
$providerDef->replaceArgument(0, new Reference($objectPersisterId));
$providerDef->replaceArgument(1, $typeConfig['model']);
// Propel provider can simply ignore Doctrine-specific options
$providerDef->replaceArgument(2, array_diff_key($typeConfig['provider'], array('service' => 1)));
$container->setDefinition($providerId, $providerDef);
return $providerId;

View file

@ -2,106 +2,85 @@
namespace FOQ\ElasticaBundle\Doctrine;
use FOQ\ElasticaBundle\Provider\ProviderInterface;
use Doctrine\Common\Persistence\ManagerRegistry;
use FOQ\ElasticaBundle\Persister\ObjectPersisterInterface;
use Elastica_Type;
use Elastica_Document;
use Closure;
use InvalidArgumentException;
use FOQ\ElasticaBundle\Provider\AbstractProvider as BaseAbstractProvider;
abstract class AbstractProvider implements ProviderInterface
abstract class AbstractProvider extends BaseAbstractProvider
{
/**
* Elastica type
*
* @var Elastica_Type
*/
protected $type;
protected $managerRegistry;
/**
* Manager registry
* Constructor.
*
* @var object
* @param ObjectPersisterInterface $objectPersister
* @param string $objectClass
* @param array $options
* @param ManagerRegistry $managerRegistry
*/
protected $registry;
/**
* Object persister
*
* @var ObjectPersisterInterface
*/
protected $objectPersister;
/**
* Provider options
*
* @var array
*/
protected $options = array(
'batch_size' => 100,
'clear_object_manager' => true,
'query_builder_method' => 'createQueryBuilder'
);
public function __construct(Elastica_Type $type, $registry, ObjectPersisterInterface $objectPersister, $objectClass, array $options = array())
public function __construct(ObjectPersisterInterface $objectPersister, $objectClass, array $options, $managerRegistry)
{
$this->type = $type;
$this->registry = $registry;
$this->objectClass = $objectClass;
$this->objectPersister = $objectPersister;
$this->options = array_merge($this->options, $options);
parent::__construct($objectPersister, $objectClass, array_merge(array(
'clear_object_manager' => true,
'query_builder_method' => 'createQueryBuilder',
), $options));
$this->managerRegistry = $managerRegistry;
}
/**
* Insert the repository objects in the type index
*
* @param Closure $loggerClosure
* @see FOQ\ElasticaBundle\Provider\ProviderInterface::populate()
*/
public function populate(Closure $loggerClosure)
public function populate(\Closure $loggerClosure = null)
{
$queryBuilder = $this->createQueryBuilder();
$nbObjects = $this->countObjects($queryBuilder);
$nbObjects = $this->countObjects($queryBuilder);
for ($offset = 0; $offset < $nbObjects; $offset += $this->options['batch_size']) {
if ($loggerClosure) {
$stepStartTime = microtime(true);
}
$stepStartTime = microtime(true);
$objects = $this->fetchSlice($queryBuilder, $this->options['batch_size'], $offset);
$this->objectPersister->insertMany($objects);
if ($this->options['clear_object_manager']) {
$this->registry->getManagerForClass($this->objectClass)->clear();
$this->managerRegistry->getManagerForClass($this->objectClass)->clear();
}
$stepNbObjects = count($objects);
$stepCount = $stepNbObjects+$offset;
$objectsPerSecond = $stepNbObjects / (microtime(true) - $stepStartTime);
$loggerClosure(sprintf('%0.1f%% (%d/%d), %d objects/s', 100*$stepCount/$nbObjects, $stepCount, $nbObjects, $objectsPerSecond));
if ($loggerClosure) {
$stepNbObjects = count($objects);
$stepCount = $stepNbObjects + $offset;
$percentComplete = 100 * $stepCount / $nbObjects;
$objectsPerSecond = $stepNbObjects / (microtime(true) - $stepStartTime);
$loggerClosure(sprintf('%0.1f%% (%d/%d), %d objects/s', $percentComplete, $stepCount, $nbObjects, $objectsPerSecond));
}
}
}
/**
* Counts the objects of a query builder
* Counts objects that would be indexed using the query builder.
*
* @param queryBuilder
* @return int
**/
* @param object $queryBuilder
* @return integer
*/
protected abstract function countObjects($queryBuilder);
/**
* Fetches a slice of objects
* Fetches a slice of objects using the query builder.
*
* @param queryBuilder
* @param int limit
* @param int offset
* @return array of objects
**/
* @param object $queryBuilder
* @param integer $limit
* @param integer $offset
* @return array
*/
protected abstract function fetchSlice($queryBuilder, $limit, $offset);
/**
* Creates the query builder used to fetch the documents to index
* Creates the query builder, which will be used to fetch objects to index.
*
* @return query builder
**/
* @return object
*/
protected abstract function createQueryBuilder();
}

View file

@ -2,42 +2,49 @@
namespace FOQ\ElasticaBundle\Doctrine\MongoDB;
use Doctrine\ODM\MongoDB\Query\Builder;
use FOQ\ElasticaBundle\Doctrine\AbstractProvider;
use FOQ\ElasticaBundle\Exception\InvalidArgumentTypeException;
class Provider extends AbstractProvider
{
/**
* Counts the objects of a query builder
*
* @param queryBuilder
* @return int
**/
* @see FOQ\ElasticaBundle\Doctrine\AbstractProvider::countObjects()
*/
protected function countObjects($queryBuilder)
{
return $queryBuilder->getQuery()->count();
if (!$queryBuilder instanceof Builder) {
throw new InvalidArgumentTypeException($queryBuilder, 'Doctrine\ODM\MongoDB\Query\Builder');
}
return $queryBuilder
->getQuery()
->count();
}
/**
* Fetches a slice of objects
*
* @param queryBuilder
* @param int limit
* @param int offset
* @return array of objects
**/
* @see FOQ\ElasticaBundle\Doctrine\AbstractProvider::fetchSlice()
*/
protected function fetchSlice($queryBuilder, $limit, $offset)
{
return $queryBuilder->limit($limit)->skip($offset)->getQuery()->execute()->toArray();
if (!$queryBuilder instanceof Builder) {
throw new InvalidArgumentTypeException($queryBuilder, 'Doctrine\ODM\MongoDB\Query\Builder');
}
return $queryBuilder
->limit($limit)
->skip($offset)
->getQuery()
->execute()
->toArray();
}
/**
* Creates the query builder used to fetch the documents to index
*
* @return query builder
**/
* @see FOQ\ElasticaBundle\Doctrine\AbstractProvider::createQueryBuilder()
*/
protected function createQueryBuilder()
{
return $this->registry
return $this->managerRegistry
->getManagerForClass($this->objectClass)
->getRepository($this->objectClass)
->{$this->options['query_builder_method']}();

View file

@ -2,51 +2,59 @@
namespace FOQ\ElasticaBundle\Doctrine\ORM;
use Doctrine\ORM\QueryBuilder;
use FOQ\ElasticaBundle\Doctrine\AbstractProvider;
use FOQ\ElasticaBundle\Exception\InvalidArgumentTypeException;
class Provider extends AbstractProvider
{
/**
* Counts the objects of a query builder
*
* @param queryBuilder
* @return int
**/
* @see FOQ\ElasticaBundle\Doctrine\AbstractProvider::countObjects()
*/
protected function countObjects($queryBuilder)
{
$qb = clone $queryBuilder;
$qb->select($qb->expr()->count($queryBuilder->getRootAlias()))
->resetDQLPart('orderBy'); // no need to order the query. It does not change the count and make the query less efficient.
if (!$queryBuilder instanceof QueryBuilder) {
throw new InvalidArgumentTypeException($queryBuilder, 'Doctrine\ORM\QueryBuilder');
}
return $qb->getQuery()->getSingleScalarResult();
/* Clone the query builder before altering its field selection and DQL,
* lest we leave the query builder in a bad state for fetchSlice().
*/
$qb = clone $queryBuilder;
return $qb
->select($qb->expr()->count($queryBuilder->getRootAlias()))
// Remove ordering for efficiency; it doesn't affect the count
->resetDQLPart('orderBy')
->getQuery()
->getSingleScalarResult();
}
/**
* Fetches a slice of objects
*
* @param queryBuilder
* @param int limit
* @param int offset
* @return array of objects
**/
* @see FOQ\ElasticaBundle\Doctrine\AbstractProvider::fetchSlice()
*/
protected function fetchSlice($queryBuilder, $limit, $offset)
{
$queryBuilder->setFirstResult($offset);
$queryBuilder->setMaxResults($limit);
if (!$queryBuilder instanceof QueryBuilder) {
throw new InvalidArgumentTypeException($queryBuilder, 'Doctrine\ORM\QueryBuilder');
}
return $queryBuilder->getQuery()->getResult();
return $queryBuilder
->setFirstResult($offset)
->setMaxResults($limit)
->getQuery()
->getResult();
}
/**
* Creates the query builder used to fetch the documents to index
*
* @return query builder
**/
* @see FOQ\ElasticaBundle\Doctrine\AbstractProvider::createQueryBuilder()
*/
protected function createQueryBuilder()
{
return $this->registry
return $this->managerRegistry
->getManagerForClass($this->objectClass)
->getRepository($this->objectClass)
// ORM query builders require an alias argument
->{$this->options['query_builder_method']}('a');
}
}

View file

@ -0,0 +1,11 @@
<?php
namespace FOQ\ElasticaBundle\Exception;
class InvalidArgumentTypeException extends \InvalidArgumentException
{
public function __construct($value, $expectedType)
{
parent::__construct(sprintf('Expected argument of type "%s", "%s" given', $expectedType, is_object($value) ? get_class($value) : gettype($value)));
}
}

View file

@ -2,64 +2,28 @@
namespace FOQ\ElasticaBundle\Propel;
use FOQ\ElasticaBundle\Provider\ProviderInterface;
use FOQ\ElasticaBundle\Persister\ObjectPersisterInterface;
use Elastica_Type;
use Elastica_Document;
use Closure;
use InvalidArgumentException;
use FOQ\ElasticaBundle\Provider\AbstractProvider;
/**
* Propel provider
*
* @author William Durand <william.durand1@gmail.com>
*/
class Provider implements ProviderInterface
class Provider extends AbstractProvider
{
/**
* Elastica type
*
* @var Elastica_Type
* @see FOQ\ElasticaBundle\Provider\ProviderInterface::populate()
*/
protected $type;
/**
* Object persister
*
* @var ObjectPersisterInterface
*/
protected $objectPersister;
/**
* Provider options
*
* @var array
*/
protected $options = array(
'batch_size' => 100,
);
public function __construct(Elastica_Type $type, ObjectPersisterInterface $objectPersister, $objectClass, array $options = array())
{
$this->type = $type;
$this->objectClass = $objectClass;
$this->objectPersister = $objectPersister;
$this->options = array_merge($this->options, $options);
}
/**
* Insert the repository objects in the type index
*
* @param Closure $loggerClosure
*/
public function populate(Closure $loggerClosure)
public function populate(\Closure $loggerClosure = null)
{
$queryClass = $this->objectClass . 'Query';
$nbObjects = $queryClass::create()->count();
$nbObjects = $queryClass::create()->count();
for ($offset = 0; $offset < $nbObjects; $offset += $this->options['batch_size']) {
if ($loggerClosure) {
$stepStartTime = microtime(true);
}
$stepStartTime = microtime(true);
$objects = $queryClass::create()
->limit($this->options['batch_size'])
->offset($offset)
@ -67,10 +31,13 @@ class Provider implements ProviderInterface
$this->objectPersister->insertMany($objects->getArrayCopy());
$stepNbObjects = count($objects);
$stepCount = $stepNbObjects+$offset;
$objectsPerSecond = $stepNbObjects / (microtime(true) - $stepStartTime);
$loggerClosure(sprintf('%0.1f%% (%d/%d), %d objects/s', 100*$stepCount/$nbObjects, $stepCount, $nbObjects, $objectsPerSecond));
if ($loggerClosure) {
$stepNbObjects = count($objects);
$stepCount = $stepNbObjects + $offset;
$percentComplete = 100 * $stepCount / $nbObjects;
$objectsPerSecond = $stepNbObjects / (microtime(true) - $stepStartTime);
$loggerClosure(sprintf('%0.1f%% (%d/%d), %d objects/s', $percentComplete, $stepCount, $nbObjects, $objectsPerSecond));
}
}
}
}

View file

@ -0,0 +1,30 @@
<?php
namespace FOQ\ElasticaBundle\Provider;
use FOQ\ElasticaBundle\Persister\ObjectPersisterInterface;
use FOQ\ElasticaBundle\Provider\ProviderInterface;
abstract class AbstractProvider implements ProviderInterface
{
protected $objectClass;
protected $objectPersister;
protected $options;
/**
* Constructor.
*
* @param ObjectPersisterInterface $objectPersister
* @param string $objectClass
* @param array $options
*/
public function __construct(ObjectPersisterInterface $objectPersister, $objectClass, array $options = array())
{
$this->objectPersister = $objectPersister;
$this->objectClass = $objectClass;
$this->options = array_merge(array(
'batch_size' => 100,
), $options);
}
}

View file

@ -2,8 +2,6 @@
namespace FOQ\ElasticaBundle\Provider;
use Closure;
/**
* Insert application domain objects into elastica types
*
@ -12,9 +10,9 @@ use Closure;
interface ProviderInterface
{
/**
* Add all domain objects of a repository to the elastica type
* Persists all domain objects to ElasticSearch for this provider.
*
* @param Closure $loggerClosure
*/
function populate(Closure $loggerClosure);
function populate(\Closure $loggerClosure = null);
}

View file

@ -7,11 +7,10 @@
<services>
<service id="foq_elastica.provider.prototype.mongodb" class="FOQ\ElasticaBundle\Doctrine\MongoDB\Provider" public="false" abstract="true">
<argument /> <!-- type -->
<argument type="service" id="doctrine.odm.mongodb" />
<argument /> <!-- object persister -->
<argument /> <!-- model -->
<argument type="collection" /> <!-- options -->
<argument type="service" id="doctrine.odm.mongodb" />
</service>
<service id="foq_elastica.listener.prototype.mongodb" class="FOQ\ElasticaBundle\Doctrine\MongoDB\Listener" public="false" abstract="true">

View file

@ -7,11 +7,10 @@
<services>
<service id="foq_elastica.provider.prototype.orm" class="FOQ\ElasticaBundle\Doctrine\ORM\Provider" public="false" abstract="true">
<argument /> <!-- type -->
<argument type="service" id="doctrine" />
<argument /> <!-- object persister -->
<argument /> <!-- model -->
<argument type="collection" /> <!-- options -->
<argument type="service" id="doctrine" />
</service>
<service id="foq_elastica.listener.prototype.orm" class="FOQ\ElasticaBundle\Doctrine\ORM\Listener" public="false" abstract="true">

View file

@ -6,7 +6,6 @@
<services>
<service id="foq_elastica.provider.prototype.propel" class="FOQ\ElasticaBundle\Propel\Provider" public="false" abstract="true">
<argument /> <!-- type -->
<argument /> <!-- object persister -->
<argument /> <!-- model -->
<argument type="collection" /> <!-- options -->

View file

@ -0,0 +1,183 @@
<?php
namespace FOQ\ElasticaBundle\Tests\Doctrine;
class AbstractProviderTest extends \PHPUnit_Framework_TestCase
{
private $objectClass;
private $objectManager;
private $objectPersister;
private $options;
private $managerRegistry;
public function setUp()
{
if (!interface_exists('Doctrine\Common\Persistence\ManagerRegistry')) {
$this->markTestSkipped('Doctrine Common is not available.');
}
$this->objectClass = 'objectClass';
$this->options = array();
$this->objectPersister = $this->getMockObjectPersister();
$this->managerRegistry = $this->getMockManagerRegistry();
$this->objectManager = $this->getMockObjectManager();
$this->managerRegistry->expects($this->any())
->method('getManagerForClass')
->with($this->objectClass)
->will($this->returnValue($this->objectManager));
}
/**
* @dataProvider providePopulateIterations
*/
public function testPopulateIterations($nbObjects, $objectsByIteration, $batchSize)
{
$this->options['batch_size'] = $batchSize;
$provider = $this->getMockAbstractProvider();
$queryBuilder = new \stdClass();
$provider->expects($this->once())
->method('createQueryBuilder')
->will($this->returnValue($queryBuilder));
$provider->expects($this->once())
->method('countObjects')
->with($queryBuilder)
->will($this->returnValue($nbObjects));
$providerInvocationOffset = 2;
foreach ($objectsByIteration as $i => $objects) {
$offset = $objects[0] - 1;
$provider->expects($this->at($providerInvocationOffset + $i))
->method('fetchSlice')
->with($queryBuilder, $batchSize, $offset)
->will($this->returnValue($objects));
$this->objectPersister->expects($this->at($i))
->method('insertMany')
->with($objects);
$this->objectManager->expects($this->at($i))
->method('clear');
}
$provider->populate();
}
public function providePopulateIterations()
{
return array(
array(
100,
array(range(1,100)),
100,
),
array(
105,
array(range(1, 50), range(51, 100), range(101, 105)),
50,
),
);
}
public function testPopulateShouldNotClearsObjectManager()
{
$nbObjects = 1;
$objects = array(1);
$this->options['clear_object_manager'] = false;
$provider = $this->getMockAbstractProvider();
$provider->expects($this->any())
->method('countObjects')
->will($this->returnValue($nbObjects));
$provider->expects($this->any())
->method('fetchSlice')
->will($this->returnValue($objects));
$this->objectManager->expects($this->never())
->method('clear');
$provider->populate();
}
public function testPopulateInvokesLoggerClosure()
{
$nbObjects = 1;
$objects = array(1);
$provider = $this->getMockAbstractProvider();
$provider->expects($this->any())
->method('countObjects')
->will($this->returnValue($nbObjects));
$provider->expects($this->any())
->method('fetchSlice')
->will($this->returnValue($objects));
$loggerClosureInvoked = false;
$loggerClosure = function () use (&$loggerClosureInvoked) {
$loggerClosureInvoked = true;
};
$provider->populate();
$this->assertFalse($loggerClosureInvoked);
$provider->populate($loggerClosure);
$this->assertTrue($loggerClosureInvoked);
}
/**
* @return FOQ\ElasticaBundle\Doctrine\AbstractProvider
*/
private function getMockAbstractProvider()
{
return $this->getMockForAbstractClass('FOQ\ElasticaBundle\Doctrine\AbstractProvider', array(
$this->objectPersister,
$this->objectClass,
$this->options,
$this->managerRegistry,
));
}
/**
* @return Doctrine\Common\Persistence\ManagerRegistry
*/
private function getMockManagerRegistry()
{
return $this->getMock('Doctrine\Common\Persistence\ManagerRegistry');
}
/**
* @return FOQ\ElasticaBundle\Tests\Doctrine\ObjectManager
*/
private function getMockObjectManager()
{
return $this->getMock(__NAMESPACE__ . '\ObjectManager');
}
/**
* @return FOQ\ElasticaBundle\Persister\ObjectPersisterInterface
*/
private function getMockObjectPersister()
{
return $this->getMock('FOQ\ElasticaBundle\Persister\ObjectPersisterInterface');
}
}
/**
* Doctrine\Common\Persistence\ObjectManager does not include a clear() method
* in its interface, so create a new interface for mocking.
*/
interface ObjectManager
{
function clear();
}