Doctrine integration refactoring, adds realtime index updates

This commit is contained in:
ornicar 2011-06-07 11:13:34 -07:00
parent 09fe17d36b
commit 6fee4131f4
15 changed files with 508 additions and 58 deletions

View file

@ -82,6 +82,14 @@ class Configuration
->scalarNode('service')->end()
->end()
->end()
->arrayNode('listener')
->children()
->scalarNode('insert')->defaultTrue()->end()
->scalarNode('update')->defaultTrue()->end()
->scalarNode('delete')->defaultTrue()->end()
->scalarNode('service')->end()
->end()
->end()
->arrayNode('finder')
->children()
->scalarNode('service')->end()

View file

@ -167,14 +167,18 @@ class FOQElasticaExtension extends Extension
$elasticaToModelTransformerId = $this->loadElasticaToModelTransformer($typeConfig, $container, $indexName, $typeName);
$modelToElasticaTransformerId = $this->loadModelToElasticaTransformer($typeConfig, $container, $indexName, $typeName);
$objectPersisterId = $this->loadObjectPersister($typeConfig, $typeDef, $container, $indexName, $typeName, $modelToElasticaTransformerId);
if (isset($typeConfig['provider'])) {
$providerId = $this->loadTypeProvider($typeConfig, $container, $modelToElasticaTransformerId, $typeDef, $indexName, $typeName);
$providerId = $this->loadTypeProvider($typeConfig, $container, $objectPersisterId, $typeDef, $indexName, $typeName);
$container->getDefinition('foq_elastica.populator')->addMethodCall('addProvider', array($providerId, new Reference($providerId)));
}
if (isset($typeConfig['finder'])) {
$this->loadTypeFinder($typeConfig, $container, $elasticaToModelTransformerId, $typeDef, $indexName, $typeName);
}
if (isset($typeConfig['listener'])) {
$this->loadTypeListener($typeConfig, $container, $objectPersisterId, $typeDef, $indexName, $typeName);
}
}
protected function loadElasticaToModelTransformer(array $typeConfig, ContainerBuilder $container, $indexName, $typeName)
@ -211,7 +215,22 @@ class FOQElasticaExtension extends Extension
return $serviceId;
}
protected function loadTypeProvider(array $typeConfig, ContainerBuilder $container, $modelToElasticaTransformerId, $typeDef, $indexName, $typeName)
protected function loadObjectPersister(array $typeConfig, Definition $typeDef, ContainerBuilder $container, $indexName, $typeName, $transformerId)
{
$abstractId = sprintf('foq_elastica.object_persister.prototype');
$serviceId = sprintf('foq_elastica.object_persister.%s.%s', $indexName, $typeName);
$serviceDef = new DefinitionDecorator($abstractId);
$serviceDef->replaceArgument(0, $typeDef);
$serviceDef->replaceArgument(1, new Reference($transformerId));
$serviceDef->replaceArgument(2, $typeConfig['model']);
$serviceDef->replaceArgument(3, new Reference('foq_elastica.type_inspector'));
$serviceDef->replaceArgument(4, new Reference('logger'));
$container->setDefinition($serviceId, $serviceDef);
return $serviceId;
}
protected function loadTypeProvider(array $typeConfig, ContainerBuilder $container, $objectPersisterId, $typeDef, $indexName, $typeName)
{
if (isset($typeConfig['provider']['service'])) {
return $typeConfig['provider']['service'];
@ -220,7 +239,7 @@ class FOQElasticaExtension extends Extension
$providerId = sprintf('foq_elastica.provider.%s.%s', $indexName, $typeName);
$providerDef = new DefinitionDecorator($abstractProviderId);
$providerDef->replaceArgument(0, $typeDef);
$providerDef->replaceArgument(2, new Reference($modelToElasticaTransformerId));
$providerDef->replaceArgument(2, new Reference($objectPersisterId));
$providerDef->replaceArgument(3, $typeConfig['model']);
$providerDef->replaceArgument(4, array(
'query_builder_method' => $typeConfig['provider']['query_builder_method'],
@ -232,6 +251,33 @@ class FOQElasticaExtension extends Extension
return $providerId;
}
protected function loadTypeListener(array $typeConfig, ContainerBuilder $container, $objectPersisterId, $typeDef, $indexName, $typeName)
{
if (isset($typeConfig['listener']['service'])) {
return $typeConfig['listener']['service'];
}
$abstractListenerId = sprintf('foq_elastica.listener.prototype.%s', $typeConfig['driver']);
$listenerId = sprintf('foq_elastica.listener.%s.%s', $indexName, $typeName);
$listenerDef = new DefinitionDecorator($abstractListenerId);
$listenerDef->replaceArgument(0, new Reference($objectPersisterId));
$listenerDef->replaceArgument(1, $typeConfig['model']);
$events = array();
$doctrineEvents = array('insert' => 'postPersist', 'update' => 'postUpdate', 'delete' => 'postRemove');
foreach ($doctrineEvents as $event => $doctrineEvent) {
if (isset($typeConfig['listener'][$event]) && $typeConfig['listener'][$event]) {
$events[] = $doctrineEvent;
}
}
$listenerDef->replaceArgument(2, $events);
switch ($typeConfig['driver']) {
case 'orm': $listenerDef->addTag('doctrine.event_subscriber'); break;
case 'mongodb': $listenerDef->addTag('doctrine.common.event_subscriber'); break;
}
$container->setDefinition($listenerId, $listenerDef);
return $listenerId;
}
protected function loadTypeFinder(array $typeConfig, ContainerBuilder $container, $elasticaToModelId, $typeDef, $indexName, $typeName)
{
if (isset($typeConfig['finder']['service'])) {

View file

@ -0,0 +1,47 @@
<?php
namespace FOQ\ElasticaBundle\Doctrine;
use FOQ\ElasticaBundle\Persister\ObjectPersister;
abstract class AbstractListener
{
/**
* Object persister
*
* @var ObjectPersister
*/
protected $objectPersister;
/**
* Class of the domain model
*
* @var string
*/
protected $objectClass;
/**
* List of subscribed events
*
* @var array
*/
protected $events;
/**
* Constructor
**/
public function __construct(ObjectPersister $objectPersister, $objectClass, array $events)
{
$this->objectPersister = $objectPersister;
$this->objectClass = $objectClass;
$this->events = $events;
}
/**
* @return array
*/
public function getSubscribedEvents()
{
return $this->events;
}
}

View file

@ -3,32 +3,53 @@
namespace FOQ\ElasticaBundle\Doctrine;
use FOQ\ElasticaBundle\Provider\ProviderInterface;
use FOQ\ElasticaBundle\Transformer\ModelToElasticaTransformerInterface;
use FOQ\ElasticaBundle\Persister\ObjectPersisterInterface;
use Elastica_Type;
use Elastica_Document;
use Closure;
use InvalidArgumentException;
use FOQ\ElasticaBundle\Provider\NotIndexableException;
abstract class AbstractProvider implements ProviderInterface
{
/**
* Elastica type
*
* @var Elastica_Type
*/
protected $type;
/**
* Domain model object manager
*
* @var object
*/
protected $objectManager;
protected $objectClass;
protected $transformer;
/**
* Object persister
*
* @var ObjectPersisterInterface
*/
protected $objectPersister;
/**
* Provider options
*
* @var array
*/
protected $options = array(
'batch_size' => 100,
'clear_object_manager' => true,
'query_builder_method' => 'createQueryBuilder'
);
public function __construct(Elastica_Type $type, $objectManager, ModelToElasticaTransformerInterface $transformer, $objectClass, array $options = array())
public function __construct(Elastica_Type $type, $objectManager, ObjectPersisterInterface $objectPersister, $objectClass, array $options = array())
{
$this->type = $type;
$this->objectManager = $objectManager;
$this->objectClass = $objectClass;
$this->transformer = $transformer;
$this->options = array_merge($this->options, $options);
$this->type = $type;
$this->objectManager = $objectManager;
$this->objectClass = $objectClass;
$this->objectPersister = $objectPersister;
$this->options = array_merge($this->options, $options);
}
/**
@ -40,7 +61,6 @@ abstract class AbstractProvider implements ProviderInterface
{
$queryBuilder = $this->createQueryBuilder();
$nbObjects = $this->countObjects($queryBuilder);
$fields = $this->extractTypeFields();
for ($offset = 0; $offset < $nbObjects; $offset += $this->options['batch_size']) {
@ -48,14 +68,7 @@ abstract class AbstractProvider implements ProviderInterface
$documents = array();
$objects = $this->fetchSlice($queryBuilder, $this->options['batch_size'], $offset);
foreach ($objects as $object) {
try {
$documents[] = $this->transformer->transform($object, $fields);
} catch (NotIndexableException $e) {
// skip document
}
}
$this->type->addDocuments($documents);
$this->objectPersister->insertMany($objects);
if ($this->options['clear_object_manager']) {
$this->objectManager->clear();
@ -92,21 +105,4 @@ abstract class AbstractProvider implements ProviderInterface
* @return query builder
**/
protected abstract function createQueryBuilder();
protected function extractTypeFields()
{
$mappings = $this->type->getMapping();
// skip index and type name
// < 0.16.0 has both index and type levels
// >= 0.16.0 has only type level
do {
$mappings = reset($mappings);
} while (!isset($mappings['properties']));
$mappings = $mappings['properties'];
if (array_key_exists('__isInitialized__', $mappings)) {
unset($mappings['__isInitialized__']);
}
return array_keys($mappings);
}
}

View file

@ -0,0 +1,37 @@
<?php
namespace FOQ\ElasticaBundle\Doctrine\MongoDB;
use FOQ\ElasticaBundle\Doctrine\AbstractListener;
use Doctrine\ODM\MongoDB\Event\LifecycleEventArgs;
use Doctrine\Common\EventSubscriber;
class Listener extends AbstractListener implements EventSubscriber
{
public function postPersist(LifecycleEventArgs $eventArgs)
{
$document = $eventArgs->getDocument();
if ($document instanceof $this->objectClass) {
$this->objectPersister->insertOne($document);
}
}
public function postUpdate(LifecycleEventArgs $eventArgs)
{
$document = $eventArgs->getDocument();
if ($document instanceof $this->objectClass) {
$this->objectPersister->replaceOne($document);
}
}
public function postRemove(LifecycleEventArgs $eventArgs)
{
$document = $eventArgs->getDocument();
if ($document instanceof $this->objectClass) {
$this->objectPersister->deleteOne($document);
}
}
}

37
Doctrine/ORM/Listener.php Normal file
View file

@ -0,0 +1,37 @@
<?php
namespace FOQ\ElasticaBundle\Doctrine\ORM;
use FOQ\ElasticaBundle\Doctrine\AbstractListener;
use Doctrine\ORM\Event\LifecycleEventArgs;
use Doctrine\Common\EventSubscriber;
class Listener extends AbstractListener implements EventSubscriber
{
public function postPersist(LifecycleEventArgs $eventArgs)
{
$entity = $eventArgs->getEntity();
if ($entity instanceof $this->objectClass) {
$this->objectPersister->insertOne($entity);
}
}
public function postUpdate(LifecycleEventArgs $eventArgs)
{
$entity = $eventArgs->getEntity();
if ($entity instanceof $this->objectClass) {
$this->objectPersister->replaceOne($entity);
}
}
public function postRemove(LifecycleEventArgs $eventArgs)
{
$entity = $eventArgs->getEntity();
if ($entity instanceof $this->objectClass) {
$this->objectPersister->deleteOne($entity);
}
}
}

View file

@ -0,0 +1,145 @@
<?php
namespace FOQ\ElasticaBundle\Persister;
use FOQ\ElasticaBundle\Provider\ProviderInterface;
use FOQ\ElasticaBundle\Transformer\ModelToElasticaTransformerInterface;
use Elastica_Type;
use Elastica_Document;
use FOQ\ElasticaBundle\TypeInspector;
use Monolog\Logger;
use Exception;
/**
* Inserts, replaces and deletes single documents in an elastica type
* Accepts domain model objects and converts them to elastica documents
*
* @author Thibault Duplessis <thibault.duplessis@gmail.com>
*/
class ObjectPersister implements ObjectPersisterInterface
{
protected $type;
protected $transformer;
protected $objectClass;
protected $typeInspector;
protected $logger;
protected $throwExceptions;
protected $fields;
public function __construct(Elastica_Type $type, ModelToElasticaTransformerInterface $transformer, $objectClass, TypeInspector $typeInspector, Logger $logger, $throwExceptions = true)
{
$this->type = $type;
$this->transformer = $transformer;
$this->objectClass = $objectClass;
$this->typeInspector = $typeInspector;
$this->logger = $logger;
$this->throwExceptions = true;
}
/**
* Insert one object into the type
* The object will be transformed to an elastica document
*
* @param object $object
*/
public function insertOne($object)
{
try {
$document = $this->transformToElasticaDocument($object);
$this->type->addDocument($document);
} catch (Exception $e) {
$this->onError($e);
}
}
/**
* Replaces one object in the type
*
* @param object $object
* @return null
**/
public function replaceOne($object)
{
try {
$document = $this->transformToElasticaDocument($object);
$this->type->deleteById($document->getId());
$this->type->addDocument($document);
} catch (Exception $e) {
$this->onError($e);
}
}
/**
* Deletes one object in the type
*
* @param object $object
* @return null
**/
public function deleteOne($object)
{
try {
$document = $this->transformToElasticaDocument($object);
$this->type->deleteById($document->getId());
} catch (Exception $e) {
$this->onError($e);
}
}
/**
* Inserts an array of objects in the type
*
* @param array of domain model objects
**/
public function insertMany(array $objects)
{
try {
$documents = array();
foreach ($objects as $object) {
$documents[] = $this->transformToElasticaDocument($object);
}
$this->type->addDocuments($documents);
} catch (Exception $e) {
$this->onError($e);
}
}
/**
* Transforms an object to an elastica document
*
* @param object $object
* @return Elastica_Document the elastica document
*/
protected function transformToElasticaDocument($object)
{
return $this->transformer->transform($object, $this->getTypeFields());
}
/**
* Gets the list of the type fields
*
* @return array of strings
*/
protected function getTypeFields()
{
if (null === $this->fields) {
$this->fields = $this->typeInspector->getMappingFieldsNames($this->type);
}
return $this->fields;
}
/**
* What to do when an error occurs
*
* @param Exception
**/
protected function onError(Exception $exception)
{
if ($this->throwExceptions) {
throw $exception;
}
$message = sprintf('Elastica object persister failure (%s: %s)', get_class($exception), $exception->getMessage());
$this->logger->addWarning($message);
}
}

View file

@ -0,0 +1,41 @@
<?php
namespace FOQ\ElasticaBundle\Persister;
/**
* Inserts, replaces and deletes single documents in an elastica type
* Accepts domain model objects and converts them to elastica documents
*
* @author Thibault Duplessis <thibault.duplessis@gmail.com>
*/
interface ObjectPersisterInterface
{
/**
* Insert one object into the type
* The object will be transformed to an elastica document
*
* @param object $object
*/
function insertOne($object);
/**
* Replaces one object in the type
*
* @param object $object
**/
function replaceOne($object);
/**
* Deletes one object in the type
*
* @param object $object
**/
function deleteOne($object);
/**
* Inserts an array of objects in the type
*
* @param array of domain model objects
**/
function insertMany(array $objects);
}

View file

@ -1,13 +0,0 @@
<?php
namespace FOQ\ElasticaBundle\Provider;
use RuntimeException;
/**
* Skip a document during population
*/
class NotIndexableException extends RuntimeException
{
}

View file

@ -4,7 +4,17 @@ namespace FOQ\ElasticaBundle\Provider;
use Closure;
/**
* Insert application domain objects into elastica types
*
* @author Thibault Duplessis <thibault.duplessis@gmail.com>
*/
interface ProviderInterface
{
/**
* Add all domain objects of a repository to the elastica type
*
* @param Closure $loggerClosure
*/
function populate(Closure $loggerClosure);
}

View file

@ -127,7 +127,9 @@ some configuration will let ElasticaBundle do it for us.
types:
user:
mappings:
# your mappings
username: { boost: 5 }
firstName: { boost: 3 }
# more mappings...
doctrine:
driver: orm
model: Application\UserBundle\Entity\User
@ -260,3 +262,35 @@ You can even get paginated results!
/** var Zend\Paginator\Paginator */
$userPaginator = $finder->findPaginated('bob');
### Realtime, selective index update
If you use the doctrine integration, you can let ElasticaBundle update the indexes automatically
when an object is added, updated or removed. It uses doctrine lifecycle events.
Declare that you want to update the index in real time:
foq_elastica:
clients:
default: { host: localhost, port: 9200 }
indexes:
website:
client: default
types:
user:
mappings:
# your mappings
doctrine:
driver: orm
model: Application\UserBundle\Entity\User
listener: # by default, listens to "insert", "update" and "delete"
Now the index is automatically updated each time the state of the bound doctrine repository changes.
No need to repopulate the whole "user" index when a new `User` is created.
You can also choose to only listen for some of the events:
doctrine:
listener:
insert: true
update: false
delete: true

View file

@ -30,8 +30,19 @@
<argument /> <!-- mappings -->
</service>
<service id="foq_elastica.type_inspector" class="FOQ\ElasticaBundle\TypeInspector" public="false" />
<service id="foq_elastica.object_persister.prototype" class="FOQ\ElasticaBundle\Persister\ObjectPersister" public="false" abstract="true">
<argument /> <!-- type -->
<argument /> <!-- model to elastica transformer -->
<argument /> <!-- model -->
<argument /> <!-- type inspector -->
<argument /> <!-- logger -->
<argument>%kernel.debug%</argument>
</service>
<service id="foq_elastica.finder.prototype" class="FOQ\ElasticaBundle\Finder\TransformedFinder" public="true" abstract="true">
<argument /> <!-- index -->
<argument /> <!-- searchable -->
<argument /> <!-- transformer -->
</service>

View file

@ -7,13 +7,19 @@
<services>
<service id="foq_elastica.provider.prototype.mongodb" class="FOQ\ElasticaBundle\Doctrine\MongoDB\Provider" public="false" abstract="true">
<argument /> <!-- index -->
<argument /> <!-- type -->
<argument type="service" id="doctrine.odm.mongodb.document_manager" />
<argument /> <!-- transformer -->
<argument /> <!-- object persister -->
<argument /> <!-- model -->
<argument type="collection" /> <!-- options -->
</service>
<service id="foq_elastica.listener.prototype.mongodb" class="FOQ\ElasticaBundle\Doctrine\MongoDB\Listener" public="false" abstract="true">
<argument /> <!-- object persister -->
<argument /> <!-- model -->
<argument type="collection" /> <!-- events -->
</service>
<service id="foq_elastica.elastica_to_model_transformer.prototype.mongodb" class="FOQ\ElasticaBundle\Doctrine\MongoDB\ElasticaToModelTransformer" public="false">
<argument type="service" id="doctrine.odm.mongodb.document_manager" />
<argument /> <!-- model -->

View file

@ -7,13 +7,19 @@
<services>
<service id="foq_elastica.provider.prototype.orm" class="FOQ\ElasticaBundle\Doctrine\ORM\Provider" public="false" abstract="true">
<argument /> <!-- index -->
<argument /> <!-- type -->
<argument type="service" id="doctrine.orm.entity_manager" />
<argument /> <!-- transformer -->
<argument /> <!-- object persister -->
<argument /> <!-- model -->
<argument type="collection" /> <!-- options -->
</service>
<service id="foq_elastica.listener.prototype.orm" class="FOQ\ElasticaBundle\Doctrine\ORM\Listener" public="false" abstract="true">
<argument /> <!-- object persister -->
<argument /> <!-- model -->
<argument type="collection" /> <!-- events -->
</service>
<service id="foq_elastica.elastica_to_model_transformer.prototype.orm" class="FOQ\ElasticaBundle\Doctrine\ORM\ElasticaToModelTransformer" public="false">
<argument type="service" id="doctrine.orm.entity_manager" />
<argument /> <!-- model -->

39
TypeInspector.php Normal file
View file

@ -0,0 +1,39 @@
<?php
namespace FOQ\ElasticaBundle;
use Elastica_Type;
/**
* Extracts the mapping fields from a type
*
* @author Thibault Duplessis <thibault.duplessis@gmail.com>
*/
class TypeInspector
{
/**
* Gets the type mapping fields
*
* @param Elastica_Type $type
* @return array list of fields names
*/
public function getMappingFieldsNames(Elastica_Type $type)
{
$mappings = $type->getMapping();
// skip index and type name
// < 0.16.0 has both index and type levels
// >= 0.16.0 has only type level
do {
$mappings = reset($mappings);
} while (is_array($mappings) && !isset($mappings['properties']));
if (!isset($mappings['properties'])) {
return array();
}
$mappings = $mappings['properties'];
if (array_key_exists('__isInitialized__', $mappings)) {
unset($mappings['__isInitialized__']);
}
return array_keys($mappings);
}
}