Merge pull request #415 from nurikabe/master

Refactoring to update ElasticSearch index in postFush
This commit is contained in:
Tim Nagel 2014-03-17 09:11:43 +11:00
commit 857c1c8e48
13 changed files with 226 additions and 187 deletions

View file

@ -177,6 +177,7 @@ class Configuration implements ConfigurationInterface
->scalarNode('insert')->defaultTrue()->end()
->scalarNode('update')->defaultTrue()->end()
->scalarNode('delete')->defaultTrue()->end()
->scalarNode('persist')->defaultValue('postFlush')->end()
->scalarNode('service')->end()
->variableNode('is_indexable_callback')->defaultNull()->end()
->end()
@ -269,6 +270,7 @@ class Configuration implements ConfigurationInterface
->scalarNode('insert')->defaultTrue()->end()
->scalarNode('update')->defaultTrue()->end()
->scalarNode('delete')->defaultTrue()->end()
->booleanNode('immediate')->defaultFalse()->end()
->scalarNode('service')->end()
->variableNode('is_indexable_callback')->defaultNull()->end()
->end()

View file

@ -448,13 +448,32 @@ class FOSElasticaExtension extends Extension
return $listenerId;
}
/**
* Map Elastica to Doctrine events for the current driver
*/
private function getDoctrineEvents(array $typeConfig)
{
// Flush always calls depending on actions scheduled in lifecycle listeners
$typeConfig['listener']['flush'] = true;
switch ($typeConfig['driver']) {
case 'orm':
$eventsClass = '\Doctrine\ORM\Events';
break;
case 'mongodb':
$eventsClass = '\Doctrine\ODM\MongoDB\Events';
break;
default:
throw new InvalidArgumentException(sprintf('Cannot determine events for driver "%s"', $typeConfig['driver']));
break;
}
$events = array();
$eventMapping = array(
'insert' => array('postPersist'),
'update' => array('postUpdate'),
'delete' => array('postRemove', 'preRemove')
'insert' => array(constant($eventsClass.'::postPersist')),
'update' => array(constant($eventsClass.'::postUpdate')),
'delete' => array(constant($eventsClass.'::preRemove')),
'flush' => array($typeConfig['listener']['immediate'] ? constant($eventsClass.'::preFlush') : constant($eventsClass.'::postFlush'))
);
foreach ($eventMapping as $event => $doctrineEvents) {

View file

@ -2,15 +2,15 @@
namespace FOS\ElasticaBundle\Doctrine;
use Doctrine\Common\EventArgs;
use Doctrine\Common\EventSubscriber;
use Doctrine\Common\Persistence\ObjectManager;
use FOS\ElasticaBundle\Persister\ObjectPersisterInterface;
use FOS\ElasticaBundle\Persister\ObjectPersister;
use Symfony\Component\ExpressionLanguage\Expression;
use Symfony\Component\ExpressionLanguage\ExpressionLanguage;
use Symfony\Component\ExpressionLanguage\SyntaxError;
abstract class AbstractListener implements EventSubscriber
class Listener implements EventSubscriber
{
/**
* Object persister
@ -48,11 +48,11 @@ abstract class AbstractListener implements EventSubscriber
protected $isIndexableCallback;
/**
* Objects scheduled for removal
*
* @var array
* Objects scheduled for insertion, replacement, or removal
*/
private $scheduledForRemoval = array();
public $scheduledForInsertion = array();
public $scheduledForUpdate = array();
public $scheduledForDeletion = array();
/**
* An instance of ExpressionLanguage
@ -149,37 +149,6 @@ abstract class AbstractListener implements EventSubscriber
: call_user_func($this->isIndexableCallback, $object);
}
/**
* Schedules the object for removal.
*
* This is usually called during the pre-remove event.
*
* @param object $object
* @param ObjectManager $objectManager
*/
protected function scheduleForRemoval($object, ObjectManager $objectManager)
{
$metadata = $objectManager->getClassMetadata($this->objectClass);
$esId = $metadata->getFieldValue($object, $this->esIdentifierField);
$this->scheduledForRemoval[spl_object_hash($object)] = $esId;
}
/**
* Removes the object if it was scheduled for removal.
*
* This is usually called during the post-remove event.
*
* @param object $object
*/
protected function removeIfScheduled($object)
{
$objectHash = spl_object_hash($object);
if (isset($this->scheduledForRemoval[$objectHash])) {
$this->objectPersister->deleteById($this->scheduledForRemoval[$objectHash]);
unset($this->scheduledForRemoval[$objectHash]);
}
}
/**
* @param mixed $object
* @return string
@ -207,4 +176,70 @@ abstract class AbstractListener implements EventSubscriber
return $this->expressionLanguage;
}
public function postPersist(EventArgs $eventArgs)
{
$entity = $eventArgs->getEntity();
if ($entity instanceof $this->objectClass && $this->isObjectIndexable($entity)) {
$this->scheduledForInsertion[] = $entity;
}
}
public function postUpdate(EventArgs $eventArgs)
{
$entity = $eventArgs->getEntity();
if ($entity instanceof $this->objectClass) {
if ($this->isObjectIndexable($entity)) {
$this->scheduledForUpdate[] = $entity;
} else {
// Delete if no longer indexable
$this->scheduledForDeletion[] = $entity;
}
}
}
public function preRemove(EventArgs $eventArgs)
{
$entity = $eventArgs->getEntity();
if ($entity instanceof $this->objectClass) {
$this->scheduledForDeletion[] = $entity;
}
}
/**
* Persist scheduled objects to ElasticSearch
*/
private function persistScheduled()
{
if (count($this->scheduledForInsertion)) {
$this->objectPersister->insertMany($this->scheduledForInsertion);
}
if (count($this->scheduledForUpdate)) {
$this->objectPersister->replaceMany($this->scheduledForUpdate);
}
if (count($this->scheduledForDeletion)) {
$this->objectPersister->deleteMany($this->scheduledForDeletion);
}
}
/**
* Iterate through scheduled actions before flushing to emulate 2.x behavior. Note that the ElasticSearch index
* will fall out of sync with the source data in the event of a crash during flush.
*/
public function preFlush(EventArgs $eventArgs)
{
$this->persistScheduled();
}
/**
* Iterating through scheduled actions *after* flushing ensures that the ElasticSearch index will be affected
* only if the query is successful
*/
public function postFlush(EventArgs $eventArgs)
{
$this->persistScheduled();
}
}

View file

@ -1,50 +0,0 @@
<?php
namespace FOS\ElasticaBundle\Doctrine\MongoDB;
use Doctrine\ODM\MongoDB\Event\LifecycleEventArgs;
use FOS\ElasticaBundle\Doctrine\AbstractListener;
class Listener extends AbstractListener
{
public function postPersist(LifecycleEventArgs $eventArgs)
{
$document = $eventArgs->getDocument();
if ($document instanceof $this->objectClass && $this->isObjectIndexable($document)) {
$this->objectPersister->insertOne($document);
}
}
public function postUpdate(LifecycleEventArgs $eventArgs)
{
$document = $eventArgs->getDocument();
if ($document instanceof $this->objectClass) {
if ($this->isObjectIndexable($document)) {
$this->objectPersister->replaceOne($document);
} else {
$this->scheduleForRemoval($document, $eventArgs->getDocumentManager());
$this->removeIfScheduled($document);
}
}
}
public function preRemove(LifecycleEventArgs $eventArgs)
{
$document = $eventArgs->getDocument();
if ($document instanceof $this->objectClass) {
$this->scheduleForRemoval($document, $eventArgs->getDocumentManager());
}
}
public function postRemove(LifecycleEventArgs $eventArgs)
{
$document = $eventArgs->getDocument();
if ($document instanceof $this->objectClass) {
$this->removeIfScheduled($document);
}
}
}

View file

@ -1,50 +0,0 @@
<?php
namespace FOS\ElasticaBundle\Doctrine\ORM;
use Doctrine\ORM\Event\LifecycleEventArgs;
use FOS\ElasticaBundle\Doctrine\AbstractListener;
class Listener extends AbstractListener
{
public function postPersist(LifecycleEventArgs $eventArgs)
{
$entity = $eventArgs->getEntity();
if ($entity instanceof $this->objectClass && $this->isObjectIndexable($entity)) {
$this->objectPersister->insertOne($entity);
}
}
public function postUpdate(LifecycleEventArgs $eventArgs)
{
$entity = $eventArgs->getEntity();
if ($entity instanceof $this->objectClass) {
if ($this->isObjectIndexable($entity)) {
$this->objectPersister->replaceOne($entity);
} else {
$this->scheduleForRemoval($entity, $eventArgs->getEntityManager());
$this->removeIfScheduled($entity);
}
}
}
public function preRemove(LifecycleEventArgs $eventArgs)
{
$entity = $eventArgs->getEntity();
if ($entity instanceof $this->objectClass) {
$this->scheduleForRemoval($entity, $eventArgs->getEntityManager());
}
}
public function postRemove(LifecycleEventArgs $eventArgs)
{
$entity = $eventArgs->getEntity();
if ($entity instanceof $this->objectClass) {
$this->removeIfScheduled($entity);
}
}
}

View file

@ -83,12 +83,12 @@ class ObjectPersister implements ObjectPersisterInterface
} catch (NotFoundException $e) {}
}
/**
* Inserts an array of objects in the type
* Bulk insert an array of objects in the type for the given method
*
* @param array $objects array of domain model objects
**/
* @param string Method to call
*/
public function insertMany(array $objects)
{
$documents = array();
@ -98,6 +98,34 @@ class ObjectPersister implements ObjectPersisterInterface
$this->type->addDocuments($documents);
}
/**
* Bulk updates an array of objects in the type
*
* @param array $objects array of domain model objects
*/
public function replaceMany(array $objects)
{
$documents = array();
foreach ($objects as $object) {
$documents[] = $this->transformToElasticaDocument($object);
}
$this->type->updateDocuments($documents);
}
/**
* Bulk deletes an array of objects in the type
*
* @param array $objects array of domain model objects
*/
public function deleteMany(array $objects)
{
$documents = array();
foreach ($objects as $object) {
$documents[] = $this->transformToElasticaDocument($object);
}
$this->type->deleteDocuments($documents);
}
/**
* Transforms an object to an elastica document
*
@ -108,4 +136,4 @@ class ObjectPersister implements ObjectPersisterInterface
{
return $this->transformer->transform($object, $this->fields);
}
}
}

View file

@ -38,13 +38,27 @@ interface ObjectPersisterInterface
* @param mixed $id
*
* @return null
**/
*/
function deleteById($id);
/**
* Inserts an array of objects in the type
* Bulk inserts an array of objects in the type
*
* @param array $objects array of domain model objects
**/
*/
function insertMany(array $objects);
/**
* Bulk updates an array of objects in the type
*
* @param array $objects array of domain model objects
*/
function replaceMany(array $objects);
/**
* Bulk deletes an array of objects in the type
*
* @param array $objects array of domain model objects
*/
function deleteMany(array $objects);
}

View file

@ -585,7 +585,11 @@ class User
### Realtime, selective index update
If you use the Doctrine integration, you can let ElasticaBundle update the indexes automatically
when an object is added, updated or removed. It uses Doctrine lifecycle events.
when an object is added, updated or removed. It uses Doctrine lifecycle events to schedule updates
and then synchronizes changes either before or after flush.
> **Propel** doesn't support this feature yet.
Declare that you want to update the index in real time:
fos_elastica:
@ -601,7 +605,7 @@ Declare that you want to update the index in real time:
persistence:
driver: orm
model: Application\UserBundle\Entity\User
listener: ~ # by default, listens to "insert", "update" and "delete"
listener: ~ # by default, listens to "insert", "update" and "delete" and updates `postFlush`
Now the index is automatically updated each time the state of the bound Doctrine repository changes.
No need to repopulate the whole "user" index when a new `User` is created.
@ -614,7 +618,19 @@ You can also choose to only listen for some of the events:
update: false
delete: true
> **Propel** doesn't support this feature yet.
By default, the ElasticSearch index will be updated after flush. To update before flushing, set `immediate`
to `true`:
persistence:
listener:
insert: true
update: false
delete: true
immediate: true
> Using `immediate` to update ElasticSearch before flush completes may cause the ElasticSearch index to fall out of
> sync with the source database in the event of a crash during the flush itself, such as in the case of a bad query.
### Checking an entity method for listener

View file

@ -13,7 +13,7 @@
<argument type="service" id="doctrine_mongodb" />
</service>
<service id="fos_elastica.listener.prototype.mongodb" class="FOS\ElasticaBundle\Doctrine\MongoDB\Listener" public="false" abstract="true">
<service id="fos_elastica.listener.prototype.mongodb" class="FOS\ElasticaBundle\Doctrine\Listener" public="false">
<argument /> <!-- object persister -->
<argument /> <!-- model -->
<argument type="collection" /> <!-- events -->

View file

@ -13,7 +13,7 @@
<argument type="service" id="doctrine" />
</service>
<service id="fos_elastica.listener.prototype.orm" class="FOS\ElasticaBundle\Doctrine\ORM\Listener" public="false" abstract="true">
<service id="fos_elastica.listener.prototype.orm" class="FOS\ElasticaBundle\Doctrine\Listener" public="false">
<argument /> <!-- object persister -->
<argument /> <!-- model -->
<argument type="collection" /> <!-- events -->

View file

@ -3,9 +3,11 @@
namespace FOS\ElasticaBundle\Tests\Doctrine;
/**
* See concrete MongoDB/ORM instances of this abstract test
*
* @author Richard Miller <info@limethinking.co.uk>
*/
abstract class AbstractListenerTest extends \PHPUnit_Framework_TestCase
abstract class ListenerTest extends \PHPUnit_Framework_TestCase
{
public function testObjectInsertedOnPersist()
{
@ -14,12 +16,16 @@ abstract class AbstractListenerTest extends \PHPUnit_Framework_TestCase
$entity = new Listener\Entity(1);
$eventArgs = $this->createLifecycleEventArgs($entity, $this->getMockObjectManager());
$persister->expects($this->once())
->method('insertOne')
->with($entity);
$listener = $this->createListener($persister, get_class($entity), array());
$listener->postPersist($eventArgs);
$this->assertEquals($entity, current($listener->scheduledForInsertion));
$persister->expects($this->once())
->method('insertMany')
->with($listener->scheduledForInsertion);
$listener->postFlush($eventArgs);
}
/**
@ -32,12 +38,18 @@ abstract class AbstractListenerTest extends \PHPUnit_Framework_TestCase
$entity = new Listener\Entity(1, false);
$eventArgs = $this->createLifecycleEventArgs($entity, $this->getMockObjectManager());
$persister->expects($this->never())
->method('insertOne');
$listener = $this->createListener($persister, get_class($entity), array());
$listener->setIsIndexableCallback($isIndexableCallback);
$listener->postPersist($eventArgs);
$this->assertEmpty($listener->scheduledForInsertion);
$persister->expects($this->never())
->method('insertOne');
$persister->expects($this->never())
->method('insertMany');
$listener->postFlush($eventArgs);
}
public function testObjectReplacedOnUpdate()
@ -47,15 +59,18 @@ abstract class AbstractListenerTest extends \PHPUnit_Framework_TestCase
$entity = new Listener\Entity(1);
$eventArgs = $this->createLifecycleEventArgs($entity, $this->getMockObjectManager());
$persister->expects($this->once())
->method('replaceOne')
->with($entity);
$listener = $this->createListener($persister, get_class($entity), array());
$listener->postUpdate($eventArgs);
$this->assertEquals($entity, current($listener->scheduledForUpdate));
$persister->expects($this->once())
->method('replaceMany')
->with(array($entity));
$persister->expects($this->never())
->method('deleteById');
$listener = $this->createListener($persister, get_class($entity), array());
$listener->postUpdate($eventArgs);
$listener->postFlush($eventArgs);
}
/**
@ -80,16 +95,20 @@ abstract class AbstractListenerTest extends \PHPUnit_Framework_TestCase
->with($entity, 'id')
->will($this->returnValue($entity->getId()));
$persister->expects($this->never())
->method('replaceOne');
$persister->expects($this->once())
->method('deleteById')
->with($entity->getId());
$listener = $this->createListener($persister, get_class($entity), array());
$listener->setIsIndexableCallback($isIndexableCallback);
$listener->postUpdate($eventArgs);
$this->assertEmpty($listener->scheduledForUpdate);
$this->assertEquals($entity, current($listener->scheduledForDeletion));
$persister->expects($this->never())
->method('replaceOne');
$persister->expects($this->once())
->method('deleteMany')
->with(array($entity));
$listener->postFlush($eventArgs);
}
public function testObjectDeletedOnRemove()
@ -111,13 +130,16 @@ abstract class AbstractListenerTest extends \PHPUnit_Framework_TestCase
->with($entity, 'id')
->will($this->returnValue($entity->getId()));
$persister->expects($this->once())
->method('deleteById')
->with($entity->getId());
$listener = $this->createListener($persister, get_class($entity), array());
$listener->preRemove($eventArgs);
$listener->postRemove($eventArgs);
$this->assertEquals($entity, current($listener->scheduledForDeletion));
$persister->expects($this->once())
->method('deleteMany')
->with(array($entity));
$listener->postFlush($eventArgs);
}
public function testObjectWithNonStandardIdentifierDeletedOnRemove()
@ -139,13 +161,16 @@ abstract class AbstractListenerTest extends \PHPUnit_Framework_TestCase
->with($entity, 'identifier')
->will($this->returnValue($entity->getId()));
$persister->expects($this->once())
->method('deleteById')
->with($entity->getId());
$listener = $this->createListener($persister, get_class($entity), array(), 'identifier');
$listener->preRemove($eventArgs);
$listener->postRemove($eventArgs);
$this->assertEquals($entity, current($listener->scheduledForDeletion));
$persister->expects($this->once())
->method('deleteMany')
->with(array($entity));
$listener->postFlush($eventArgs);
}
/**

View file

@ -2,9 +2,9 @@
namespace FOS\ElasticaBundle\Tests\Doctrine\MongoDB;
use FOS\ElasticaBundle\Tests\Doctrine\AbstractListenerTest;
use FOS\ElasticaBundle\Tests\Doctrine\ListenerTest as BaseListenerTest;
class ListenerTest extends AbstractListenerTest
class ListenerTest extends BaseListenerTest
{
public function setUp()
{
@ -25,7 +25,7 @@ class ListenerTest extends AbstractListenerTest
protected function getListenerClass()
{
return 'FOS\ElasticaBundle\Doctrine\MongoDB\Listener';
return 'FOS\ElasticaBundle\Doctrine\Listener';
}
protected function getObjectManagerClass()

View file

@ -2,9 +2,9 @@
namespace FOS\ElasticaBundle\Tests\Doctrine\ORM;
use FOS\ElasticaBundle\Tests\Doctrine\AbstractListenerTest;
use FOS\ElasticaBundle\Tests\Doctrine\ListenerTest as BaseListenerTest;
class ListenerTest extends AbstractListenerTest
class ListenerTest extends BaseListenerTest
{
public function setUp()
{
@ -25,7 +25,7 @@ class ListenerTest extends AbstractListenerTest
protected function getListenerClass()
{
return 'FOS\ElasticaBundle\Doctrine\ORM\Listener';
return 'FOS\ElasticaBundle\Doctrine\Listener';
}
protected function getObjectManagerClass()