Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Procesamiento multiflujo con KCL
En esta sección se describen los cambios necesarios en el KCL que permiten crear aplicaciones de consumo de KCL que puedan procesar más de un flujo de datos al mismo tiempo.
importante
-
El procesamiento multiflujo solo se admite en KCL 2.3 o versiones posteriores.
-
Los consumidores de KCL que estén escritos en lenguajes distintos de Java y que funcionen con ellos no admiten el procesamiento multiflujo.
multilangdaemon
-
El procesamiento multiflujo no es compatible con ninguna versión de KCL 1.x.
-
MultistreamTracker interfaz
-
Para crear una aplicación de consumo que pueda procesar múltiples transmisiones al mismo tiempo, debe implementar una nueva interfaz llamada MultistreamTracker
. Esta interfaz incluye el método streamConfigList
que devuelve la lista de flujos de datos y sus configuraciones para que los procese la aplicación de consumo de KCL. Tenga en cuenta que los flujos de datos que se procesan se pueden cambiar durante el tiempo de ejecución de la aplicación para consumidores.streamConfigList
KCL lo consulta periódicamente para obtener información sobre los cambios en los flujos de datos que va a procesar. -
streamConfigList
Rellena la lista. StreamConfig
package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }
-
Los campos
StreamIdentifier
yInitialPositionInStreamExtended
son obligatorios, aunqueconsumerArn
son opcionales. Debe proporcionarlosconsumerArn
únicamente si utiliza KCL para implementar una aplicación de usuario desplegable mejorada. -
Para obtener más información al respecto
StreamIdentifier
, consulte https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java #L129.Para crear una StreamIdentifier
, le recomendamos que cree una instancia multiflujo a partir de lastreamArn
y lastreamCreationEpoch
que está disponible en KCL 2.5.0 o versiones posteriores. En las versiones KCL 2.3 y 2.4, que no son compatibles constreamArm
, cree una instancia multiflujo con el formatoaccount-id:StreamName:streamCreationTimestamp
. Este formato quedará obsoleto y dejará de ser compatible a partir de la próxima versión principal. -
MultistreamTracker también incluye una estrategia para eliminar las concesiones de transmisiones antiguas de la tabla de arrendamientos (). formerStreamsLeases DeletionStrategy Tenga en cuenta que la estrategia NO SE PUEDE cambiar durante el tiempo de ejecución de la aplicación de consumo. Para obtener más información, consulta https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b//amazon-kinesis-client
.java. src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy
-
O bien, puede inicializarlo ConfigsBuilder MultiStreamTracker
si desea implementar una aplicación de consumo de KCL que procese varios flujos al mismo tiempo.
* Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
-
Gracias a la compatibilidad con múltiples flujos de datos implementada en su aplicación de consumo de KCL, cada fila de la tabla de arrendamiento de la aplicación ahora contiene el identificador del fragmento y el nombre del flujo de los múltiples flujos de datos que procesa esta aplicación.
-
Cuando se implementa el soporte de transmisión múltiple para su aplicación de consumo de KCL, LeaseKey adopta la siguiente estructura:.
account-id:StreamName:streamCreationTimestamp:ShardId
Por ejemplo,111111111:multiStreamTest-1:12345:shardId-000000000336
.
importante
Si su aplicación de consumo de KCL existente está configurada para procesar solo un flujo de datos, leaseKey
(que es la clave de partición de la tabla de arrendamiento) es el ID del fragmento. Si reconfigura una aplicación de consumo de KCL existente para procesar varios flujos de datos, la tabla de arrendamientos se verá afectada, ya que la leaseKey
estructura debe ser la siguiente: account-id:StreamName:StreamCreationTimestamp:ShardId
para admitir múltiples flujos.