Bulk delete

This commit is contained in:
nurikabe 2014-02-01 02:14:21 +00:00
parent b6d010a9d7
commit 1dcaadbe6f
4 changed files with 59 additions and 23 deletions

View file

@ -210,21 +210,24 @@ class Listener implements EventSubscriber
}
/**
* Persist scheduled action to ElasticSearch
* Persist scheduled objects to ElasticSearch
*/
private function persistScheduled()
{
$this->objectPersister->bulkPersist($this->scheduledForInsertion, ObjectPersisterInterface::BULK_INSERT);
$this->objectPersister->bulkPersist($this->scheduledForUpdate, ObjectPersisterInterface::BULK_REPLACE);
foreach ($this->scheduledForDeletion as $entity) {
$this->objectPersister->deleteOne($entity);
if (count($this->scheduledForInsertion)) {
$this->objectPersister->insertMany($this->scheduledForInsertion);
}
if (count($this->scheduledForUpdate)) {
$this->objectPersister->replaceMany($this->scheduledForUpdate);
}
if (count($this->scheduledForDeletion)) {
$this->objectPersister->deleteMany($this->scheduledForDeletion);
}
}
/**
* Iterate through scheduled actions before flushing to emulate 2.x behavior. Note that the ElasticSearch index
* will fall out of sync with the data source in event of a crash on flush.
* will fall out of sync with the source data in the event of a crash during flush.
*/
public function preFlush(EventArgs $eventArgs)
{

View file

@ -84,23 +84,46 @@ class ObjectPersister implements ObjectPersisterInterface
}
/**
* Bulk update an array of objects in the type for the given method
* Bulk insert an array of objects in the type for the given method
*
* @param array $objects array of domain model objects
* @param string Method to call
*/
public function bulkPersist(array $objects, $method)
public function insertMany(array $objects)
{
if (!count($objects)) {
return;
}
$documents = array();
foreach ($objects as $object) {
$documents[] = $this->transformToElasticaDocument($object);
}
$this->type->addDocuments($documents);
}
$this->type->$method($documents);
/**
* Bulk updates an array of objects in the type
*
* @param array $objects array of domain model objects
*/
public function replaceMany(array $objects)
{
$documents = array();
foreach ($objects as $object) {
$documents[] = $this->transformToElasticaDocument($object);
}
$this->type->updateDocuments($documents);
}
/**
* Bulk deletes an array of objects in the type
*
* @param array $objects array of domain model objects
*/
public function deleteMany(array $objects)
{
$documents = array();
foreach ($objects as $object) {
$documents[] = $this->transformToElasticaDocument($object);
}
$this->type->deleteDocuments($documents);
}
/**

View file

@ -10,9 +10,6 @@ namespace FOS\ElasticaBundle\Persister;
*/
interface ObjectPersisterInterface
{
const BULK_INSERT = 'addDocuments';
const BULK_REPLACE = 'updateDocuments';
/**
* Insert one object into the type
* The object will be transformed to an elastica document
@ -41,14 +38,27 @@ interface ObjectPersisterInterface
* @param mixed $id
*
* @return null
**/
*/
function deleteById($id);
/**
* Bulk update an array of objects in the type for the given method
* Bulk inserts an array of objects in the type
*
* @param array $objects array of domain model objects
* @param string Method to call
*/
function bulkPersist(array $objects, $method);
function insertMany(array $objects);
/**
* Bulk updates an array of objects in the type
*
* @param array $objects array of domain model objects
*/
function replaceMany(array $objects);
/**
* Bulk deletes an array of objects in the type
*
* @param array $objects array of domain model objects
*/
function deleteMany(array $objects);
}

View file

@ -627,8 +627,8 @@ to `true`:
delete: true
immediate: true
> Updating ElasticSearch before flushing may cause the ElasticSearch index to fall out of sync with the
> original data in the event of a crash.
> Using `immediate` to update ElasticSearch before flush completes may cause the ElasticSearch index to fall out of
> sync with the source database in the event of a crash during the flush itself, such as in the case of a bad query.
### Checking an entity method for listener