Utilice la biblioteca de clientes de Kinesis - Amazon Kinesis Data Streams

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Utilice la biblioteca de clientes de Kinesis

Uno de los métodos para desarrollar aplicaciones de consumo personalizadas que puedan procesar datos de flujos de KDS datos consiste en utilizar la biblioteca de clientes de Kinesis ()KCL.

nota

Tanto para la versión KCL 1.x como para la KCL 2.x, se recomienda actualizar a la última versión KCL 1.x o KCL 2.x, según el escenario de uso. Tanto la KCL 1.x como la KCL 2.x se actualizan periódicamente con las versiones más recientes, que incluyen los últimos parches de dependencia y seguridad, correcciones de errores y nuevas funciones compatibles con versiones anteriores. Para obtener más información, consulte /releases. https://github.com/awslabs/ amazon-kinesis-client

¿Qué es Kinesis Client Library?

KCLle ayuda a consumir y procesar datos de una transmisión de datos de Kinesis al encargarse de muchas de las tareas complejas asociadas a la informática distribuida. Estas incluyen equilibrar la carga entre varias instancias de aplicaciones de consumo, responder a los errores de las instancias de aplicaciones de consumo, comprobar los registros procesados y reaccionar ante la repartición. The KCL se encarga de todas estas subtareas para que pueda centrar sus esfuerzos en escribir su lógica de procesamiento de registros personalizada.

KCLEs diferente de los Kinesis Data APIs Streams que están disponibles en AWS SDKs. Los Kinesis Data APIs Streams le ayudan a gestionar muchos aspectos de Kinesis Data Streams, como la creación de transmisiones, la refragmentación y la creación y obtención de registros. KCLProporciona una capa de abstracción en torno a todas estas subtareas, específicamente para que pueda centrarse en la lógica de procesamiento de datos personalizada de su aplicación de consumo. Para obtener información sobre Kinesis Data API Streams, consulte la referencia de API Amazon Kinesis.

importante

KCLSe trata de una biblioteca de Java. Support para lenguajes distintos de Java se proporciona mediante una interfaz multilingüe llamada. MultiLangDaemon Este daemon está basado en Java y se ejecuta en segundo plano cuando se utiliza un KCL lenguaje distinto de Java. Por ejemplo, si instala KCL para Python y escribe su aplicación de consumo completamente en Python, seguirá necesitando instalar Java en su sistema debido a la MultiLangDaemon. Además, MultiLangDaemon tiene algunos ajustes predeterminados que puede que tengas que personalizar para tu caso de uso, por ejemplo, la AWS región a la que se conecta. Para obtener más información sobre el MultiLangDaemon encendido GitHub, consulte el KCL MultiLangDaemon proyecto.

KCLActúa como intermediario entre la lógica de procesamiento de registros y Kinesis Data Streams. La KCL realiza las siguientes tareas:

  • Se conecta al flujo de datos.

  • Enumera las particiones del flujo de datos.

  • Utiliza los arrendamientos para coordinar las asociaciones de fragmentos con sus trabajadores

  • Crea instancias de un procesador de registros para cada fragmento que administra

  • Extrae registros de datos del flujo de datos.

  • Inserta los registros en el procesador de registros correspondiente

  • Genera puntos de comprobación para los registros procesados

  • Equilibra las asociaciones entre procesos de trabajo y particiones (arrendamientos) cuando cambia el recuento de instancias de procesos de trabajo o cuando se vuelve a realizar la partición del flujo de datos (las particiones se dividen o fusionan).

KCLversiones disponibles

Actualmente, puede utilizar cualquiera de las siguientes versiones compatibles de KCL para crear sus aplicaciones de consumo personalizadas:

Puede usar KCL 1.x o KCL 2.x para crear aplicaciones de consumo que utilicen un rendimiento compartido. Para obtener más información, consulte Desarrolle consumidores personalizados con un rendimiento compartido mediante KCL.

Para crear aplicaciones de consumo que utilicen un rendimiento específico (usuarios con distribución mejorada), solo puede utilizar la versión 2.x. KCL Para obtener más información, consulte Desarrolle consumidores personalizados con un rendimiento dedicado (distribución mejorada).

Para obtener información sobre las diferencias entre KCL 1.x y KCL 2.x e instrucciones sobre cómo migrar de KCL 1.x a 2.x, consulte. KCL Migre a los consumidores de la KCL versión 1.x a la KCL 2.x

Conceptos de KCL

  • KCLaplicación para consumidores: una aplicación creada a medida que utiliza KCL y está diseñada para leer y procesar registros de flujos de datos.

  • Instancia de aplicación de KCL consumo: las aplicaciones de consumo suelen estar distribuidas y una o más instancias de aplicación se ejecutan simultáneamente para coordinar los fallos y equilibrar la carga de forma dinámica del procesamiento de registros de datos.

  • Trabajador: clase de alto nivel que utiliza una instancia de aplicación de KCL consumo para empezar a procesar datos.

    importante

    Cada instancia KCL de aplicación de consumo tiene un trabajador.

    El proceso de trabajo inicializa y supervisa diversas tareas, como la sincronización de la información sobre los arrendamientos y las particiones, el seguimiento de las asignaciones de las particiones y el procesamiento de los datos de las particiones. Un trabajador KCL proporciona la información de configuración de la aplicación de consumo, como el nombre del flujo de datos cuyos registros de datos va a procesar la aplicación de KCL consumo y las AWS credenciales necesarias para acceder a este flujo de datos. El empleado también pone en marcha esa instancia de aplicación de KCL consumo específica para entregar los registros de datos del flujo de datos a los procesadores de registros.

  • Arrendamiento: datos que definen el enlace entre un proceso de trabajo y una partición. Las aplicaciones distribuidas para KCL consumidores utilizan los arrendamientos para dividir el procesamiento de registros de datos entre una flota de trabajadores. En un momento dado, cada fragmento de registro de datos está vinculado a un trabajador en particular mediante un contrato de arrendamiento identificado por la leaseKeyvariable.

    De forma predeterminada, un trabajador puede tener uno o más contratos de arrendamiento (sujetos al valor de la variable maxLeasesForTrabajador) al mismo tiempo.

    importante

    Cada proceso de trabajo competirá por tener todos los arrendamientos disponibles para todas las particiones disponibles en un flujo de datos. Sin embargo, solo un proceso de trabajo podrá mantener satisfactoriamente cada arrendamiento a la vez.

    Por ejemplo, si tiene una instancia de aplicación de consumo A con el proceso de trabajo A que procesa un flujo de datos con 4 particiones, el proceso de trabajo A puede retener los arrendamientos de las particiones 1, 2, 3 y 4 al mismo tiempo. Sin embargo, si tiene dos instancias de aplicaciones de consumo: A y B con el proceso de trabajo A y el proceso de trabajo B, y estas instancias procesan un flujo de datos con 4 particiones, el proceso de trabajo A y el proceso de trabajo B no pueden retener el arrendamiento de la partición 1 al mismo tiempo. Un proceso de trabajo retiene el arrendamiento de una partición concreta hasta que esté listo para dejar de procesar los registros de datos de esta partición o hasta que falle. Cuando un proceso de trabajo deja de ser titular del arrendamiento, otro proceso de trabajo lo acepta y lo retiene.

    Para obtener más información (estos son los KCL repositorios de Java), consulte https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/Amazonaws/services/kinesis/leases/impl/lease.java para la versión 1.x y /blob/master/ /src/main/java/software/amazon/kinesis/leases/Lease.java para la versión 2.x. KCL https://github.com/awslabs/ amazon-kinesis-client amazon-kinesis-client KCL

  • Tabla de arrendamiento: una tabla exclusiva de Amazon DynamoDB que se utiliza para realizar un seguimiento de los fragmentos de KDS un flujo de datos que los trabajadores de la aplicación de consumo están arrendando y procesando. KCL La tabla de arrendamientos debe permanecer sincronizada (dentro de un trabajador y entre todos los trabajadores) con la información más reciente sobre los fragmentos del flujo de datos mientras se ejecuta la aplicación para consumidores. KCL Para obtener más información, consulte Utilice una tabla de arrendamientos para realizar un seguimiento de los fragmentos procesados por la aplicación de consumo KCL.

  • Procesador de registros: la lógica que define la forma en que su aplicación de KCL consumo procesa los datos que obtiene de los flujos de datos. En tiempo de ejecución, una instancia de aplicación de KCL consumo crea una instancia de un trabajador, y este trabajador crea una instancia de un procesador de registros por cada fragmento que tenga en arrendamiento.

Utilice una tabla de arrendamientos para realizar un seguimiento de los fragmentos procesados por la aplicación de consumo KCL

¿Qué es una tabla de arrendamientos

Para cada aplicación de Amazon Kinesis Data StreamsKCL, utiliza una tabla de arrendamiento única (almacenada en una tabla de Amazon DynamoDB) para realizar un seguimiento de los fragmentos KDS de una transmisión de datos que están arrendando y procesando los trabajadores de la aplicación de consumo. KCL

importante

KCLutiliza el nombre de la aplicación de consumo para crear el nombre de la tabla de arrendamiento que utiliza esta aplicación de consumo, por lo que el nombre de cada aplicación de consumidor debe ser único.

Puede consultar la tabla con la consola de Amazon DynamoDB mientras se ejecuta la aplicación de consumo.

Si la tabla de arrendamientos de la aplicación de KCL consumo no existe cuando se inicia la aplicación, uno de los trabajadores crea la tabla de arrendamientos para esta aplicación.

importante

Se le realizará el cobro de los costos de su cuenta asociados a la tabla de DynamoDB, además de los costos propios asociados a Kinesis Data Streams.

Cada fila de la tabla de arrendamiento representa una partición que procesan los procesos de trabajo de la aplicación de consumo. Si su aplicación de KCL consumo procesa solo un flujo de datos, la clave hash de la tabla de arrendamiento es el identificador del fragmento. leaseKey Si es asíProcese varios flujos de datos con la misma aplicación KCL 2.x para consumidores de Java, entonces la estructura del leaseKey se ve así:account-id:StreamName:streamCreationTimestamp:ShardId. Por ejemplo, 111111111:multiStreamTest-1:12345:shardId-000000000336.

Además de la ID del fragmento, cada fila incluye también los siguientes datos:

  • checkpoint: el número secuencial de punto de comprobación más reciente del fragmento. Este valor es único en todas las particiones del flujo de datos.

  • checkpointSubSequenceNúmero: cuando se utiliza la función de agregación de la biblioteca de productores de Kinesis, se trata de una extensión del punto de control que rastrea los registros de los usuarios individuales dentro del registro de Kinesis.

  • leaseCounter: Se utiliza para controlar las versiones de arrendamiento, de modo que los trabajadores puedan detectar que otro trabajador ha contratado su arrendamiento.

  • leaseKey: un identificador único para un arrendamiento. Cada arrendamiento es específico de una partición del flujo de datos y solo lo retiene un proceso de trabajo cada vez.

  • leaseOwner: El trabajador titular de este contrato de arrendamiento.

  • ownerSwitchesSincePunto de control: cuántas veces este contrato de arrendamiento ha cambiado de trabajador desde la última vez que se firmó un punto de control.

  • parentShardId: Se utiliza para garantizar que el fragmento principal se procese por completo antes de que comience el procesamiento en los fragmentos secundarios. Así, se garantiza que los registros se procesen en el mismo orden en el que se introdujeron en la secuencia.

  • hashrange: lo utiliza PeriodicShardSyncManager para ejecutar sincronizaciones periódicas para encontrar las particiones que faltan en la tabla de arrendamiento y crear arrendamientos para ellas si es necesario.

    nota

    Estos datos están presentes en la tabla de arrendamiento de todos los fragmentos que comienzan por KCL 1.14 y 2.3. KCL Para obtener más información sobre PeriodicShardSyncManager y la sincronización periódica entre los arrendamientos y las particiones, consulte Cómo se sincroniza una tabla de arrendamiento con las particiones de una transmisión de datos de Kinesis.

  • childshards: lo utiliza LeaseCleanupManager para revisar el estado de procesamiento de la partición secundaria y decidir si la partición principal se puede eliminar de la tabla de arrendamiento.

    nota

    Estos datos están presentes en la tabla de arrendamientos de todos los fragmentos que comienzan con KCL 1.14 y 2.3. KCL

  • shardID: ID de la partición.

    nota

    Estos datos solo están presentes en la tabla de arrendamiento si es Procese varios flujos de datos con la misma aplicación KCL 2.x para consumidores de Java. Esto solo se admite en la versión KCL 2.x para Java, a partir de la versión KCL 2.3 para Java y versiones posteriores.

  • nombre del flujo El identificador del flujo de datos en el siguiente formato: account-id:StreamName:streamCreationTimestamp.

    nota

    Estos datos solo están presentes en la tabla de arrendamiento si se dedica al Procese varios flujos de datos con la misma aplicación KCL 2.x para consumidores de Java. Esto solo se admite en la versión KCL 2.x para Java, a partir de la versión KCL 2.3 para Java y versiones posteriores.

Rendimiento

Si la aplicación de Amazon Kinesis Data Streams recibe excepciones de rendimiento aprovisionado, debe aumentar el rendimiento aprovisionado para la tabla de DynamoDB. KCLCrea la tabla con un rendimiento aprovisionado de 10 lecturas por segundo y 10 escrituras por segundo, pero puede que esto no sea suficiente para su aplicación. Por ejemplo, si su aplicación de Amazon Kinesis Data Streams crea frecuentemente puntos de comprobación u opera en un flujo que se compone de muchas particiones, es posible que necesite más rendimiento.

Para obtener información sobre el rendimiento aprovisionado en DynamoDB, consulte Modo de capacidad de lectura y escritura y Uso de tablas y datos en la Guía para desarrolladores de Amazon DynamoDB.

Cómo se sincroniza una tabla de arrendamiento con las particiones de una transmisión de datos de Kinesis

Los trabajadores de las aplicaciones de KCL consumo utilizan los arrendamientos para procesar fragmentos de un flujo de datos determinado. La información sobre qué proceso de trabajo está arrendando cada partición en un momento dado se almacena en una tabla de arrendamiento. La tabla de arrendamientos debe permanecer sincronizada con la información más reciente sobre los fragmentos del flujo de datos mientras se ejecuta la aplicación para KCL consumidores. KCLsincroniza la tabla de arrendamiento con la información de los fragmentos adquirida en el servicio Kinesis Data Streams durante el arranque de la aplicación de consumo (ya sea cuando la aplicación de consumidor se inicializa o se reinicia) y también cada vez que un fragmento que se está procesando llega a su fin (refragmentación). En otras palabras, los trabajadores o una aplicación de KCL consumo se sincronizan con el flujo de datos que están procesando durante el arranque inicial de la aplicación de consumo y siempre que la aplicación de consumidor encuentra un evento de refragmentación del flujo de datos.

Sincronización en las versiones KCL 1.0 - 1.13 y 2.0 - 2.2 KCL

En las KCL versiones 1.0 a 1.13 y KCL 2.0 a 2.2, durante el arranque de la aplicación de consumo y también durante cada evento de refragmentación del flujo de datos, KCL sincroniza la tabla de arrendamientos con la información de los fragmentos adquirida en el servicio Kinesis Data Streams invocando la o la detección. ListShards DescribeStream APIs En todas las KCL versiones enumeradas anteriormente, cada trabajador de una aplicación de KCL consumo completa los siguientes pasos para realizar el proceso de sincronización de arrendamiento y fragmentación durante el arranque de la aplicación de consumo y en cada evento de refragmentación de transmisión:

  • Obtiene todos las particiones de datos del flujo que se está procesando.

  • Obtiene todos los arrendamientos de particiones de la tabla de arrendamiento.

  • Filtra cada partición abierta que no tenga arrendamientos en la tabla de arrendamiento.

  • Repite todas las particiones abiertas encontradas y para cada partición abierta sin un elemento principal abierto:

    • Recorre el árbol jerárquico siguiendo la ruta de sus antecesores para determinar si la partición es descendiente. Una partición se considera descendiente si se está procesando una partición anterior (en la tabla de arrendamiento se indica el arrendamiento de la partición anterior) o si se debe procesar una partición anterior (por ejemplo, si la posición inicial es TRIM_HORIZON o AT_TIMESTAMP).

    • Si el fragmento abierto en el contexto es descendiente, KCL Checkpoint el fragmento en función de su posición inicial y, si es necesario, crea arrendamientos para sus progenitores

Sincronización en la versión KCL 2.x, a partir de la versión 2.3 y versiones posteriores KCL

A partir de las últimas versiones compatibles de KCL 2.x (KCL2.3) y posteriores, la biblioteca ahora admite los siguientes cambios en el proceso de sincronización. Estos cambios en la sincronización de arrendamientos y fragmentos reducen significativamente el número de API llamadas que realizan las aplicaciones de KCL consumo al servicio Kinesis Data Streams y optimizan la administración de arrendamientos en su aplicación de consumo. KCL

  • Durante el arranque de la aplicación, si la tabla de arrendamientos está vacía, KCL utiliza la opción de filtrado correspondiente (el parámetro ListShard API de solicitud ShardFilter opcional) para recuperar y crear arrendamientos únicamente para una instantánea de los fragmentos abiertos en el momento especificado por el parámetro. ShardFilter El ShardFilter parámetro permite filtrar la respuesta de. ListShards API La única propiedad obligatoria del parámetro ShardFilter es Type. KCLutiliza la propiedad de Type filtro y los siguientes valores válidos para identificar y devolver una instantánea de los fragmentos abiertos que podrían requerir nuevas concesiones:

    • AT_TRIM_HORIZON: la respuesta incluye todas las particiones que estaban abiertas en TRIM_HORIZON.

    • AT_LATEST: la respuesta incluye solo las particiones actualmente abiertas del flujo de datos.

    • AT_TIMESTAMP: la respuesta incluye todas las particiones cuya marca de tiempo de inicio es anterior o igual a la marca de tiempo dada y cuya marca de tiempo de finalización es posterior o igual que la marca de tiempo dada, o que aún están abiertas.

    ShardFilter se utiliza al crear arrendamientos para una tabla de arrendamiento vacía con el fin de inicializar los arrendamientos de una instantánea de las particiones especificadas en RetrievalConfig#initialPositionInStreamExtended.

    Para obtener más información acerca de ShardFilter, consulte https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html.

  • En lugar de que todos los procesos de trabajo realicen la sincronización entre arrendamientos y particiones para mantener la tabla de arrendamiento actualizada con las particiones más recientes del flujo de datos, un único líder del proceso de trabajo elegido realiza la sincronización entre arrendamientos y particiones.

  • KCL2.3 usa el parámetro de ChildShards retorno de GetRecords y el SubscribeToShard APIs para realizar la sincronización entre arrendamiento y fragmento, como ocurre en SHARD_END el caso de los fragmentos cerrados, lo que permite a un KCL trabajador crear arrendamientos solo para los fragmentos secundarios del fragmento que ha terminado de procesar. Para compartirlo en todas las aplicaciones de consumo, esta optimización de la sincronización entre el arrendamiento y el fragmento utiliza el parámetro de. ChildShards GetRecords API Para las aplicaciones de consumo dedicadas al rendimiento (distribución mejorada), esta optimización de la sincronización entre arrendamiento y fragmento utiliza el parámetro de. ChildShards SubscribeToShard API Para obtener más información, consulte, y. GetRecordsSubscribeToShardsChildShard

  • Con los cambios anteriores, el comportamiento de KCL está pasando del modelo en el que todos los trabajadores aprenden sobre todos los fragmentos existentes al modelo en el que los trabajadores aprenden solo sobre los fragmentos secundarios de los fragmentos que son propiedad de cada trabajador. Por lo tanto, además de la sincronización que se produce durante el arranque de las aplicaciones de consumo y los eventos de refragmentación, KCL ahora también realiza escaneos periódicos adicionales de los fragmentos o arrendamientos para identificar cualquier posible vacío en la tabla de arrendamientos (en otras palabras, para obtener información sobre todos los fragmentos nuevos) a fin de garantizar que se procese todo el rango de hash del flujo de datos y crear arrendamientos para ellos si es necesario. PeriodicShardSyncManageres el componente responsable de realizar escaneos periódicos de los fragmentos o arrendamientos.

    Para obtener más información sobre la versión KCL 2.3, consulte https://github.com/awslabs/amazon-kinesis-client/blob/master/ PeriodicShardSyncManager amazon-kinesis-client /src/main/java/software/amazon/kinesis/leases/ .java #L201 -L213. LeaseManagementConfig

    En la KCL versión 2.3, hay nuevas opciones de configuración disponibles para configurarlas en: PeriodicShardSyncManager LeaseManagementConfig

    Nombre Valor predeterminado Descripción
    leasesRecoveryAuditorExecutionFrequencyMillis

    120 000 (2 minutos)

    Frecuencia (en milisegundos) del trabajo del auditor para buscar arrendamientos parciales en la tabla de arrendamiento. Si el auditor detecta lagunas en los arrendamientos de un flujo, activará la sincronización de las particiones basándose en leasesRecoveryAuditorInconsistencyConfidenceThreshold.

    leasesRecoveryAuditorInconsistencyConfidenceThreshold

    3

    El umbral de confianza para el trabajo de auditor periódico permite determinar si los arrendamientos de un flujo de datos de la tabla de arrendamiento son incoherentes. Si el auditor encuentra varias veces el mismo conjunto de incoherencias consecutivas en un flujo de datos, se activará una sincronización de particiones.

    Ahora también se emiten nuevas CloudWatch métricas para monitorear el estado delPeriodicShardSyncManager. Para obtener más información, consulte PeriodicShardSyncManager.

  • Incluye una optimización de HierarchicalShardSyncer para crear solo arrendamientos para una capa de particiones.

Sincronización en la KCL versión 1.x, a partir de la versión KCL 1.14 y versiones posteriores

A partir de las últimas versiones compatibles de KCL 1.x (KCL1.14) y posteriores, la biblioteca ahora admite los siguientes cambios en el proceso de sincronización. Estos cambios en la sincronización de arrendamientos y fragmentos reducen significativamente el número de API llamadas que realizan las aplicaciones de KCL consumo al servicio Kinesis Data Streams y optimizan la administración de arrendamientos en su aplicación de consumo. KCL

  • Durante el arranque de la aplicación, si la tabla de arrendamientos está vacía, KCL utiliza la opción de filtrado correspondiente (el parámetro ListShard API de solicitud ShardFilter opcional) para recuperar y crear arrendamientos únicamente para una instantánea de los fragmentos abiertos en el momento especificado por el parámetro. ShardFilter El ShardFilter parámetro permite filtrar la respuesta de. ListShards API La única propiedad obligatoria del parámetro ShardFilter es Type. KCLutiliza la propiedad de Type filtro y los siguientes valores válidos para identificar y devolver una instantánea de los fragmentos abiertos que podrían requerir nuevas concesiones:

    • AT_TRIM_HORIZON: la respuesta incluye todas las particiones que estaban abiertas en TRIM_HORIZON.

    • AT_LATEST: la respuesta incluye solo las particiones actualmente abiertas del flujo de datos.

    • AT_TIMESTAMP: la respuesta incluye todas las particiones cuya marca de tiempo de inicio es anterior o igual a la marca de tiempo dada y cuya marca de tiempo de finalización es posterior o igual que la marca de tiempo dada, o que aún están abiertas.

    ShardFilter se utiliza al crear arrendamientos para una tabla de arrendamiento vacía con el fin de inicializar los arrendamientos de una instantánea de las particiones especificadas en KinesisClientLibConfiguration#initialPositionInStreamExtended.

    Para obtener más información acerca de ShardFilter, consulte https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html.

  • En lugar de que todos los procesos de trabajo realicen la sincronización entre arrendamientos y particiones para mantener la tabla de arrendamiento actualizada con las particiones más recientes del flujo de datos, un único líder del proceso de trabajo elegido realiza la sincronización entre arrendamientos y particiones.

  • KCL1.14 usa el parámetro de ChildShards retorno de GetRecords y el SubscribeToShard APIs para realizar la sincronización entre arrendamiento y fragmento, como ocurre en el caso de los fragmentos cerrados, lo que permite a un KCL trabajador crear arrendamientos únicamente SHARD_END para los fragmentos secundarios del fragmento que ha terminado de procesar. Para obtener más información, consulte y. GetRecordsChildShard

  • Con los cambios anteriores, el comportamiento de KCL está pasando del modelo en el que todos los trabajadores aprenden sobre todos los fragmentos existentes al modelo en el que los trabajadores aprenden solo sobre los fragmentos secundarios de los fragmentos que son propiedad de cada trabajador. Por lo tanto, además de la sincronización que se produce durante el arranque de las aplicaciones de consumo y los eventos de refragmentación, KCL ahora también realiza escaneos periódicos adicionales de los fragmentos o arrendamientos para identificar cualquier posible vacío en la tabla de arrendamientos (en otras palabras, para obtener información sobre todos los fragmentos nuevos) a fin de garantizar que se procese todo el rango de hash del flujo de datos y crear arrendamientos para ellos si es necesario. PeriodicShardSyncManageres el componente responsable de realizar escaneos periódicos de los fragmentos o arrendamientos.

    Cuando KinesisClientLibConfiguration#shardSyncStrategyType está establecido en ShardSyncStrategyType.SHARD_END, PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold se utiliza para determinar el umbral del número de exámenes consecutivos que contienen lagunas en la tabla de arrendamiento, tras lo cual se exige la sincronización de las particiones. Cuando KinesisClientLibConfiguration#shardSyncStrategyType se establece en ShardSyncStrategyType.PERIODIC, leasesRecoveryAuditorInconsistencyConfidenceThreshold se ignora.

    Para obtener más información sobre la versión KCL 1.14, consulta PeriodicShardSyncManager https://github.com/awslabs/ amazon-kinesis-client /blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ .java #L987 -L999. KinesisClientLibConfiguration

    En la KCL versión 1.14, hay una nueva opción de configuración disponible para configurar en: PeriodicShardSyncManager LeaseManagementConfig

    Nombre Valor predeterminado Descripción
    leasesRecoveryAuditorInconsistencyConfidenceThreshold

    3

    El umbral de confianza para el trabajo de auditor periódico permite determinar si los arrendamientos de un flujo de datos de la tabla de arrendamiento son incoherentes. Si el auditor encuentra varias veces el mismo conjunto de incoherencias consecutivas en un flujo de datos, se activará una sincronización de particiones.

    Ahora también se emiten nuevas CloudWatch métricas para monitorear el estado delPeriodicShardSyncManager. Para obtener más información, consulte PeriodicShardSyncManager.

  • KCLLa versión 1.14 ahora también admite la reducción de los arrendamientos diferidos. LeaseCleanupManager elimina los arrendamientos de forma asíncrona al llegar a SHARD_END, cuando una partición ha caducado pasado el periodo de retención del flujo de datos o se ha cerrado como resultado de una operación de repartición.

    Están disponibles nuevas opciones de configuración para LeaseCleanupManager.

    Nombre Valor predeterminado Descripción
    leaseCleanupIntervalMillis

    1 minuto

    Intervalo en el que se ejecuta el subproceso de limpieza del arrendamiento.

    completedLeaseCleanupIntervalMillis 5 minutos

    Intervalo para comprobar si un arrendamiento se ha completado o no.

    garbageLeaseCleanupIntervalMillis 30 minutos

    Intervalo en el que se comprueba si un arrendamiento es un elemento no utilizado (es decir, si ha superado el periodo de retención del flujo de datos) o no.

  • Incluye una optimización de KinesisShardSyncer para crear solo arrendamientos para una capa de particiones.

Procese varios flujos de datos con la misma aplicación KCL 2.x para consumidores de Java

En esta sección, se describen los siguientes cambios KCL introducidos en la versión 2.x para Java, que permiten crear aplicaciones de KCL consumo que pueden procesar más de un flujo de datos al mismo tiempo.

importante

El procesamiento multiflujo solo se admite en la versión KCL 2.x para Java, a partir de la versión KCL 2.3 para Java y versiones posteriores.

El procesamiento multiflujo es NOT compatible con cualquier otro lenguaje en el que se pueda implementar la versión KCL 2.x.

El procesamiento multiflujo es NOT compatible con todas las versiones de la versión 1.x. KCL

  • MultistreamTracker interfaz

    Para crear una aplicación de consumo que pueda procesar múltiples transmisiones al mismo tiempo, debe implementar una nueva interfaz llamada MultistreamTracker. Esta interfaz incluye el streamConfigList método que devuelve la lista de flujos de datos y sus configuraciones para que los procese la aplicación de KCL consumo. Tenga en cuenta que los flujos de datos que se procesan se pueden cambiar durante el tiempo de ejecución de la aplicación de consumo. streamConfigListlo llama periódicamente KCL para obtener información sobre los cambios en los flujos de datos que se van a procesar.

    El streamConfigList método rellena la StreamConfiglista.

    package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }

    Tenga en cuenta que los campos StreamIdentifier y InitialPositionInStreamExtended son obligatorios, aunque consumerArn es opcional. Debe proporcionarlo consumerArn únicamente si utiliza la versión KCL 2.x para implementar una aplicación de consumo mejorada y desplegable.

    Para obtener más informaciónStreamIdentifier, consulte https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/ /src/main/java/software/amazon/kinesis/common/ .java #L129 amazon-kinesis-client. StreamIdentifier Para crear una, te recomendamos que crees una instancia multistream a partir de y la que esté disponible en la versión 2.5.0 y versiones posteriores. StreamIdentifier streamArn streamCreationEpoch En las KCL versiones 2.3 y 2.4, que no son compatiblesstreamArm, crea una instancia multisecuencia con este formato. account-id:StreamName:streamCreationTimestamp Este formato quedará obsoleto y dejará de ser compatible a partir de la próxima versión principal.

    MultistreamTracker también incluye una estrategia para eliminar los arrendamientos de flujos antiguos en la tabla de arrendamiento (formerStreamsLeasesDeletionStrategy). Tenga en cuenta que la estrategia CANNOT cambiará durante el tiempo de ejecución de la aplicación para consumidores. Para obtener más información, consulte https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/ amazon-kinesis-client /src/main/java/software/amazon/kinesis/processor/ .java FormerStreamsLeasesDeletionStrategy

  • ConfigsBuilderes una clase que abarca toda la aplicación y que se puede utilizar para especificar todos los ajustes de configuración de la versión 2.x que se utilizarán al crear una aplicación de consumo. KCL KCL ConfigsBuilderla clase ahora es compatible con la interfaz. MultistreamTracker Puede inicializar ConfigsBuilder cualquiera de las dos con el nombre del flujo de datos desde el que desee consumir registros:

    /** * Constructor to initialize ConfigsBuilder with StreamName * @param streamName * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.right(streamName); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }

    O bien, puede inicializarlo ConfigsBuilder MultiStreamTracker si desea implementar una aplicación para KCL consumidores que procese varios flujos al mismo tiempo.

    * Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
  • Con el soporte multiflujo implementado para su aplicación de KCL consumo, cada fila de la tabla de arrendamiento de la aplicación ahora contiene el ID del fragmento y el nombre de la secuencia de los múltiples flujos de datos que procesa esta aplicación.

  • Cuando se implementa el soporte multiflujo para su aplicación de KCL consumo, leaseKey adopta la siguiente estructura:. account-id:StreamName:streamCreationTimestamp:ShardId Por ejemplo, 111111111:multiStreamTest-1:12345:shardId-000000000336.

    importante

    Si su aplicación de KCL consumo actual está configurada para procesar solo un flujo de datos, leaseKey (que es la clave hash de la tabla de arrendamiento) es el ID del fragmento. Si reconfigura esta aplicación de KCL consumo existente para procesar varios flujos de datos, no tendrá éxito en su tabla de arrendamientos, ya que, al ser compatible con varios flujos, la leaseKey estructura debe ser la siguiente: account-id:StreamName:StreamCreationTimestamp:ShardId

Utilícela KCL con el registro de esquemas AWS Glue

Puede integrar sus transmisiones de datos de Kinesis con el registro de AWS Glue esquemas. El registro de AWS Glue esquemas le permite descubrir, controlar y desarrollar los esquemas de forma centralizada y, al mismo tiempo, garantizar que los datos generados se validen continuamente mediante un esquema registrado. Un esquema define la estructura y el formato de un registro de datos. Un esquema es una especificación versionada para publicación, consumo o almacenamiento de confianza de datos. El registro AWS Glue de esquemas le permite mejorar la end-to-end calidad y el gobierno de los datos en sus aplicaciones de streaming. Para obtener más información, consulte AWS Glue Schema Registry. Una de las formas de configurar esta integración es a través KCL de Java.

importante

Actualmente, la integración de Kinesis Data Streams AWS Glue y Schema Registry solo se admite para las transmisiones de datos de Kinesis que KCL utilizan consumidores 2.3 implementados en Java. No se proporciona soporte multilingüe. KCLNo se admiten los consumidores de la versión 1.0. KCLNo se admiten los consumidores 2.x anteriores a la KCL 2.3.

Para obtener instrucciones detalladas sobre cómo configurar la integración de Kinesis Data Streams con Schema Registry mediante KCL el, consulte la sección «Interacción con los datos mediante KPL las bibliotecasKCL/» en Caso de uso: integración de Amazon Kinesis Data Streams con el registro de esquemas de AWS Glue.