Uso del filtrado de eventos con una fuente de eventos de Kinesis - AWS Lambda

Uso del filtrado de eventos con una fuente de eventos de Kinesis

Puede utilizar el filtrado de eventos para controlar qué registros de un flujo o una cola envía Lambda a su función. Para obtener información general sobre cómo funciona el filtrado de eventos, consulte Controle qué eventos envía Lambda a la función.

Esta sección se centra en el filtrado de eventos para las fuentes de eventos de Kinesis.

Conceptos básicos del filtrado de eventos de Kinesis

Supongamos que un productor incluye datos con formato JSON en su flujo de datos de Kinesis. Un registro de ejemplo tendría el siguiente aspecto, con los datos JSON convertidos en una cadena codificada en Base64 en el campo data.

{ "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "eyJSZWNvcmROdW1iZXIiOiAiMDAwMSIsICJUaW1lU3RhbXAiOiAieXl5eS1tbS1kZFRoaDptbTpzcyIsICJSZXF1ZXN0Q29kZSI6ICJBQUFBIn0=", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }

Siempre que los datos que el productor incluya en el flujo sean JSON válidos, puede usar el filtrado de eventos para filtrar registros mediante la clave data. Supongamos que un productor incluye registros en su flujo de Kinesis en el siguiente formato JSON.

{ "record": 12345, "order": { "type": "buy", "stock": "ANYCO", "quantity": 1000 } }

Para filtrar solo los registros en los que el tipo de pedido sea “comprar”, el objeto FilterCriteria sería el siguiente.

{ "Filters": [ { "Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }" } ] }

Para mayor claridad, este es el valor del Pattern del filtro ampliado en JSON no cifrado.

{ "data": { "order": { "type": [ "buy" ] } } }

Puede agregar el filtro mediante la consola, la AWS CLI o una plantilla de AWS SAM.

Console

Para agregar este filtro mediante la consola, siga las instrucciones que se indican en Adjuntar criterios de filtro a una asignación de origen de eventos (consola) e ingrese la siguiente cadena para los Criterios de filtro.

{ "data" : { "order" : { "type" : [ "buy" ] } } }
AWS CLI

Para crear una nueva asignación de orígenes de eventos con estos criterios de filtro mediante la AWS Command Line Interface (AWS CLI), ejecute el siguiente comando.

aws lambda create-event-source-mapping \ --function-name my-function \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/my-stream \ --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'

Para agregar estos criterios de filtro a una asignación de orígenes de eventos existente, ejecute el siguiente comando.

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'
AWS SAM

Para agregar este filtro mediante AWS SAM, agregue el siguiente fragmento a la plantilla YAML de su origen de eventos.

FilterCriteria: Filters: - Pattern: '{ "data" : { "order" : { "type" : [ "buy" ] } } }'

Para filtrar correctamente los eventos de orígenes de Kinesis, tanto el campo de datos como los criterios de filtro del campo de datos deben estar en un formato JSON válido. Si el formato JSON de alguno de los campos no es válido, Lambda elimina el mensaje o genera una excepción. En la siguiente tabla se resume el comportamiento específico:

Formato de los datos entrantes Formato del patrón de filtro para las propiedades de datos Acción resultante

JSON válido

JSON válido

Lambda filtra en función de los criterios de filtro.

JSON válido

Sin patrón de filtro para las propiedades de datos

Lambda filtra (solo en las demás propiedades de metadatos) en función de los criterios de filtro.

JSON válido

No JSON

Lambda genera una excepción al crear o actualizar la asignación de origen de eventos. El formato JSON del patrón de filtro de las propiedades de datos debe ser válido.

No JSON

JSON válido

Lambda elimina el registro.

No JSON

Sin patrón de filtro para las propiedades de datos

Lambda filtra (solo en las demás propiedades de metadatos) en función de los criterios de filtro.

No JSON

No JSON

Lambda genera una excepción al crear o actualizar la asignación de origen de eventos. El formato JSON del patrón de filtro de las propiedades de datos debe ser válido.

Filtrado de registros agregados de Kinesis

Con Kinesis, puede agregar varios registros en un solo registro de Kinesis Data Streams para aumentar el rendimiento de sus datos. Lambda solo puede aplicar criterios de filtro a los registros agregados cuando se utiliza la distribución mejorada de Kinesis. El filtrado de registros agregados con Kinesis estándar no es compatible. Al utilizar la distribución mejorada, usted configura un consumidor de rendimiento dedicado de Kinesis para que actúe como desencadenador de la función de Lambda. A continuación, Lambda filtra los registros agregados y solo pasa los registros que cumplen los criterios de filtro.

Para obtener más información sobre la agregación de registros de Kinesis, consulte la sección Agregación de la página de conceptos clave de la Kinesis Producer Library (KPL). Para obtener más información sobre el uso de Lambda con la expansión mejorada de Kinesis, consulte Aumento del rendimiento del procesamiento de transmisiones en tiempo real con la distribución mejorada de Amazon Kinesis Data Streams y AWS Lambda en el blog de informática de AWS.