Ir al contenido

Blog


Indexación más rápida con Apache Kafka y Elasticsearch

14 de julio de 2021

|
Satish Saley

Satish Saley

Danial Asif

Danial Asif

Siddharth Kumar

Siddharth Kumar

Mantener una experiencia de pedido en línea agradable implica garantizar que los grandes índices de búsqueda sigan siendo eficaces a escala. Para DoorDash, esto suponía un reto especial, ya que el número de tiendas, artículos y otros datos aumentaba cada día. Con esta carga, podía llevar hasta una semana reindexar todos los cambios y actualizar nuestra base de datos de búsqueda. 

We needed a fast way to index all of our platform’s searchable data to improve product discovery, ensuring that we offered consumers all available ordering options. In addition, this project would also increase the speed of experimentation on our platform so we could improve our search performance more quickly. 

Nuestra solución consistió en crear una nueva plataforma de indexación de búsquedas que utiliza la indexación incremental en nuestras fuentes de datos. Basamos esta plataforma en tres proyectos de código abierto: Apache Kafka, Apache Flink y Elasticsearch.

DoorDash’s problem with search indexing 

Nuestro sistema de indexación heredado no era fiable ni ampliable, y además era lento. Un sistema de indexación fiable garantizaría que los cambios en los almacenes y los artículos se reflejaran en el índice de búsqueda en tiempo real. La indexación incremental ayuda a actualizar los datos más rápidamente, construyendo índices frescos para introducir nuevos analizadores y campos adicionales en menos tiempo, lo que en última instancia ayuda a mejorar la recuperación.

Teams from new business verticals within DoorDash wanted to build their own search experience but didn’t want to reinvent the wheel when it came to indexing the search data. Therefore, we needed a plug-and-play solution to improve new search experiences without slowing down development for these business vertical teams. 

Creación de un proceso de indexación de documentos basado en eventos 

Hemos resuelto estos problemas creando una nueva plataforma de indexación de búsquedas que proporciona una indexación rápida y fiable para diferentes sectores verticales, al tiempo que mejora el rendimiento de las búsquedas y la productividad de los equipos de búsqueda. Utiliza Kafka como cola de mensajes y para el almacenamiento de datos, y Flink para la transformación de datos y su envío a Elasticsearch.

Arquitectura de alto nivel

Diagrama del proceso de indexación de datos
Figura 1: El canal de datos de nuestro nuevo sistema de índices de búsqueda utiliza Kafka para la cola de mensajes y el almacenamiento de datos, y Flink para ETL y la sincronización con Elasticsearch.

La figura 1 muestra varios componentes de nuestra cadena de índices de búsqueda. Los componentes se agrupan en cuatro categorías:

  • Fuentes de datos: Son los sistemas que poseen operaciones CRUD sobre los datos. Los llamamos la fuente de verdad de los datos. En nuestra pila utilizamos Postgres como base de datos y Snowflake como almacén de datos. 
  • Destino de los datos: Es el almacén de datos que se ha optimizado para la búsqueda. En nuestro caso elegimos Elasticsearch.
  • Aplicación Flink: Añadimos dos aplicaciones Flink personalizadas en nuestra canalización de indexación, Assemblers para transformar los datos y Sinks para enviar los datos al almacenamiento de destino. Los ensambladores se encargan de ensamblar todos los datos necesarios en un documento de Elasticsearch. Los sumideros son responsables de dar forma a los documentos según el esquema y escribir los datos en el clúster Elasticsearch de destino.
  • Cola de mensajes: Utilizamos Kafka como tecnología de cola de mensajes. El componente Kafka 2, de la Figura 1, más arriba, utiliza los temas de registro compactados y conservados indefinidamente.

Bound together, these components comprise an-end to-end data pipeline. The data changes in data sources are propagated to Flink applications using Kafka. Flink applications implement business logic to curate search documents and write those to the destination. Now that we understand the high level components, let’s go through the different indexing use cases.

Manténgase informado con las actualizaciones semanales

Suscríbase a nuestro blog de ingeniería para recibir actualizaciones periódicas sobre los proyectos más interesantes en los que trabaja nuestro equipo.

Indexación incremental

El proceso de indexación procesa los cambios incrementales de datos procedentes de dos fuentes distintas. La primera captura los cambios de datos a medida que se producen en tiempo real. Normalmente, estos eventos se generan cuando operadores humanos realizan cambios ad hoc en almacenes o artículos. La segunda son los cambios de datos ETL. Nuestros modelos de aprendizaje automático generan datos ETL en un almacén de datos. El proceso de indexación gestiona los eventos de estas dos fuentes de datos de forma diferente.

Indexación de eventos de captura de datos de cambios (CDC) 

DoorDash’s data about merchants gets created and updated continuously, and needs to be addressed by our index pipeline solution. For example, these updates can be anything from merchant operators adding tags to a store to updating menus. We need to reflect these changes on the consumer experience as quickly as possible or consumers will see stale data in the application. These updates to the platform are saved in data stores such as Postgres and Apache Cassandra. Iterative workflows also crunch the data in the data warehouse with daily cadence, powering things such as business intelligence applications.

To reliably capture these update events from a service’s database, we explored enabling change data capture (CDC) for Aurora/Postgres using Debezium connector, a Red Hat-developed open source project for capturing row-level changes. The initial performance testing carried out by the storage team suggested that this strategy had too much overhead and was not performant, especially when the service uses the same database for serving online traffic. Therefore, we implemented save hooks in the application, which are responsible for handling data update requests, to propagate change events through Kafka whenever there is a change on the underlying data store. We call this approach Application Level CDC.

With Application Level CDC, we could run into consistency issues. A distributed application has multiple instances. Two separate update calls may get served via two different instances. If we include updated values in the Kafka messages, it wouldn’t guarantee consistency and solve the issue because in certain cases multiple instances of the application will push events that are updating the same value. 

Por ejemplo, si la instancia de aplicación nº 1 envía un evento, {store_id: 10, is_active=true}y la instancia de aplicación nº 2 envía un evento, {store_id: 10, is_active=false}...habría conflictos en el lado del consumidor.

Para garantizar la coherencia, solo enviamos los ID de entidad modificados en los eventos de Kafka. Al recibir los eventos de Kafka, nuestra aplicación Assembler llama a las API de REST de la aplicación para recopilar otra información sobre las entidades que están presentes en los eventos de Kafka. Las llamadas a las API REST garantizan la coherencia de los datos sobre la entidad. Assembler combina la información para crear un evento que envía a Kafka para que la aplicación Sink lo consuma. El ensamblador implementa una deduplicación por ventanas, que impide llamar a las API REST para la misma entidad varias veces en un periodo de tiempo determinado. El ensamblador también agrega eventos para llamar a puntos finales REST de forma masiva. Por ejemplo, durante un periodo de 10 segundos, agrega actualizaciones de artículos para una tienda. Llama a las API de REST para ese almacén incluyendo todos los artículos deducidos y agregados.

En resumen, utilizamos el CDC a nivel de aplicación para capturar los eventos de cambio de datos. Resolvemos los problemas de coherencia con eventos simplificados y API REST. Utilizamos funciones de deduplicación y ventana para optimizar el procesamiento de eventos. 

Indexación de datos ETL

Many properties of the store and item documents that are critical to our retrieval process, such as scores and tags generated by ML models, are updated in bulk once a day. This data is either model generated, as when an ML model runs the freshest data, or manually curated, as when our human operators manually tag items with “chicken” for a particular store. This data gets populated into tables in our data warehouse after a nightly run of the respective ETL jobs. 

Antes de nuestra nueva plataforma de índice de búsqueda, no disponíamos de una forma fiable de cargar datos en nuestro índice, sino que utilizábamos soluciones lentas e imprecisas. Queríamos mejorar nuestro proceso actual dotando a nuestra nueva plataforma de índices de búsqueda del mecanismo necesario para introducir datos ETL en nuestro índice de forma fiable en un plazo de 24 horas. 

Los patrones de CDC para el caso de uso de ETL son muy diferentes del caso de actualización incremental descrito en la sección anterior. En el caso de la actualización incremental, los almacenes de datos comerciales se actualizan constantemente, lo que da lugar a un flujo continuo de actualizaciones a lo largo del día. En cambio, en el caso de uso de ETL, las actualizaciones se producen de una sola vez cuando se ejecuta la ETL, sin que se produzcan otras actualizaciones hasta la siguiente ejecución.

We decided not to use a variant of the Application Level CDC for the ETL sources because we would see large spikes in updates everytime the ETL ran, and this spike could overly stress our systems and degrade performance. Instead, we wanted a mechanism to spread out the ETL ingestion over an interval so that systems don't get overwhelmed.

Como solución, desarrollamos una función de origen de Flink personalizada que transmite periódicamente todas las filas de una tabla ETL a Kafka en lotes, cuyo tamaño se elige para garantizar que los sistemas posteriores no se vean desbordados. 

Envío de documentos a Elasticsearch

Una vez que las aplicaciones de ensamblaje publican los datos en los temas de destino, tenemos un consumidor que lee los mensajes hidratados, los transforma de acuerdo con el esquema de índice específico y los envía a su índice correspondiente. Este proceso requiere la gestión del esquema, el índice y el clúster. Mantenemos un único grupo de consumidores Kafka por cada índice ElasticSearch para que los consumidores puedan mantener desplazamientos para cada índice. Para transformar los mensajes, utilizamos un DocumentProcessor(s), que toma un evento hidratado del tema de destino y emite documentos formateados que están listos para ser indexados. 

El proceso Sink utiliza Flink Elasticsearch Connector para escribir documentos JSON en Elasticsearch. Tiene capacidades de limitación de velocidad y estrangulamiento, esenciales para proteger los clústeres de Elasticsearch cuando el sistema está sometido a una gran carga de escritura. El proceso también admite la indexación masiva, en la que reunimos todos los documentos y las operaciones relevantes durante una ventana de tiempo y realizamos solicitudes masivas. Cualquier fallo a la hora de indexar un documento hace que éste se registre y se almacene en una cola de letra muerta que puede procesarse más tarde.

Rellenar rápidamente un nuevo índice

A menudo, es posible que queramos añadir una nueva propiedad a nuestro índice, como añadir el ID de mercado asociado a una tienda o artículo al documento porque nos ayuda en la fragmentación. Del mismo modo, puede que necesitemos recrear rápidamente un nuevo índice, como cuando queremos probar diferentes estructuras de índice para ejecutar benchmarks de eficiencia. 

En el sistema heredado dependíamos de un trabajo lento y poco fiable que normalmente tardaba un mes en reindexar todos los documentos de la tienda y los artículos. Dada la larga duración de la indexación, era difícil calcular correctamente la tasa de error asociada al proceso de reindexación. Por tanto, nunca estábamos seguros de la calidad de la indexación. A menudo recibíamos quejas sobre desajustes en los detalles de la tienda entre el índice y la fuente de verdad, que había que corregir manualmente.

Con nuestra nueva plataforma de índice de búsqueda, queríamos un proceso para recrear rápidamente un nuevo índice o rellenar una propiedad en un índice existente en 24 horas. Para el proceso de bootstrapping, necesitábamos un mecanismo para recrear rápidamente todos los documentos que debían indexarse en Elasticsearch. Este proceso implica dos pasos: 

  1. Transmisión de todos los ID de entidad correspondientes a los documentos que deben indexarse en ElasticSearch 
  2. Asignación de los ID de entidad a su forma final mediante llamadas externas antes de que se envíen para su indexación. 

El proceso para asignar el ID de entidad a la forma final de la entidad ya se había establecido como parte de nuestro trabajo en el ensamblador en línea, mencionado anteriormente. Por lo tanto, todo lo que se necesitaba era transmitir todos los ID de documentos que necesitaban ser indexados en Elasticsearch. En consecuencia, mantenemos una copia actualizada de todos los ID de entidad que necesitan ser indexados en tablas bootstrap en nuestro almacén de datos. Cuando necesitamos realizar el bootstrap, utilizamos la función de origen descrita en la sección ETL para transmitir todas las filas de estas tablas bootstrap a Kafka. Encapsulamos la lógica para realizar los dos pasos anteriores en un único trabajo.

Si ejecutamos nuestro proceso de indexación incremental al mismo tiempo que nuestro proceso de arranque, corremos el riesgo de obtener datos obsoletos en Elasticsearch. Para evitar estos problemas, reducimos nuestro indexador incremental cada vez que se ejecuta el bootstrap, y lo volvemos a aumentar una vez que el bootstrap se ha completado.

Si lo ponemos todo junto, los pasos que damos para rellenar y recrear el índice son los siguientes:

  • Cree el índice y actualice sus propiedades según sea necesario, y actualice la lógica empresarial y las configuraciones en el ensamblador y el sumidero para rellenar la nueva propiedad.
  • Reducir el ensamblador en línea. 
  • Amplía el trabajo de arranque.
  • Una vez finalizado el bootstrap, reduzca el trabajo de bootstrap y aumente el ensamblador en línea. Cuando el desplazamiento sea reciente, el proceso de arranque habrá finalizado.

Activación de una función de reindexación forzada 

From time to time, some of our documents in Elasticsearch might have stale data, possibly because some events from upstream didn’t get delivered, or one of our downstream services took too long to respond. In such cases, we can force a reindex of any documents in question. 

Para realizar esta tarea, enviamos un mensaje con el ID de la entidad a indexar al tema del que consume datos el ensamblador en línea. Una vez consumido el mensaje, se pone en marcha nuestro canal de indexación descrito anteriormente y cada documento se vuelve a indexar en Elasticsearch.

Anotamos los mensajes que se envían en nuestras tareas puntuales de indexación con etiquetas únicas que nos proporcionan una traza detallada del documento a medida que pasa por las distintas etapas del flujo de indexación. Además de garantizarnos que el documento ha sido indexado, nos proporciona una gran cantidad de información de depuración que nos ayuda a validar y descubrir cualquier error que pudiera haber impedido su indexación en primer lugar.

Resultados

Nuestra nueva plataforma de indexación de búsquedas es más fiable. La velocidad de indexación incremental ayuda a que los datos se actualicen más rápido y aparezcan con mayor prontitud en nuestras aplicaciones de consumo. Una reindexación más rápida permite crear nuevos índices en poco tiempo para mejorar la recuperación: 

  • Reducción del tiempo de relleno de todo nuestro catálogo de tiendas de una semana a 6,5 horas.
  • Reducción del tiempo de relleno de todo nuestro catálogo de artículos de dos semanas a 6,5 horas.
  • Reducción del tiempo de reindexación de las tiendas y artículos existentes en la plataforma de una semana a 2 horas.

Conclusión

Los datos viven en el corazón de cualquier organización. Mover los datos sin problemas y remodelarlos para diferentes casos de uso es una operación esencial en nuestra arquitectura de microservicios. Esta nueva plataforma de índice de búsqueda permite a otros equipos de DoorDash diseñar experiencias de búsqueda para líneas de negocio específicas sin tener que construir una arquitectura de índice de búsqueda completamente nueva. Nuestra confianza en las herramientas de código abierto para este índice de búsqueda significa una gran cantidad de documentación accesible en línea e ingenieros con esta experiencia que podrían unirse a nuestro equipo. 

En general, este tipo de solución se aplica a cualquier empresa con un catálogo en línea grande y en crecimiento que se centre en realizar cambios en su experiencia de búsqueda. Al adoptar un enfoque similar al descrito anteriormente, los equipos pueden reducir el tiempo de reindexación y permitir iteraciones más rápidas y menos intervenciones manuales, al tiempo que mejoran la precisión de su índice. Nuestro enfoque es especialmente beneficioso para las empresas que tienen un catálogo en rápido crecimiento y múltiples operadores manuales que realizan cambios que deben reflejarse en el índice.  

Fotografía del encabezado de Jan Antonin Kolar en Unsplash.

Sobre los autores

Trabajos relacionados

Ubicación
San Francisco, CA; Sunnyvale, CA
Departamento
Ingeniería
Ubicación
San Francisco, CA; Sunnyvale, CA; Seattle, WA
Departamento
Ingeniería
Ubicación
Pune, India
Departamento
Ingeniería
Ubicación
San Francisco, CA; Seattle, WA; Sunnyvale, CA
Departamento
Ingeniería
Ubicación
San Francisco, CA; Seattle, WA; Sunnyvale, CA
Departamento
Ingeniería