diff --git a/Doctrine/Listener.php b/Doctrine/Listener.php index d6314dc..c254513 100644 --- a/Doctrine/Listener.php +++ b/Doctrine/Listener.php @@ -210,21 +210,24 @@ class Listener implements EventSubscriber } /** - * Persist scheduled action to ElasticSearch + * Persist scheduled objects to ElasticSearch */ private function persistScheduled() { - $this->objectPersister->bulkPersist($this->scheduledForInsertion, ObjectPersisterInterface::BULK_INSERT); - $this->objectPersister->bulkPersist($this->scheduledForUpdate, ObjectPersisterInterface::BULK_REPLACE); - - foreach ($this->scheduledForDeletion as $entity) { - $this->objectPersister->deleteOne($entity); + if (count($this->scheduledForInsertion)) { + $this->objectPersister->insertMany($this->scheduledForInsertion); + } + if (count($this->scheduledForUpdate)) { + $this->objectPersister->replaceMany($this->scheduledForUpdate); + } + if (count($this->scheduledForDeletion)) { + $this->objectPersister->deleteMany($this->scheduledForDeletion); } } /** * Iterate through scheduled actions before flushing to emulate 2.x behavior. Note that the ElasticSearch index - * will fall out of sync with the data source in event of a crash on flush. + * will fall out of sync with the source data in the event of a crash during flush. */ public function preFlush(EventArgs $eventArgs) { diff --git a/Persister/ObjectPersister.php b/Persister/ObjectPersister.php index b5a1be7..3592a78 100644 --- a/Persister/ObjectPersister.php +++ b/Persister/ObjectPersister.php @@ -84,23 +84,46 @@ class ObjectPersister implements ObjectPersisterInterface } /** - * Bulk update an array of objects in the type for the given method + * Bulk insert an array of objects in the type for the given method * * @param array $objects array of domain model objects * @param string Method to call */ - public function bulkPersist(array $objects, $method) + public function insertMany(array $objects) { - if (!count($objects)) { - return; - } - $documents = array(); foreach ($objects as $object) { $documents[] = $this->transformToElasticaDocument($object); } + $this->type->addDocuments($documents); + } - $this->type->$method($documents); + /** + * Bulk updates an array of objects in the type + * + * @param array $objects array of domain model objects + */ + public function replaceMany(array $objects) + { + $documents = array(); + foreach ($objects as $object) { + $documents[] = $this->transformToElasticaDocument($object); + } + $this->type->updateDocuments($documents); + } + + /** + * Bulk deletes an array of objects in the type + * + * @param array $objects array of domain model objects + */ + public function deleteMany(array $objects) + { + $documents = array(); + foreach ($objects as $object) { + $documents[] = $this->transformToElasticaDocument($object); + } + $this->type->deleteDocuments($documents); } /** diff --git a/Persister/ObjectPersisterInterface.php b/Persister/ObjectPersisterInterface.php index 5c4ecd2..a25aafc 100644 --- a/Persister/ObjectPersisterInterface.php +++ b/Persister/ObjectPersisterInterface.php @@ -10,9 +10,6 @@ namespace FOS\ElasticaBundle\Persister; */ interface ObjectPersisterInterface { - const BULK_INSERT = 'addDocuments'; - const BULK_REPLACE = 'updateDocuments'; - /** * Insert one object into the type * The object will be transformed to an elastica document @@ -41,14 +38,27 @@ interface ObjectPersisterInterface * @param mixed $id * * @return null - **/ + */ function deleteById($id); /** - * Bulk update an array of objects in the type for the given method + * Bulk inserts an array of objects in the type * * @param array $objects array of domain model objects - * @param string Method to call */ - function bulkPersist(array $objects, $method); + function insertMany(array $objects); + + /** + * Bulk updates an array of objects in the type + * + * @param array $objects array of domain model objects + */ + function replaceMany(array $objects); + + /** + * Bulk deletes an array of objects in the type + * + * @param array $objects array of domain model objects + */ + function deleteMany(array $objects); } diff --git a/README.md b/README.md index b3a4c5d..cb6e5f8 100644 --- a/README.md +++ b/README.md @@ -627,8 +627,8 @@ to `true`: delete: true immediate: true -> Updating ElasticSearch before flushing may cause the ElasticSearch index to fall out of sync with the -> original data in the event of a crash. +> Using `immediate` to update ElasticSearch before flush completes may cause the ElasticSearch index to fall out of +> sync with the source database in the event of a crash during the flush itself, such as in the case of a bad query. ### Checking an entity method for listener