Pour maintenir une expérience de commande en ligne agréable, il faut s'assurer que les index de recherche importants restent efficaces à grande échelle. Pour DoorDash, il s'agissait d'un défi particulier car le nombre de magasins, d'articles et d'autres données augmentait chaque jour. Dans ces conditions, la réindexation de tous les changements et la mise à jour de notre base de données de recherche pouvaient prendre jusqu'à une semaine.
Nous avions besoin d'un moyen rapide d'indexer toutes les données consultables de notre plateforme afin d'améliorer la découverte des produits, en veillant à offrir aux consommateurs toutes les options de commande disponibles. En outre, ce projet permettrait d'augmenter la vitesse d'expérimentation sur notre plateforme afin d'améliorer plus rapidement nos performances de recherche.
Notre solution a consisté à créer une nouvelle plateforme d'indexation de recherche qui utilise l'indexation incrémentale sur nos sources de données. Nous avons basé cette plateforme sur trois projets open source, Apache Kafka, Apache Flink et Elasticsearch.
Le problème d'indexation de DoorDash
Notre ancien système d'indexation n'était ni fiable ni extensible, et il était lent. Un système d'indexation fiable garantirait que les modifications apportées aux magasins et aux articles sont reflétées dans l'index de recherche en temps réel. L'indexation incrémentale permet d'actualiser les données plus rapidement, en construisant de nouveaux index pour introduire de nouveaux analyseurs et des champs supplémentaires dans des délais plus courts, ce qui permet en fin de compte d'améliorer la recherche.
Les équipes des nouveaux secteurs d'activité de DoorDash voulaient créer leur propre expérience de recherche, mais ne voulaient pas réinventer la roue lorsqu'il s'agissait d'indexer les données de recherche. Nous avions donc besoin d'une solution prête à l'emploi pour améliorer les nouvelles expériences de recherche sans ralentir le développement de ces équipes.
Construction d'un pipeline événementiel pour l'indexation de documents
Nous avons résolu ces problèmes en construisant une nouvelle plateforme d'indexation de recherche qui fournit une indexation rapide et fiable pour alimenter différents secteurs verticaux tout en améliorant les performances de recherche et la productivité de l'équipe de recherche. Elle utilise Kafka comme file d'attente de messages et pour le stockage des données, et Flink pour la transformation des données et l'envoi des données à Elasticsearch.
Architecture de haut niveau
La figure 1 ci-dessus montre les différents composants de notre pipeline d'index de recherche. Les composants sont regroupés en quatre catégories :
- Les sources de données : Il s'agit des systèmes qui effectuent des opérations CRUD sur les données. Nous les appelons la source de vérité pour les données. Dans notre pile, nous avons utilisé Postgres comme base de données et Snowflake comme entrepôt de données.
- Destination des données : Il s'agit du magasin de données qui a été optimisé pour la recherche. Dans notre cas, nous avons choisi Elasticsearch.
- Application Flink : Nous avons ajouté deux applications Flink personnalisées dans notre pipeline d'indexation, Assemblers pour la transformation des données et Sinks pour l'envoi des données vers le stockage de destination. Les assembleurs sont chargés d'assembler toutes les données requises dans un document Elasticsearch. Les Sinks sont responsables de la mise en forme des documents selon le schéma et de l'écriture des données vers le cluster Elasticsearch ciblé.
- File d'attente de messages : Nous avons utilisé Kafka comme technologie de file d'attente de messages. Le composant Kafka 2, de la figure 1 ci-dessus, utilise les rubriques "log compacted and preserved indefinitely ".
Liés ensemble, ces composants constituent un pipeline de données de bout en bout. Les changements de données dans les sources de données sont propagés aux applications Flink à l'aide de Kafka. Les applications Flink mettent en œuvre une logique métier pour traiter les documents de recherche et les écrire vers la destination. Maintenant que nous comprenons les composants de haut niveau, passons en revue les différents cas d'utilisation de l'indexation.
Restez informé grâce aux mises à jour hebdomadaires
Abonnez-vous à notre blog d'ingénierie pour recevoir régulièrement des informations sur les projets les plus intéressants sur lesquels notre équipe travaille.
Veuillez saisir une adresse électronique valide.
Merci de vous être abonné !
Indexation incrémentale
Le pipeline d'indexation traite les modifications incrémentielles des données provenant de deux sources différentes. La première capture les modifications de données au fur et à mesure qu'elles se produisent en temps réel. En général, ces événements sont générés lorsque des opérateurs humains apportent des modifications ad hoc aux magasins ou aux articles. La seconde concerne les modifications de données ETL. Nos modèles d'apprentissage automatique génèrent des données ETL dans un entrepôt de données. Le pipeline d'indexation traite différemment les événements provenant de ces deux sources de données.
Indexation des événements de capture des données de changement (CDC)
Les données de DoorDash sur les commerçants sont créées et mises à jour en permanence, et doivent être traitées par notre solution de pipeline d'indexation. Par exemple, ces mises à jour peuvent aller de l'ajout de tags par les commerçants à la mise à jour des menus. Nous devons répercuter ces changements sur l'expérience du consommateur le plus rapidement possible, sinon les consommateurs verront des données périmées dans l'application. Ces mises à jour de la plateforme sont enregistrées dans des magasins de données tels que Postgres et Apache Cassandra. Des flux de travail itératifs traitent également les données dans l'entrepôt de données à une cadence quotidienne, alimentant des applications telles que les applications de veille stratégique.
Pour capturer de manière fiable ces événements de mise à jour à partir de la base de données d'un service, nous avons étudié la possibilité d'activer la capture des données de changement (CDC) pour Aurora/Postgres à l'aide du connecteur Debezium, un projet open source développé par Red Hat pour capturer les changements au niveau des lignes. Les tests de performance initiaux effectués par l'équipe de stockage ont suggéré que cette stratégie entraînait trop de frais généraux et n'était pas performante, en particulier lorsque le service utilise la même base de données pour desservir le trafic en ligne. Par conséquent, nous avons mis en œuvre des crochets de sauvegarde dans l'application, qui sont responsables du traitement des demandes de mise à jour des données, afin de propager les événements de changement via Kafka chaque fois qu'il y a un changement dans le magasin de données sous-jacent. Nous appelons cette approche CDC au niveau de l'application.
Avec le CDC au niveau de l'application, nous pourrions rencontrer des problèmes de cohérence. Une application distribuée possède plusieurs instances. Deux appels de mise à jour distincts peuvent être servis par deux instances différentes. Si nous incluons les valeurs mises à jour dans les messages Kafka, cela ne garantira pas la cohérence et ne résoudra pas le problème car, dans certains cas, plusieurs instances de l'application enverront des événements qui mettent à jour la même valeur.
Par exemple, si l'instance d'application n° 1 envoie un événement, {store_id: 10, is_active=true}
et l'instance d'application n° 2 envoie un événement, {store_id: 10, is_active=false}
Il y aurait des conflits du côté des consommateurs.
Pour assurer la cohérence, nous n'envoyons que les ID d'entités modifiées dans les événements Kafka. À la réception des événements Kafka, notre application Assembler appelle les API REST de l'application pour recueillir d'autres informations sur les entités présentes dans les événements Kafka. Les appels à l'API REST garantissent la cohérence des données concernant l'entité. L'assembleur fusionne les informations pour créer un événement qu'il pousse vers Kafka pour que l'application Sink puisse le consommer. L'assembleur met en œuvre une déduplication fenêtrée, qui empêche d'appeler les API REST pour la même entité plusieurs fois dans un laps de temps donné. L'assembleur procède également à l'agrégation d'événements afin d'appeler des points d'extrémité REST en masse. Par exemple, sur une période de 10 secondes, il regroupe les mises à jour d'articles pour un magasin. Il appelle les API REST de ce magasin, y compris tous les éléments déduits et agrégés.
En résumé, nous utilisons le CDC au niveau de l'application pour capturer les événements de changement de données. Nous résolvons les problèmes de cohérence à l'aide d'événements simplifiés et d'API REST. Nous utilisons des fonctions de déduplication et de fenêtre pour optimiser le traitement des événements.
Indexation des données ETL
De nombreuses propriétés des documents relatifs aux magasins et aux articles qui sont essentielles à notre processus d'extraction, telles que les scores et les étiquettes générés par les modèles ML, sont mises à jour en masse une fois par jour. Ces données sont soit générées par un modèle, par exemple lorsqu'un modèle de modélisation moléculaire exécute les données les plus récentes, soit créées manuellement, par exemple lorsque nos opérateurs humains étiquettent manuellement les articles avec le mot "poulet" pour un magasin particulier. Ces données sont intégrées dans les tables de notre entrepôt de données après l'exécution nocturne des tâches ETL correspondantes.
Avant notre nouvelle plateforme d'indexation, nous ne disposions pas d'un moyen fiable de télécharger des données dans notre index, mais nous utilisions des solutions de contournement lentes et imprécises. Nous voulions améliorer notre pipeline existant en donnant à notre nouvelle plateforme d'indexation le mécanisme nécessaire pour ingérer de manière fiable les données ETL dans notre index dans un délai de 24 heures.
Les modèles CDC pour le cas d'utilisation ETL sont très différents du cas de mise à jour incrémentielle décrit dans la section précédente. Dans le cas de la mise à jour incrémentielle, les magasins de données des commerçants sont constamment mis à jour, ce qui se traduit par un flux continu de mises à jour au cours de la journée. En revanche, dans le cas de l'ETL, les mises à jour sont effectuées en une seule fois lors de l'exécution de l'ETL, et il n'y a pas d'autres mises à jour avant l'exécution suivante.
Nous avons décidé de ne pas utiliser une variante du CDC au niveau de l'application pour les sources de l'ETL, car nous constaterions des pics importants de mises à jour à chaque fois que l'ETL s'exécuterait, ce qui risquerait de soumettre nos systèmes à une pression excessive et de dégrader les performances. Nous voulions plutôt un mécanisme permettant de répartir l'ingestion de l'ETL sur un intervalle afin que les systèmes ne soient pas submergés.
Pour y remédier, nous avons développé une fonction source Flink personnalisée qui transmet périodiquement toutes les lignes d'une table ETL à Kafka par lots, la taille des lots étant choisie de manière à ce que les systèmes en aval ne soient pas submergés.
Envoi de documents à Elasticsearch
Une fois que les applications Assembler publient des données vers les thèmes de destination, nous avons un consommateur qui lit les messages hydratés, transforme les messages selon le schéma d'index spécifique et les envoie à l'index approprié. Ce processus nécessite la gestion du schéma, de l'index et du cluster. Nous maintenons un groupe de consommateurs Kafka unique par index ElasticSearch afin que les consommateurs puissent maintenir des décalages pour chaque index. Pour transformer les messages, nous utilisons un DocumentProcessor(s), qui reçoit un événement hydraté du sujet de destination et produit des documents formatés prêts à être indexés.
Le processus Sink utilise le connecteur Flink Elasticsearch pour écrire des documents JSON dans Elasticsearch. Il dispose d'emblée de capacités de limitation de débit et d'étranglement, essentielles pour protéger les clusters Elasticsearch lorsque le système est soumis à une forte charge d'écriture. Le processus prend également en charge l'indexation en bloc, où nous rassemblons tous les documents et les opérations pertinentes sur une fenêtre temporelle et effectuons des requêtes en bloc. En cas d'échec de l'indexation d'un document, celui-ci est enregistré et stocké dans une file d'attente de lettres mortes qui peut être traitée ultérieurement.
Remplir rapidement un nouvel index
Il arrive souvent que nous voulions ajouter une nouvelle propriété à notre index, par exemple l'ID du marché associé à un magasin ou à un article dans le document, parce que cela nous aide dans le sharding. De même, nous pouvons avoir besoin de recréer rapidement un nouvel index, par exemple lorsque nous voulons essayer différentes structures d'index pour effectuer des tests d'efficacité.
Dans l'ancien système, nous nous appuyions sur un travail lent et peu fiable qui prenait généralement un mois pour réindexer tous les documents des magasins et des articles. Compte tenu de la longue durée de l'indexation, il était difficile d'estimer correctement le taux d'erreur associé au processus de réindexation. Nous n'étions donc jamais certains de la qualité de l'indexation. Nous avons souvent reçu des plaintes concernant des incohérences dans les détails des magasins entre l'index et la source de vérité, qui devaient être corrigées manuellement.
Avec notre nouvelle plateforme d'index de recherche, nous voulions un processus permettant de recréer rapidement un nouvel index ou de remplir une propriété dans un index existant dans un délai de 24 heures. Pour le processus d'amorçage, nous avions besoin d'un mécanisme permettant de recréer rapidement tous les documents devant être indexés dans Elasticsearch. Ce processus comporte deux étapes :
- Streaming de tous les ID d'entités correspondant aux documents qui doivent être indexés dans ElasticSearch
- Mappage des identifiants d'entités dans leur forme finale en effectuant des appels externes avant qu'ils ne soient envoyés en aval pour l'indexation.
Le pipeline de mise en correspondance de l'ID de l'entité avec la forme finale de l'entité avait déjà été établi dans le cadre de notre travail sur l'assembleur en ligne, mentionné ci-dessus. Par conséquent, tout ce qu'il fallait faire, c'était de diffuser tous les identifiants de documents qui devaient être indexés dans Elasticsearch. En conséquence, nous maintenons une copie à jour de tous les identifiants d'entités qui doivent être indexés dans des tables d'amorçage dans notre entrepôt de données. Lorsque nous avons besoin de démarrer, nous utilisons la fonction source décrite dans la section ETL pour envoyer toutes les lignes de ces tables d'amorçage vers Kafka. Nous encapsulons la logique permettant d'exécuter les deux étapes ci-dessus dans un seul job.
Si nous exécutons notre pipeline d'indexation incrémentale en même temps que notre pipeline d'amorçage, nous risquons d'obtenir des données périmées dans Elasticsearch. Pour éviter ces problèmes, nous réduisons notre indexeur incrémental à chaque fois que le bootstrap est exécuté, et nous l'augmentons à nouveau une fois le bootstrap terminé.
En résumé, les étapes à suivre pour remblayer et recréer l'index sont les suivantes :
- Créez l'index et mettez à jour ses propriétés si nécessaire, et mettez à jour la logique métier et les configurations dans l'assembleur et le puits pour remplir la nouvelle propriété.
- Réduire la taille de l'assembleur en ligne.
- Augmenter le travail d'amorçage.
- Une fois l'amorçage terminé, réduisez le travail d'amorçage et augmentez l'assembleur en ligne. Lorsque le décalage devient récent, le processus d'amorçage est terminé.
Activation d'une fonction de réindexation forcée
De temps en temps, certains de nos documents dans Elasticsearch peuvent avoir des données périmées, peut-être parce que certains événements en amont n'ont pas été livrés, ou que l'un de nos services en aval a pris trop de temps pour répondre. Dans de tels cas, nous pouvons forcer une réindexation des documents en question.
Pour accomplir cette tâche, nous envoyons un message avec l'ID de l'entité à indexer dans le thème à partir duquel l'assembleur en ligne consomme les données. Une fois le message consommé, notre pipeline d'indexation décrit ci-dessus est lancé et chaque document est réindexé dans Elasticsearch.
Nous annotons les messages envoyés dans le cadre de nos tâches d'indexation ponctuelles avec des balises uniques, ce qui nous fournit une trace détaillée du document à mesure qu'il passe par les différentes étapes du flux d'indexation. En plus de nous garantir que le document a bien été indexé, cela nous fournit une mine d'informations de débogage qui nous aident à valider et à découvrir les bogues qui auraient pu empêcher l'indexation du document en premier lieu.
Résultats
Notre nouvelle plateforme d'indexation est plus fiable. La vitesse d'indexation incrémentale permet d'actualiser les données plus rapidement et de les faire apparaître plus vite dans nos applications grand public. La réindexation plus rapide a permis de construire de nouveaux index en peu de temps afin d'améliorer notre recherche :
- Réduction du temps de remblayage de l'ensemble de notre catalogue de magasins d'une semaine à 6,5 heures.
- Réduction du temps de remblayage de l'ensemble de notre catalogue d'articles de deux semaines à 6,5 heures.
- Réduction du temps nécessaire à la réindexation des magasins et des articles existants sur la plateforme, qui est passé d'une semaine à deux heures.
Conclusion
Les données sont au cœur de toute organisation. Déplacer les données de manière transparente et les remodeler pour différents cas d'utilisation est une opération essentielle dans notre architecture de microservices. Cette nouvelle plateforme d'index de recherche permet à d'autres équipes de DoorDash de concevoir des expériences de recherche pour des secteurs d'activité spécifiques sans avoir à construire une toute nouvelle architecture d'index de recherche. Notre dépendance aux outils open source pour cet index de recherche signifie beaucoup de documentation accessible en ligne et des ingénieurs avec cette expertise qui pourraient rejoindre notre équipe.
En règle générale, ce type de solution s'applique à toute entreprise disposant d'un catalogue en ligne important et en pleine expansion, et qui souhaite apporter des modifications à son expérience de recherche. En adoptant une approche similaire à celle décrite ci-dessus, les équipes peuvent réduire le temps de réindexation et permettre des itérations plus rapides et moins d'interventions manuelles tout en améliorant la précision de leur index. Notre approche est particulièrement bénéfique pour les entreprises dont le catalogue croît rapidement et dont les multiples opérateurs manuels apportent des modifications qui doivent être reflétées dans l'index.
Photo d'en-tête par Jan Antonin Kolar sur Unsplash.