diff --git a/DependencyInjection/Configuration.php b/DependencyInjection/Configuration.php
index adcaf6a..0f868af 100644
--- a/DependencyInjection/Configuration.php
+++ b/DependencyInjection/Configuration.php
@@ -177,6 +177,7 @@ class Configuration implements ConfigurationInterface
->scalarNode('insert')->defaultTrue()->end()
->scalarNode('update')->defaultTrue()->end()
->scalarNode('delete')->defaultTrue()->end()
+ ->scalarNode('persist')->defaultValue('postFlush')->end()
->scalarNode('service')->end()
->variableNode('is_indexable_callback')->defaultNull()->end()
->end()
@@ -269,6 +270,7 @@ class Configuration implements ConfigurationInterface
->scalarNode('insert')->defaultTrue()->end()
->scalarNode('update')->defaultTrue()->end()
->scalarNode('delete')->defaultTrue()->end()
+ ->booleanNode('immediate')->defaultFalse()->end()
->scalarNode('service')->end()
->variableNode('is_indexable_callback')->defaultNull()->end()
->end()
diff --git a/DependencyInjection/FOSElasticaExtension.php b/DependencyInjection/FOSElasticaExtension.php
index 96e1fdf..41c8c1f 100644
--- a/DependencyInjection/FOSElasticaExtension.php
+++ b/DependencyInjection/FOSElasticaExtension.php
@@ -448,13 +448,32 @@ class FOSElasticaExtension extends Extension
return $listenerId;
}
+ /**
+ * Map Elastica to Doctrine events for the current driver
+ */
private function getDoctrineEvents(array $typeConfig)
{
+ // Flush always calls depending on actions scheduled in lifecycle listeners
+ $typeConfig['listener']['flush'] = true;
+
+ switch ($typeConfig['driver']) {
+ case 'orm':
+ $eventsClass = '\Doctrine\ORM\Events';
+ break;
+ case 'mongodb':
+ $eventsClass = '\Doctrine\ODM\MongoDB\Events';
+ break;
+ default:
+ throw new InvalidArgumentException(sprintf('Cannot determine events for driver "%s"', $typeConfig['driver']));
+ break;
+ }
+
$events = array();
$eventMapping = array(
- 'insert' => array('postPersist'),
- 'update' => array('postUpdate'),
- 'delete' => array('postRemove', 'preRemove')
+ 'insert' => array(constant($eventsClass.'::postPersist')),
+ 'update' => array(constant($eventsClass.'::postUpdate')),
+ 'delete' => array(constant($eventsClass.'::preRemove')),
+ 'flush' => array($typeConfig['listener']['immediate'] ? constant($eventsClass.'::preFlush') : constant($eventsClass.'::postFlush'))
);
foreach ($eventMapping as $event => $doctrineEvents) {
diff --git a/Doctrine/AbstractListener.php b/Doctrine/Listener.php
similarity index 70%
rename from Doctrine/AbstractListener.php
rename to Doctrine/Listener.php
index 3b62444..c254513 100644
--- a/Doctrine/AbstractListener.php
+++ b/Doctrine/Listener.php
@@ -2,15 +2,15 @@
namespace FOS\ElasticaBundle\Doctrine;
+use Doctrine\Common\EventArgs;
use Doctrine\Common\EventSubscriber;
-use Doctrine\Common\Persistence\ObjectManager;
use FOS\ElasticaBundle\Persister\ObjectPersisterInterface;
use FOS\ElasticaBundle\Persister\ObjectPersister;
use Symfony\Component\ExpressionLanguage\Expression;
use Symfony\Component\ExpressionLanguage\ExpressionLanguage;
use Symfony\Component\ExpressionLanguage\SyntaxError;
-abstract class AbstractListener implements EventSubscriber
+class Listener implements EventSubscriber
{
/**
* Object persister
@@ -48,11 +48,11 @@ abstract class AbstractListener implements EventSubscriber
protected $isIndexableCallback;
/**
- * Objects scheduled for removal
- *
- * @var array
+ * Objects scheduled for insertion, replacement, or removal
*/
- private $scheduledForRemoval = array();
+ public $scheduledForInsertion = array();
+ public $scheduledForUpdate = array();
+ public $scheduledForDeletion = array();
/**
* An instance of ExpressionLanguage
@@ -149,37 +149,6 @@ abstract class AbstractListener implements EventSubscriber
: call_user_func($this->isIndexableCallback, $object);
}
- /**
- * Schedules the object for removal.
- *
- * This is usually called during the pre-remove event.
- *
- * @param object $object
- * @param ObjectManager $objectManager
- */
- protected function scheduleForRemoval($object, ObjectManager $objectManager)
- {
- $metadata = $objectManager->getClassMetadata($this->objectClass);
- $esId = $metadata->getFieldValue($object, $this->esIdentifierField);
- $this->scheduledForRemoval[spl_object_hash($object)] = $esId;
- }
-
- /**
- * Removes the object if it was scheduled for removal.
- *
- * This is usually called during the post-remove event.
- *
- * @param object $object
- */
- protected function removeIfScheduled($object)
- {
- $objectHash = spl_object_hash($object);
- if (isset($this->scheduledForRemoval[$objectHash])) {
- $this->objectPersister->deleteById($this->scheduledForRemoval[$objectHash]);
- unset($this->scheduledForRemoval[$objectHash]);
- }
- }
-
/**
* @param mixed $object
* @return string
@@ -207,4 +176,70 @@ abstract class AbstractListener implements EventSubscriber
return $this->expressionLanguage;
}
+
+ public function postPersist(EventArgs $eventArgs)
+ {
+ $entity = $eventArgs->getEntity();
+
+ if ($entity instanceof $this->objectClass && $this->isObjectIndexable($entity)) {
+ $this->scheduledForInsertion[] = $entity;
+ }
+ }
+
+ public function postUpdate(EventArgs $eventArgs)
+ {
+ $entity = $eventArgs->getEntity();
+
+ if ($entity instanceof $this->objectClass) {
+ if ($this->isObjectIndexable($entity)) {
+ $this->scheduledForUpdate[] = $entity;
+ } else {
+ // Delete if no longer indexable
+ $this->scheduledForDeletion[] = $entity;
+ }
+ }
+ }
+
+ public function preRemove(EventArgs $eventArgs)
+ {
+ $entity = $eventArgs->getEntity();
+
+ if ($entity instanceof $this->objectClass) {
+ $this->scheduledForDeletion[] = $entity;
+ }
+ }
+
+ /**
+ * Persist scheduled objects to ElasticSearch
+ */
+ private function persistScheduled()
+ {
+ if (count($this->scheduledForInsertion)) {
+ $this->objectPersister->insertMany($this->scheduledForInsertion);
+ }
+ if (count($this->scheduledForUpdate)) {
+ $this->objectPersister->replaceMany($this->scheduledForUpdate);
+ }
+ if (count($this->scheduledForDeletion)) {
+ $this->objectPersister->deleteMany($this->scheduledForDeletion);
+ }
+ }
+
+ /**
+ * Iterate through scheduled actions before flushing to emulate 2.x behavior. Note that the ElasticSearch index
+ * will fall out of sync with the source data in the event of a crash during flush.
+ */
+ public function preFlush(EventArgs $eventArgs)
+ {
+ $this->persistScheduled();
+ }
+
+ /**
+ * Iterating through scheduled actions *after* flushing ensures that the ElasticSearch index will be affected
+ * only if the query is successful
+ */
+ public function postFlush(EventArgs $eventArgs)
+ {
+ $this->persistScheduled();
+ }
}
diff --git a/Doctrine/MongoDB/Listener.php b/Doctrine/MongoDB/Listener.php
deleted file mode 100644
index 9fa3536..0000000
--- a/Doctrine/MongoDB/Listener.php
+++ /dev/null
@@ -1,50 +0,0 @@
-getDocument();
-
- if ($document instanceof $this->objectClass && $this->isObjectIndexable($document)) {
- $this->objectPersister->insertOne($document);
- }
- }
-
- public function postUpdate(LifecycleEventArgs $eventArgs)
- {
- $document = $eventArgs->getDocument();
-
- if ($document instanceof $this->objectClass) {
- if ($this->isObjectIndexable($document)) {
- $this->objectPersister->replaceOne($document);
- } else {
- $this->scheduleForRemoval($document, $eventArgs->getDocumentManager());
- $this->removeIfScheduled($document);
- }
- }
- }
-
- public function preRemove(LifecycleEventArgs $eventArgs)
- {
- $document = $eventArgs->getDocument();
-
- if ($document instanceof $this->objectClass) {
- $this->scheduleForRemoval($document, $eventArgs->getDocumentManager());
- }
- }
-
- public function postRemove(LifecycleEventArgs $eventArgs)
- {
- $document = $eventArgs->getDocument();
-
- if ($document instanceof $this->objectClass) {
- $this->removeIfScheduled($document);
- }
- }
-}
diff --git a/Doctrine/ORM/Listener.php b/Doctrine/ORM/Listener.php
deleted file mode 100644
index 790ddb8..0000000
--- a/Doctrine/ORM/Listener.php
+++ /dev/null
@@ -1,50 +0,0 @@
-getEntity();
-
- if ($entity instanceof $this->objectClass && $this->isObjectIndexable($entity)) {
- $this->objectPersister->insertOne($entity);
- }
- }
-
- public function postUpdate(LifecycleEventArgs $eventArgs)
- {
- $entity = $eventArgs->getEntity();
-
- if ($entity instanceof $this->objectClass) {
- if ($this->isObjectIndexable($entity)) {
- $this->objectPersister->replaceOne($entity);
- } else {
- $this->scheduleForRemoval($entity, $eventArgs->getEntityManager());
- $this->removeIfScheduled($entity);
- }
- }
- }
-
- public function preRemove(LifecycleEventArgs $eventArgs)
- {
- $entity = $eventArgs->getEntity();
-
- if ($entity instanceof $this->objectClass) {
- $this->scheduleForRemoval($entity, $eventArgs->getEntityManager());
- }
- }
-
- public function postRemove(LifecycleEventArgs $eventArgs)
- {
- $entity = $eventArgs->getEntity();
-
- if ($entity instanceof $this->objectClass) {
- $this->removeIfScheduled($entity);
- }
- }
-}
diff --git a/Persister/ObjectPersister.php b/Persister/ObjectPersister.php
index 450e43b..3592a78 100644
--- a/Persister/ObjectPersister.php
+++ b/Persister/ObjectPersister.php
@@ -83,12 +83,12 @@ class ObjectPersister implements ObjectPersisterInterface
} catch (NotFoundException $e) {}
}
-
/**
- * Inserts an array of objects in the type
+ * Bulk insert an array of objects in the type for the given method
*
* @param array $objects array of domain model objects
- **/
+ * @param string Method to call
+ */
public function insertMany(array $objects)
{
$documents = array();
@@ -98,6 +98,34 @@ class ObjectPersister implements ObjectPersisterInterface
$this->type->addDocuments($documents);
}
+ /**
+ * Bulk updates an array of objects in the type
+ *
+ * @param array $objects array of domain model objects
+ */
+ public function replaceMany(array $objects)
+ {
+ $documents = array();
+ foreach ($objects as $object) {
+ $documents[] = $this->transformToElasticaDocument($object);
+ }
+ $this->type->updateDocuments($documents);
+ }
+
+ /**
+ * Bulk deletes an array of objects in the type
+ *
+ * @param array $objects array of domain model objects
+ */
+ public function deleteMany(array $objects)
+ {
+ $documents = array();
+ foreach ($objects as $object) {
+ $documents[] = $this->transformToElasticaDocument($object);
+ }
+ $this->type->deleteDocuments($documents);
+ }
+
/**
* Transforms an object to an elastica document
*
@@ -108,4 +136,4 @@ class ObjectPersister implements ObjectPersisterInterface
{
return $this->transformer->transform($object, $this->fields);
}
-}
+}
\ No newline at end of file
diff --git a/Persister/ObjectPersisterInterface.php b/Persister/ObjectPersisterInterface.php
index a50bcc8..a25aafc 100644
--- a/Persister/ObjectPersisterInterface.php
+++ b/Persister/ObjectPersisterInterface.php
@@ -38,13 +38,27 @@ interface ObjectPersisterInterface
* @param mixed $id
*
* @return null
- **/
+ */
function deleteById($id);
/**
- * Inserts an array of objects in the type
+ * Bulk inserts an array of objects in the type
*
* @param array $objects array of domain model objects
- **/
+ */
function insertMany(array $objects);
+
+ /**
+ * Bulk updates an array of objects in the type
+ *
+ * @param array $objects array of domain model objects
+ */
+ function replaceMany(array $objects);
+
+ /**
+ * Bulk deletes an array of objects in the type
+ *
+ * @param array $objects array of domain model objects
+ */
+ function deleteMany(array $objects);
}
diff --git a/README.md b/README.md
index aaaf0a5..aba2593 100644
--- a/README.md
+++ b/README.md
@@ -585,7 +585,11 @@ class User
### Realtime, selective index update
If you use the Doctrine integration, you can let ElasticaBundle update the indexes automatically
-when an object is added, updated or removed. It uses Doctrine lifecycle events.
+when an object is added, updated or removed. It uses Doctrine lifecycle events to schedule updates
+and then synchronizes changes either before or after flush.
+
+> **Propel** doesn't support this feature yet.
+
Declare that you want to update the index in real time:
fos_elastica:
@@ -601,7 +605,7 @@ Declare that you want to update the index in real time:
persistence:
driver: orm
model: Application\UserBundle\Entity\User
- listener: ~ # by default, listens to "insert", "update" and "delete"
+ listener: ~ # by default, listens to "insert", "update" and "delete" and updates `postFlush`
Now the index is automatically updated each time the state of the bound Doctrine repository changes.
No need to repopulate the whole "user" index when a new `User` is created.
@@ -614,7 +618,19 @@ You can also choose to only listen for some of the events:
update: false
delete: true
-> **Propel** doesn't support this feature yet.
+By default, the ElasticSearch index will be updated after flush. To update before flushing, set `immediate`
+to `true`:
+
+ persistence:
+ listener:
+ insert: true
+ update: false
+ delete: true
+ immediate: true
+
+> Using `immediate` to update ElasticSearch before flush completes may cause the ElasticSearch index to fall out of
+> sync with the source database in the event of a crash during the flush itself, such as in the case of a bad query.
+
### Checking an entity method for listener
diff --git a/Resources/config/mongodb.xml b/Resources/config/mongodb.xml
index e60e3dc..0af7aa1 100644
--- a/Resources/config/mongodb.xml
+++ b/Resources/config/mongodb.xml
@@ -13,7 +13,7 @@
-
+
diff --git a/Resources/config/orm.xml b/Resources/config/orm.xml
index 4fd6ae7..5bd16e5 100644
--- a/Resources/config/orm.xml
+++ b/Resources/config/orm.xml
@@ -13,7 +13,7 @@
-
+
diff --git a/Tests/Doctrine/AbstractListenerTest.php b/Tests/Doctrine/AbstractListenerTest.php
index e99e26d..a9eff66 100644
--- a/Tests/Doctrine/AbstractListenerTest.php
+++ b/Tests/Doctrine/AbstractListenerTest.php
@@ -3,9 +3,11 @@
namespace FOS\ElasticaBundle\Tests\Doctrine;
/**
+ * See concrete MongoDB/ORM instances of this abstract test
+ *
* @author Richard Miller
*/
-abstract class AbstractListenerTest extends \PHPUnit_Framework_TestCase
+abstract class ListenerTest extends \PHPUnit_Framework_TestCase
{
public function testObjectInsertedOnPersist()
{
@@ -14,12 +16,16 @@ abstract class AbstractListenerTest extends \PHPUnit_Framework_TestCase
$entity = new Listener\Entity(1);
$eventArgs = $this->createLifecycleEventArgs($entity, $this->getMockObjectManager());
- $persister->expects($this->once())
- ->method('insertOne')
- ->with($entity);
-
$listener = $this->createListener($persister, get_class($entity), array());
$listener->postPersist($eventArgs);
+
+ $this->assertEquals($entity, current($listener->scheduledForInsertion));
+
+ $persister->expects($this->once())
+ ->method('insertMany')
+ ->with($listener->scheduledForInsertion);
+
+ $listener->postFlush($eventArgs);
}
/**
@@ -32,12 +38,18 @@ abstract class AbstractListenerTest extends \PHPUnit_Framework_TestCase
$entity = new Listener\Entity(1, false);
$eventArgs = $this->createLifecycleEventArgs($entity, $this->getMockObjectManager());
- $persister->expects($this->never())
- ->method('insertOne');
-
$listener = $this->createListener($persister, get_class($entity), array());
$listener->setIsIndexableCallback($isIndexableCallback);
$listener->postPersist($eventArgs);
+
+ $this->assertEmpty($listener->scheduledForInsertion);
+
+ $persister->expects($this->never())
+ ->method('insertOne');
+ $persister->expects($this->never())
+ ->method('insertMany');
+
+ $listener->postFlush($eventArgs);
}
public function testObjectReplacedOnUpdate()
@@ -47,15 +59,18 @@ abstract class AbstractListenerTest extends \PHPUnit_Framework_TestCase
$entity = new Listener\Entity(1);
$eventArgs = $this->createLifecycleEventArgs($entity, $this->getMockObjectManager());
- $persister->expects($this->once())
- ->method('replaceOne')
- ->with($entity);
+ $listener = $this->createListener($persister, get_class($entity), array());
+ $listener->postUpdate($eventArgs);
+ $this->assertEquals($entity, current($listener->scheduledForUpdate));
+
+ $persister->expects($this->once())
+ ->method('replaceMany')
+ ->with(array($entity));
$persister->expects($this->never())
->method('deleteById');
- $listener = $this->createListener($persister, get_class($entity), array());
- $listener->postUpdate($eventArgs);
+ $listener->postFlush($eventArgs);
}
/**
@@ -80,16 +95,20 @@ abstract class AbstractListenerTest extends \PHPUnit_Framework_TestCase
->with($entity, 'id')
->will($this->returnValue($entity->getId()));
- $persister->expects($this->never())
- ->method('replaceOne');
-
- $persister->expects($this->once())
- ->method('deleteById')
- ->with($entity->getId());
-
$listener = $this->createListener($persister, get_class($entity), array());
$listener->setIsIndexableCallback($isIndexableCallback);
$listener->postUpdate($eventArgs);
+
+ $this->assertEmpty($listener->scheduledForUpdate);
+ $this->assertEquals($entity, current($listener->scheduledForDeletion));
+
+ $persister->expects($this->never())
+ ->method('replaceOne');
+ $persister->expects($this->once())
+ ->method('deleteMany')
+ ->with(array($entity));
+
+ $listener->postFlush($eventArgs);
}
public function testObjectDeletedOnRemove()
@@ -111,13 +130,16 @@ abstract class AbstractListenerTest extends \PHPUnit_Framework_TestCase
->with($entity, 'id')
->will($this->returnValue($entity->getId()));
- $persister->expects($this->once())
- ->method('deleteById')
- ->with($entity->getId());
-
$listener = $this->createListener($persister, get_class($entity), array());
$listener->preRemove($eventArgs);
- $listener->postRemove($eventArgs);
+
+ $this->assertEquals($entity, current($listener->scheduledForDeletion));
+
+ $persister->expects($this->once())
+ ->method('deleteMany')
+ ->with(array($entity));
+
+ $listener->postFlush($eventArgs);
}
public function testObjectWithNonStandardIdentifierDeletedOnRemove()
@@ -139,13 +161,16 @@ abstract class AbstractListenerTest extends \PHPUnit_Framework_TestCase
->with($entity, 'identifier')
->will($this->returnValue($entity->getId()));
- $persister->expects($this->once())
- ->method('deleteById')
- ->with($entity->getId());
-
$listener = $this->createListener($persister, get_class($entity), array(), 'identifier');
$listener->preRemove($eventArgs);
- $listener->postRemove($eventArgs);
+
+ $this->assertEquals($entity, current($listener->scheduledForDeletion));
+
+ $persister->expects($this->once())
+ ->method('deleteMany')
+ ->with(array($entity));
+
+ $listener->postFlush($eventArgs);
}
/**
diff --git a/Tests/Doctrine/MongoDB/ListenerTest.php b/Tests/Doctrine/MongoDB/ListenerTest.php
index 7f1a9ab..37a0203 100644
--- a/Tests/Doctrine/MongoDB/ListenerTest.php
+++ b/Tests/Doctrine/MongoDB/ListenerTest.php
@@ -2,9 +2,9 @@
namespace FOS\ElasticaBundle\Tests\Doctrine\MongoDB;
-use FOS\ElasticaBundle\Tests\Doctrine\AbstractListenerTest;
+use FOS\ElasticaBundle\Tests\Doctrine\ListenerTest as BaseListenerTest;
-class ListenerTest extends AbstractListenerTest
+class ListenerTest extends BaseListenerTest
{
public function setUp()
{
@@ -25,7 +25,7 @@ class ListenerTest extends AbstractListenerTest
protected function getListenerClass()
{
- return 'FOS\ElasticaBundle\Doctrine\MongoDB\Listener';
+ return 'FOS\ElasticaBundle\Doctrine\Listener';
}
protected function getObjectManagerClass()
diff --git a/Tests/Doctrine/ORM/ListenerTest.php b/Tests/Doctrine/ORM/ListenerTest.php
index 48702c0..12a89b2 100644
--- a/Tests/Doctrine/ORM/ListenerTest.php
+++ b/Tests/Doctrine/ORM/ListenerTest.php
@@ -2,9 +2,9 @@
namespace FOS\ElasticaBundle\Tests\Doctrine\ORM;
-use FOS\ElasticaBundle\Tests\Doctrine\AbstractListenerTest;
+use FOS\ElasticaBundle\Tests\Doctrine\ListenerTest as BaseListenerTest;
-class ListenerTest extends AbstractListenerTest
+class ListenerTest extends BaseListenerTest
{
public function setUp()
{
@@ -25,7 +25,7 @@ class ListenerTest extends AbstractListenerTest
protected function getListenerClass()
{
- return 'FOS\ElasticaBundle\Doctrine\ORM\Listener';
+ return 'FOS\ElasticaBundle\Doctrine\Listener';
}
protected function getObjectManagerClass()