vendor/shopware/elasticsearch/Admin/AdminSearchRegistry.php line 157

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace Shopware\Elasticsearch\Admin;
  3. use Doctrine\DBAL\Connection;
  4. use Elasticsearch\Client;
  5. use Shopware\Core\Framework\DataAbstractionLayer\Event\EntityWrittenContainerEvent;
  6. use Shopware\Core\Framework\Event\ProgressAdvancedEvent;
  7. use Shopware\Core\Framework\Event\ProgressFinishedEvent;
  8. use Shopware\Core\Framework\Event\ProgressStartedEvent;
  9. use Shopware\Core\Framework\Log\Package;
  10. use Shopware\Core\Framework\Uuid\Uuid;
  11. use Shopware\Elasticsearch\Admin\Indexer\AbstractAdminIndexer;
  12. use Shopware\Elasticsearch\Exception\ElasticsearchIndexingException;
  13. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  14. use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
  15. use Symfony\Component\Messenger\MessageBusInterface;
  16. use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
  17. /**
  18.  * @internal
  19.  *
  20.  * @final
  21.  */
  22. #[Package('system-settings')]
  23. class AdminSearchRegistry implements MessageHandlerInterfaceEventSubscriberInterface
  24. {
  25.     /**
  26.      * @var array<string, mixed>
  27.      */
  28.     private array $indexer;
  29.     private Connection $connection;
  30.     private MessageBusInterface $queue;
  31.     private EventDispatcherInterface $dispatcher;
  32.     private Client $client;
  33.     private AdminElasticsearchHelper $adminEsHelper;
  34.     /**
  35.      * @var array<mixed>
  36.      */
  37.     private array $config;
  38.     /**
  39.      * @var array<mixed>
  40.      */
  41.     private array $mapping;
  42.     /**
  43.      * @param AbstractAdminIndexer[] $indexer
  44.      * @param array<mixed> $config
  45.      * @param array<mixed> $mapping
  46.      */
  47.     public function __construct(
  48.         $indexer,
  49.         Connection $connection,
  50.         MessageBusInterface $queue,
  51.         EventDispatcherInterface $dispatcher,
  52.         Client $client,
  53.         AdminElasticsearchHelper $adminEsHelper,
  54.         array $config,
  55.         array $mapping
  56.     ) {
  57.         $this->indexer $indexer instanceof \Traversable iterator_to_array($indexer) : $indexer;
  58.         $this->connection $connection;
  59.         $this->queue $queue;
  60.         $this->dispatcher $dispatcher;
  61.         $this->client $client;
  62.         $this->adminEsHelper $adminEsHelper;
  63.         $this->mapping $mapping;
  64.         if (isset($config['settings']['index'])) {
  65.             if (\array_key_exists('number_of_shards'$config['settings']['index']) && $config['settings']['index']['number_of_shards'] === null) {
  66.                 unset($config['settings']['index']['number_of_shards']);
  67.             }
  68.             if (\array_key_exists('number_of_replicas'$config['settings']['index']) && $config['settings']['index']['number_of_replicas'] === null) {
  69.                 unset($config['settings']['index']['number_of_replicas']);
  70.             }
  71.         }
  72.         $this->config $config;
  73.     }
  74.     public function __invoke(AdminSearchIndexingMessage $message): void
  75.     {
  76.         $indexer $this->getIndexer($message->getEntity());
  77.         $documents $indexer->fetch($message->getIds());
  78.         $this->push($indexer$message->getIndices(), $documents$message->getIds());
  79.     }
  80.     public static function getSubscribedEvents(): array
  81.     {
  82.         return [
  83.             EntityWrittenContainerEvent::class => [
  84.                 ['refresh', -1000],
  85.             ],
  86.         ];
  87.     }
  88.     /**
  89.      * @return iterable<class-string>
  90.      */
  91.     public static function getHandledMessages(): iterable
  92.     {
  93.         return [
  94.             AdminSearchIndexingMessage::class,
  95.         ];
  96.     }
  97.     public function iterate(AdminIndexingBehavior $indexingBehavior): void
  98.     {
  99.         if ($this->adminEsHelper->getEnabled() === false) {
  100.             return;
  101.         }
  102.         /** @var array<string> $entities */
  103.         $entities array_keys($this->indexer);
  104.         if ($indexingBehavior->getOnlyEntities()) {
  105.             $entities array_intersect($entities$indexingBehavior->getOnlyEntities());
  106.         } elseif ($indexingBehavior->getSkipEntities()) {
  107.             $entities array_diff($entities$indexingBehavior->getSkipEntities());
  108.         }
  109.         $indices $this->createIndices($entities);
  110.         foreach ($entities as $entityName) {
  111.             $indexer $this->getIndexer($entityName);
  112.             $iterator $indexer->getIterator();
  113.             $this->dispatcher->dispatch(new ProgressStartedEvent($indexer->getName(), $iterator->fetchCount()));
  114.             while ($ids $iterator->fetch()) {
  115.                 // we provide no queue when the data is sent by the admin
  116.                 if ($indexingBehavior->getNoQueue() === true) {
  117.                     $this->__invoke(new AdminSearchIndexingMessage($indexer->getEntity(), $indexer->getName(), $indices$ids));
  118.                 } else {
  119.                     $this->queue->dispatch(new AdminSearchIndexingMessage($indexer->getEntity(), $indexer->getName(), $indices$ids));
  120.                 }
  121.                 $this->dispatcher->dispatch(new ProgressAdvancedEvent(\count($ids)));
  122.             }
  123.             $this->dispatcher->dispatch(new ProgressFinishedEvent($indexer->getName()));
  124.         }
  125.         $this->swapAlias($indices);
  126.     }
  127.     public function refresh(EntityWrittenContainerEvent $event): void
  128.     {
  129.         if ($this->adminEsHelper->getEnabled() === false || !$this->isIndexedEntityWritten($event)) {
  130.             return;
  131.         }
  132.         if ($this->adminEsHelper->getRefreshIndices()) {
  133.             $this->refreshIndices();
  134.         }
  135.         /** @var array<string, string> $indices */
  136.         $indices $this->connection->fetchAllKeyValue('SELECT `alias`, `index` FROM admin_elasticsearch_index_task');
  137.         if (empty($indices)) {
  138.             return;
  139.         }
  140.         foreach ($this->indexer as $indexer) {
  141.             $ids $event->getPrimaryKeys($indexer->getEntity());
  142.             if (empty($ids)) {
  143.                 continue;
  144.             }
  145.             $documents $indexer->fetch($ids);
  146.             $this->push($indexer$indices$documents$ids);
  147.         }
  148.     }
  149.     /**
  150.      * @return AbstractAdminIndexer[]
  151.      */
  152.     public function getIndexers(): iterable
  153.     {
  154.         return $this->indexer;
  155.     }
  156.     public function getIndexer(string $name): AbstractAdminIndexer
  157.     {
  158.         $indexer $this->indexer[$name] ?? null;
  159.         if ($indexer) {
  160.             return $indexer;
  161.         }
  162.         throw new ElasticsearchIndexingException([\sprintf('Indexer for name %s not found'$name)]);
  163.     }
  164.     private function isIndexedEntityWritten(EntityWrittenContainerEvent $event): bool
  165.     {
  166.         foreach ($this->indexer as $indexer) {
  167.             $ids $event->getPrimaryKeys($indexer->getEntity());
  168.             if (!empty($ids)) {
  169.                 return true;
  170.             }
  171.         }
  172.         return false;
  173.     }
  174.     /**
  175.      * @param array<string, string> $indices
  176.      * @param array<string, array<string|int, string>> $data
  177.      * @param array<string> $ids
  178.      */
  179.     private function push(AbstractAdminIndexer $indexer, array $indices, array $data, array $ids): void
  180.     {
  181.         $alias $this->adminEsHelper->getIndex($indexer->getName());
  182.         if (!isset($indices[$alias])) {
  183.             return;
  184.         }
  185.         $toRemove array_filter($ids, static function (string $id) use ($data): bool {
  186.             return !isset($data[$id]);
  187.         });
  188.         $documents = [];
  189.         foreach ($data as $id => $document) {
  190.             $documents[] = ['index' => ['_id' => $id]];
  191.             $documents[] = \array_replace(
  192.                 ['entityName' => $indexer->getEntity(), 'parameters' => [], 'textBoosted' => '''text' => ''],
  193.                 $document
  194.             );
  195.         }
  196.         foreach ($toRemove as $id) {
  197.             $documents[] = ['delete' => ['_id' => $id]];
  198.         }
  199.         $arguments = [
  200.             'index' => $indices[$alias],
  201.             'body' => $documents,
  202.         ];
  203.         $result $this->client->bulk($arguments);
  204.         if (\is_array($result) && !empty($result['errors'])) {
  205.             $errors $this->parseErrors($result);
  206.             throw new ElasticsearchIndexingException($errors);
  207.         }
  208.     }
  209.     /**
  210.      * @param array<string> $entities
  211.      *
  212.      * @throws \Doctrine\DBAL\Exception
  213.      *
  214.      * @return array<string, string>
  215.      */
  216.     private function createIndices(array $entities): array
  217.     {
  218.         $indexTasks = [];
  219.         $indices = [];
  220.         foreach ($entities as $entityName) {
  221.             $indexer $this->getIndexer($entityName);
  222.             $alias $this->adminEsHelper->getIndex($indexer->getName());
  223.             $index $alias '_' time();
  224.             if ($this->indexExists($index)) {
  225.                 continue;
  226.             }
  227.             $indices[$alias] = $index;
  228.             $this->create($indexer$index$alias);
  229.             $iterator $indexer->getIterator();
  230.             $indexTasks[] = [
  231.                 'id' => Uuid::randomBytes(),
  232.                 '`entity`' => $indexer->getEntity(),
  233.                 '`index`' => $index,
  234.                 '`alias`' => $alias,
  235.                 '`doc_count`' => $iterator->fetchCount(),
  236.             ];
  237.         }
  238.         $this->connection->executeStatement(
  239.             'DELETE FROM admin_elasticsearch_index_task WHERE `entity` IN (:entities)',
  240.             ['entities' => $entities],
  241.             ['entities' => Connection::PARAM_STR_ARRAY]
  242.         );
  243.         foreach ($indexTasks as $task) {
  244.             $this->connection->insert('admin_elasticsearch_index_task'$task);
  245.         }
  246.         return $indices;
  247.     }
  248.     private function refreshIndices(): void
  249.     {
  250.         $entities = [];
  251.         $indexTasks = [];
  252.         foreach ($this->indexer as $indexer) {
  253.             $alias $this->adminEsHelper->getIndex($indexer->getName());
  254.             if ($this->aliasExists($alias)) {
  255.                 continue;
  256.             }
  257.             $index $alias '_' time();
  258.             $this->create($indexer$index$alias);
  259.             $entities[] = $indexer->getEntity();
  260.             $iterator $indexer->getIterator();
  261.             $indexTasks[] = [
  262.                 'id' => Uuid::randomBytes(),
  263.                 '`entity`' => $indexer->getEntity(),
  264.                 '`index`' => $index,
  265.                 '`alias`' => $alias,
  266.                 '`doc_count`' => $iterator->fetchCount(),
  267.             ];
  268.         }
  269.         $this->connection->executeStatement(
  270.             'DELETE FROM admin_elasticsearch_index_task WHERE `entity` IN (:entities)',
  271.             ['entities' => $entities],
  272.             ['entities' => Connection::PARAM_STR_ARRAY]
  273.         );
  274.         foreach ($indexTasks as $task) {
  275.             $this->connection->insert('admin_elasticsearch_index_task'$task);
  276.         }
  277.     }
  278.     private function create(AbstractAdminIndexer $indexerstring $indexstring $alias): void
  279.     {
  280.         $mapping $indexer->mapping([
  281.             'properties' => [
  282.                 'id' => ['type' => 'keyword'],
  283.                 'textBoosted' => ['type' => 'text'],
  284.                 'text' => ['type' => 'text'],
  285.                 'entityName' => ['type' => 'keyword'],
  286.                 'parameters' => ['type' => 'keyword'],
  287.             ],
  288.         ]);
  289.         $mapping array_merge_recursive($mapping$this->mapping);
  290.         $body array_merge(
  291.             $this->config,
  292.             ['mappings' => $mapping]
  293.         );
  294.         $this->client->indices()->create([
  295.             'index' => $index,
  296.             'body' => $body,
  297.         ]);
  298.         $this->createAliasIfNotExisting($index$alias);
  299.     }
  300.     private function indexExists(string $name): bool
  301.     {
  302.         return $this->client->indices()->exists(['index' => $name]);
  303.     }
  304.     private function aliasExists(string $alias): bool
  305.     {
  306.         return $this->client->indices()->existsAlias(['name' => $alias]);
  307.     }
  308.     /**
  309.      * @param array<string, array<array<string, mixed>>> $result
  310.      *
  311.      * @return array<array{reason: string}|string>
  312.      */
  313.     private function parseErrors(array $result): array
  314.     {
  315.         $errors = [];
  316.         foreach ($result['items'] as $item) {
  317.             $item $item['index'] ?? $item['delete'];
  318.             if (\in_array($item['status'], [200201], true)) {
  319.                 continue;
  320.             }
  321.             $errors[] = [
  322.                 'index' => $item['_index'],
  323.                 'id' => $item['_id'],
  324.                 'type' => $item['error']['type'] ?? $item['_type'],
  325.                 'reason' => $item['error']['reason'] ?? $item['result'],
  326.             ];
  327.         }
  328.         return $errors;
  329.     }
  330.     private function createAliasIfNotExisting(string $indexstring $alias): void
  331.     {
  332.         $exist $this->client->indices()->existsAlias(['name' => $alias]);
  333.         if ($exist) {
  334.             return;
  335.         }
  336.         $this->putAlias($index$alias);
  337.     }
  338.     /**
  339.      * @param array<string, string> $indices
  340.      */
  341.     private function swapAlias($indices): void
  342.     {
  343.         foreach ($indices as $alias => $index) {
  344.             $exist $this->client->indices()->existsAlias(['name' => $alias]);
  345.             if (!$exist) {
  346.                 $this->putAlias($index$alias);
  347.                 return;
  348.             }
  349.             $current $this->client->indices()->getAlias(['name' => $alias]);
  350.             if (!isset($current[$index])) {
  351.                 $this->putAlias($index$alias);
  352.             }
  353.             unset($current[$index]);
  354.             $current array_keys($current);
  355.             foreach ($current as $value) {
  356.                 $this->client->indices()->delete(['index' => $value]);
  357.             }
  358.         }
  359.     }
  360.     private function putAlias(string $indexstring $alias): void
  361.     {
  362.         $this->client->indices()->refresh([
  363.             'index' => $index,
  364.         ]);
  365.         $this->client->indices()->putAlias(['index' => $index'name' => $alias]);
  366.     }
  367. }