Skip to content

Blog


Construire une indexation plus rapide avec Apache Kafka et Elasticsearch

14 juillet 2021

|

Danial Asif

Pour maintenir une expérience de commande en ligne agréable, il faut s'assurer que les index de recherche importants restent efficaces à grande échelle. Pour DoorDash, il s'agissait d'un défi particulier car le nombre de magasins, d'articles et d'autres données augmentait chaque jour. Dans ces conditions, la réindexation de tous les changements et la mise à jour de notre base de données de recherche pouvaient prendre jusqu'à une semaine. 

We needed a fast way to index all of our platform‚Äôs searchable data to improve product discovery, ensuring that we offered consumers all available ordering options. In addition, this project would also increase the speed of experimentation on our platform so we could improve our search performance more quickly. 

Notre solution a consisté à créer une nouvelle plateforme d'indexation de recherche qui utilise l'indexation incrémentale sur nos sources de données. Nous avons basé cette plateforme sur trois projets open source, Apache Kafka, Apache Flink et Elasticsearch.

DoorDash‚Äôs problem with search indexing 

Notre ancien système d'indexation n'était ni fiable ni extensible, et il était lent. Un système d'indexation fiable garantirait que les modifications apportées aux magasins et aux articles sont reflétées dans l'index de recherche en temps réel. L'indexation incrémentale permet d'actualiser les données plus rapidement, en construisant de nouveaux index pour introduire de nouveaux analyseurs et des champs supplémentaires dans des délais plus courts, ce qui permet en fin de compte d'améliorer la recherche.

Teams from new business verticals within DoorDash wanted to build their own search experience but didn‚Äôt want to reinvent the wheel when it came to indexing the search data. Therefore, we needed a plug-and-play solution to improve new search experiences without slowing down development for these business vertical teams. 

Construction d'un pipeline événementiel pour l'indexation de documents 

Nous avons résolu ces problèmes en construisant une nouvelle plateforme d'indexation de recherche qui fournit une indexation rapide et fiable pour alimenter différents secteurs verticaux tout en améliorant les performances de recherche et la productivité de l'équipe de recherche. Elle utilise Kafka comme file d'attente de messages et pour le stockage des données, et Flink pour la transformation des données et l'envoi des données à Elasticsearch.

Architecture de haut niveau

Schéma du pipeline d'indexation des données
Figure 1 : Le pipeline de données de notre nouveau système d'indexation utilise Kafka pour la mise en file d'attente des messages et le stockage des données, et Flink pour l'ETL et la synchronisation avec Elasticsearch.

La figure 1 ci-dessus montre les différents composants de notre pipeline d'index de recherche. Les composants sont regroupés en quatre catégories :

  • Les sources de données : Il s'agit des systèmes qui effectuent des opérations CRUD sur les données. Nous les appelons la source de vérité pour les données. Dans notre pile, nous avons utilisé Postgres comme base de données et Snowflake comme entrepôt de données. 
  • Destination des données : Il s'agit du magasin de données qui a été optimisé pour la recherche. Dans notre cas, nous avons choisi Elasticsearch.
  • Application Flink : Nous avons ajouté deux applications Flink personnalisées dans notre pipeline d'indexation, Assemblers pour la transformation des données et Sinks pour l'envoi des données vers le stockage de destination. Les assembleurs sont chargés d'assembler toutes les données requises dans un document Elasticsearch. Les Sinks sont responsables de la mise en forme des documents selon le schéma et de l'écriture des données vers le cluster Elasticsearch ciblé.
  • File d'attente de messages : Nous avons utilisé Kafka comme technologie de file d'attente de messages. Le composant Kafka 2, de la figure 1 ci-dessus, utilise les rubriques " log compacted and preserved indefinitely ".

Bound together, these components comprise an-end to-end data pipeline. The data changes in data sources are propagated to Flink applications using Kafka. Flink applications implement business logic to curate search documents and write those to the destination. Now that we understand the high level components, let’s go through the different indexing use cases.

Restez informé grâce aux mises à jour hebdomadaires

Abonnez-vous à notre blog d'ingénierie pour recevoir régulièrement des informations sur les projets les plus intéressants sur lesquels notre équipe travaille.

Indexation incrémentale

Le pipeline d'indexation traite les modifications incrémentielles des données provenant de deux sources différentes. La première capture les modifications de données au fur et à mesure qu'elles se produisent en temps réel. En général, ces événements sont générés lorsque des opérateurs humains apportent des modifications ad hoc aux magasins ou aux articles. La seconde concerne les modifications de données ETL. Nos modèles d'apprentissage automatique génèrent des données ETL dans un entrepôt de données. Le pipeline d'indexation traite différemment les événements provenant de ces deux sources de données.

Indexation des événements de capture des données de changement (CDC) 

DoorDash’s data about merchants gets created and updated continuously, and needs to be addressed by our index pipeline solution. For example, these updates can be anything from merchant operators adding tags to a store to updating menus. We need to reflect these changes on the consumer experience as quickly as possible or consumers will see stale data in the application. These updates to the platform are saved in data stores such as Postgres and Apache Cassandra. Iterative workflows also crunch the data in the data warehouse with daily cadence, powering things such as business intelligence applications.

To reliably capture these update events from a service’s database, we explored enabling change data capture (CDC) for Aurora/Postgres using Debezium connector, a Red Hat-developed open source project for capturing row-level changes. The initial performance testing carried out by the storage team suggested that this strategy had too much overhead and was not performant, especially when the service uses the same database for serving online traffic. Therefore, we implemented save hooks in the application, which are responsible for handling data update requests, to propagate change events through Kafka whenever there is a change on the underlying data store. We call this approach Application Level CDC.

With Application Level CDC, we could run into consistency issues. A distributed application has multiple instances. Two separate update calls may get served via two different instances. If we include updated values in the Kafka messages, it wouldn‚Äôt guarantee consistency and solve the issue because in certain cases multiple instances of the application will push events that are updating the same value. 

Par exemple, si l'instance d'application n° 1 envoie un événement, {store_id: 10, is_active=true}et l'instance d'application n° 2 envoie un événement, {store_id: 10, is_active=false}Il y aurait des conflits du côté des consommateurs.

Pour garantir la cohérence, nous n'envoyons que les ID d'entités modifiées dans les événements Kafka. À la réception des événements Kafka, notre application Assembler appelle les API REST de l'application pour recueillir d'autres informations sur les entités présentes dans les événements Kafka. Les appels à l'API REST garantissent la cohérence des données concernant l'entité. L'assembleur fusionne les informations pour créer un événement qu'il pousse vers Kafka pour que l'application Sink puisse le consommer. L'assembleur met en œuvre une déduplication fenêtrée, qui empêche d'appeler les API REST pour la même entité plusieurs fois dans un laps de temps donné. L'assembleur procède également à l'agrégation d'événements afin d'appeler des points d'extrémité REST en masse. Par exemple, sur une période de 10 secondes, il regroupe les mises à jour d'articles pour un magasin. Il appelle les API REST de ce magasin, y compris tous les éléments déduits et agrégés.

En résumé, nous utilisons le CDC au niveau de l'application pour capturer les événements de changement de données. Nous résolvons les problèmes de cohérence à l'aide d'événements simplifiés et d'API REST. Nous utilisons des fonctions de déduplication et de fenêtre pour optimiser le traitement des événements. 

Indexation des données ETL

Many properties of the store and item documents that are critical to our retrieval process, such as scores and tags generated by ML models, are updated in bulk once a day. This data is either model generated, as when an ML model runs the freshest data, or manually curated, as when our human operators manually tag items with ‚Äúchicken‚Äù for a particular store. This data gets populated into tables in our data warehouse after a nightly run of the respective ETL jobs. 

Avant notre nouvelle plateforme d'indexation, nous ne disposions pas d'un moyen fiable de télécharger des données dans notre index, mais nous utilisions des solutions de contournement lentes et imprécises. Nous voulions améliorer notre pipeline existant en donnant à notre nouvelle plateforme d'indexation le mécanisme nécessaire pour ingérer de manière fiable les données ETL dans notre index dans un délai de 24 heures. 

Les modèles CDC pour le cas d'utilisation ETL sont très différents du cas de mise à jour incrémentielle décrit dans la section précédente. Dans le cas de la mise à jour incrémentielle, les magasins de données des commerçants sont constamment mis à jour, ce qui se traduit par un flux continu de mises à jour au cours de la journée. En revanche, dans le cas de l'ETL, les mises à jour sont effectuées en une seule fois lors de l'exécution de l'ETL, sans autre mise à jour jusqu'à la prochaine exécution.

Nous avons décidé de ne pas utiliser une variante du CDC au niveau de l'application pour les sources de l'ETL, car nous verrions des pics importants de mises à jour à chaque fois que l'ETL s'exécuterait, et ces pics risqueraient de surcharger nos systèmes et de dégrader les performances. Nous voulions plutôt un mécanisme permettant de répartir l'ingestion de l'ETL sur un intervalle afin que les systèmes ne soient pas submergés.

Pour y remédier, nous avons développé une fonction source Flink personnalisée qui transmet périodiquement toutes les lignes d'une table ETL à Kafka par lots, la taille des lots étant choisie de manière à ce que les systèmes en aval ne soient pas submergés. 

Envoi de documents à Elasticsearch

Une fois que les applications Assembler publient des données vers les thèmes de destination, nous avons un consommateur qui lit les messages hydratés, transforme les messages selon le schéma d'index spécifique et les envoie à l'index approprié. Ce processus nécessite la gestion du schéma, de l'index et du cluster. Nous maintenons un groupe de consommateurs Kafka unique par index ElasticSearch afin que les consommateurs puissent maintenir des décalages pour chaque index. Pour transformer les messages, nous utilisons un DocumentProcessor(s), qui reçoit un événement hydraté du sujet de destination et produit des documents formatés prêts à être indexés. 

Le processus Sink utilise le connecteur Flink Elasticsearch pour écrire des documents JSON dans Elasticsearch. Il dispose d'emblée de capacités de limitation de débit et d'étranglement, essentielles pour protéger les clusters Elasticsearch lorsque le système est soumis à une forte charge d'écriture. Le processus prend également en charge l'indexation en bloc, où nous rassemblons tous les documents et les opérations pertinentes sur une fenêtre temporelle et effectuons des requêtes en bloc. En cas d'échec de l'indexation d'un document, celui-ci est enregistré et stocké dans une file d'attente de lettres mortes qui peut être traitée ultérieurement.

Remplir rapidement un nouvel index

Il arrive souvent que nous voulions ajouter une nouvelle propriété à notre index, par exemple l'ID du marché associé à un magasin ou à un article dans le document, parce que cela nous aide dans le sharding. De même, nous pouvons avoir besoin de recréer rapidement un nouvel index, par exemple lorsque nous voulons essayer différentes structures d'index pour effectuer des tests d'efficacité. 

Dans l'ancien système, nous nous appuyions sur un travail lent et peu fiable qui prenait généralement un mois pour réindexer tous les documents des magasins et des articles. Compte tenu de la longue durée de l'indexation, il était difficile d'estimer correctement le taux d'erreur associé au processus de réindexation. Nous n'étions donc jamais certains de la qualité de l'indexation. Nous avons souvent reçu des plaintes concernant des incohérences dans les détails des magasins entre l'index et la source de vérité, qui devaient être corrigées manuellement.

Avec notre nouvelle plateforme d'index de recherche, nous voulions un processus permettant de recréer rapidement un nouvel index ou de remplir une propriété dans un index existant dans un délai de 24 heures. Pour le processus d'amorçage, nous avions besoin d'un mécanisme permettant de recréer rapidement tous les documents devant être indexés dans Elasticsearch. Ce processus comporte deux étapes : 

  1. Streaming de tous les ID d'entités correspondant aux documents qui doivent être indexés dans ElasticSearch 
  2. Mappage des identifiants d'entités dans leur forme finale en effectuant des appels externes avant qu'ils ne soient envoyés en aval pour l'indexation. 

Le pipeline de mise en correspondance de l'ID de l'entité avec la forme finale de l'entité avait déjà été établi dans le cadre de notre travail sur l'assembleur en ligne, mentionné ci-dessus. Par conséquent, tout ce qu'il fallait faire, c'était de diffuser tous les identifiants de documents qui devaient être indexés dans Elasticsearch. En conséquence, nous maintenons une copie à jour de tous les identifiants d'entités qui doivent être indexés dans des tables d'amorçage dans notre entrepôt de données. Lorsque nous avons besoin de démarrer, nous utilisons la fonction source décrite dans la section ETL pour diffuser toutes les lignes de ces tables d'amorçage vers Kafka. Nous encapsulons la logique permettant d'exécuter les deux étapes ci-dessus dans un seul job.

Si nous exécutons notre pipeline d'indexation incrémentale en même temps que notre pipeline d'amorçage, nous risquons d'obtenir des données périmées dans Elasticsearch. Pour éviter ces problèmes, nous réduisons notre indexeur incrémental à chaque fois que le bootstrap est exécuté, et nous l'augmentons à nouveau une fois le bootstrap terminé.

En résumé, les étapes à suivre pour remblayer et recréer l'index sont les suivantes :

  • Créez l'index et mettez à jour ses propriétés si nécessaire, et mettez à jour la logique métier et les configurations dans l'assembleur et le puits pour remplir la nouvelle propriété.
  • Réduire la taille de l'assembleur en ligne. 
  • Augmenter le travail d'amorçage.
  • Une fois l'amorçage terminé, réduisez le travail d'amorçage et augmentez l'assembleur en ligne. Lorsque le décalage devient récent, le processus d'amorçage est terminé.

Activation d'une fonction de réindexation forcée 

From time to time, some of our documents in Elasticsearch might have stale data, possibly because some events from upstream didn‚Äôt get delivered, or one of our downstream services took too long to respond. In such cases, we can force a reindex of any documents in question. 

Pour accomplir cette tâche, nous envoyons un message avec l'ID de l'entité à indexer dans le thème à partir duquel l'assembleur en ligne consomme les données. Une fois le message consommé, notre pipeline d'indexation décrit ci-dessus est lancé et chaque document est réindexé dans Elasticsearch.

Nous annotons les messages envoyés dans le cadre de nos tâches d'indexation ponctuelles avec des balises uniques, ce qui nous fournit une trace détaillée du document à mesure qu'il passe par les différentes étapes du flux d'indexation. En plus de nous garantir que le document a bien été indexé, cela nous fournit une mine d'informations de débogage qui nous aident à valider et à découvrir les bogues qui auraient pu empêcher l'indexation du document en premier lieu.

Résultats

Notre nouvelle plateforme d'indexation est plus fiable. La vitesse d'indexation incrémentale permet d'actualiser les données plus rapidement et de les faire apparaître plus vite dans nos applications grand public. La réindexation plus rapide a permis de construire de nouveaux index en peu de temps afin d'améliorer notre recherche : 

  • Réduction du temps de remblayage de l'ensemble de notre catalogue de magasins d'une semaine à 6,5 heures.
  • Réduction du temps de remblayage de l'ensemble de notre catalogue d'articles de deux semaines à 6,5 heures.
  • Réduction du temps nécessaire à la réindexation des magasins et des articles existants sur la plateforme, qui est passé d'une semaine à deux heures.

Conclusion

Les données sont au cœur de toute organisation. Déplacer les données de manière transparente et les remodeler pour différents cas d'utilisation est une opération essentielle dans notre architecture de microservices. Cette nouvelle plateforme d'index de recherche permet à d'autres équipes de DoorDash de concevoir des expériences de recherche pour des secteurs d'activité spécifiques sans avoir à construire une toute nouvelle architecture d'index de recherche. Notre dépendance aux outils open source pour cet index de recherche signifie beaucoup de documentation accessible en ligne et des ingénieurs avec cette expertise qui pourraient rejoindre notre équipe. 

En règle générale, ce type de solution s'applique à toute entreprise disposant d'un catalogue en ligne important et en pleine expansion, et qui souhaite apporter des modifications à son expérience de recherche. En adoptant une approche similaire à celle décrite ci-dessus, les équipes peuvent réduire le temps de réindexation et permettre des itérations plus rapides et moins d'interventions manuelles tout en améliorant la précision de leur index. Notre approche est particulièrement bénéfique pour les entreprises dont le catalogue croît rapidement et dont les multiples opérateurs manuels effectuent des changements qui doivent être reflétés dans l'index.  

Photo d'en-tête par Jan Antonin Kolar sur Unsplash.

About the Author

Emplois connexes

Localisation
Seattle, WA; Sunnyvale, CA; San francisco, CA
Département
Ingénierie
Localisation
San Francisco, CA ; Sunnyvale, CA ; Los Angeles, CA ; Seattle, WA ; New York, NY
Département
Ingénierie
Localisation
San Francisco, CA ; Sunnyvale, CA ; Los Angeles, CA ; Seattle, WA ; New York, NY
Département
Ingénierie
Localisation
New York, NY; San Francisco, CA; Los Angeles, CA; Seattle, WA; Sunnyvale, CA
Département
Ingénierie
Localisation
Toronto, ON
Département
Ingénierie