FOSElasticaBundle/Provider/DoctrineProvider.php

94 lines
3 KiB
PHP
Raw Normal View History

2011-04-17 19:56:32 +02:00
<?php
namespace FOQ\ElasticaBundle\Provider;
use FOQ\ElasticaBundle\Transformer\ModelToElasticaTransformerInterface;
2011-04-17 19:56:32 +02:00
use Elastica_Type;
use Elastica_Document;
use Closure;
use InvalidArgumentException;
/**
 * Fills an Elastica type with documents built from the objects of a repository.
 *
 * Objects are read in batches through a query builder obtained from the
 * repository of the configured object class, converted to Elastica documents
 * by the injected transformer, and sent to the type in bulk.
 *
 * Supported options:
 *  - batch_size:           number of objects fetched and indexed per step (default 100)
 *  - clear_object_manager: whether to call clear() on the object manager after each step (default true)
 *  - query_builder_method: name of the repository method that returns the query builder
 *                          (default "createQueryBuilder")
 */
class DoctrineProvider implements ProviderInterface
{
    protected $type;
    protected $objectManager;
    protected $objectClass;
    protected $transformer;
    protected $options = array(
        'batch_size'           => 100,
        'clear_object_manager' => true,
        'query_builder_method' => 'createQueryBuilder'
    );

    /**
     * @param Elastica_Type                       $type          type the documents are added to
     * @param object                              $objectManager manager providing getRepository() and clear()
     * @param ModelToElasticaTransformerInterface $transformer   converts an object into an Elastica document
     * @param string                              $objectClass   class name passed to getRepository()
     * @param array                               $options       overrides merged over the default options
     */
    public function __construct(Elastica_Type $type, $objectManager, ModelToElasticaTransformerInterface $transformer, $objectClass, array $options = array())
    {
        $this->type          = $type;
        $this->objectManager = $objectManager;
        $this->objectClass   = $objectClass;
        $this->transformer   = $transformer;
        $this->options       = array_merge($this->options, $options);
    }

    /**
     * Insert the repository objects in the type index
     *
     * The closure is called once per batch with a progress message
     * (percentage, processed/total counts and objects per second).
     *
     * @param Closure $loggerClosure
     *
     * @throws InvalidArgumentException when the "batch_size" option is lower than 1
     */
    public function populate(Closure $loggerClosure)
    {
        // A batch size of zero or less would never advance the offset and loop forever.
        $batchSize = (int) $this->options['batch_size'];
        if ($batchSize < 1) {
            throw new InvalidArgumentException('The "batch_size" option must be a positive integer.');
        }

        $queryBuilder = $this->createQueryBuilder();
        $nbObjects    = $queryBuilder->getQuery()->count();
        $fields       = $this->extractTypeFields();

        for ($offset = 0; $offset < $nbObjects; $offset += $batchSize) {
            $stepStartTime = microtime(true);
            $documents     = array();
            $objects       = $queryBuilder->limit($batchSize)->skip($offset)->getQuery()->execute()->toArray();

            foreach ($objects as $object) {
                try {
                    $documents[] = $this->transformer->transform($object, $fields);
                } catch (NotIndexableException $e) {
                    // skip document
                }
            }

            // Every object of the batch may have been skipped; there is then
            // nothing to index, and Elastica is expected to reject an empty
            // document list, so the bulk call is only made for non-empty batches.
            if (!empty($documents)) {
                $this->type->addDocuments($documents);
            }

            if ($this->options['clear_object_manager']) {
                $this->objectManager->clear();
            }

            $stepNbObjects = count($objects);
            $stepCount     = $stepNbObjects + $offset;

            // The elapsed time can be reported as zero on coarse timers; avoid dividing by it.
            $elapsed          = microtime(true) - $stepStartTime;
            $objectsPerSecond = $elapsed > 0 ? $stepNbObjects / $elapsed : 0;

            $loggerClosure(sprintf('%0.1f%% (%d/%d), %d objects/s', 100*$stepCount/$nbObjects, $stepCount, $nbObjects, $objectsPerSecond));
        }
    }

    /**
     * Creates the query builder used to fetch the documents to index
     *
     * Calls, on the repository of the configured object class, the method
     * named by the "query_builder_method" option.
     *
     * @return object whatever that repository method returns; populate()
     *                expects it to offer getQuery(), limit() and skip()
     **/
    protected function createQueryBuilder()
    {
        $repository = $this->objectManager->getRepository($this->objectClass);
        $method     = $this->options['query_builder_method'];

        return $repository->$method();
    }

    /**
     * Lists the field names declared in the mapping of the type
     *
     * @return array names of the mapped properties, without "__isInitialized__"
     */
    protected function extractTypeFields()
    {
        $mappings = $this->type->getMapping();
        // skip index name
        $mappings = reset($mappings);
        // skip type name
        $mappings = reset($mappings);
        $mappings = $mappings['properties'];

        // NOTE(review): "__isInitialized__" is presumably a proxy marker that leaked
        // into the mapping rather than a real field - confirm. unset() is a no-op
        // when the key is absent, so no existence check is needed.
        unset($mappings['__isInitialized__']);

        return array_keys($mappings);
    }
}