Acquisizione di batch scartati per un'origine eventi Amazon MSK - AWS Lambda

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Acquisizione di batch scartati per un'origine eventi Amazon MSK

Per mantenere i record delle chiamate non riuscite allo strumento di mappatura dell'origine degli eventi, aggiungi una destinazione allo strumento di mappatura dell'origine degli eventi della funzione. Ogni record inviato alla destinazione è un documento JSON contenente i metadati relativi alla chiamata non riuscita. Per le destinazioni Amazon S3, Lambda invia insieme ai metadati anche l'intero record di invocazione. Puoi configurare come destinazione qualsiasi argomento Amazon SNS, coda Amazon SQS o bucket S3.

Con le destinazioni Amazon S3, puoi utilizzare la funzionalità Notifiche eventi Amazon S3 per ricevere le notifiche relative a quando gli oggetti vengono caricati nel bucket S3 di destinazione. Puoi anche configurare Notifiche eventi S3 per richiamare un'altra funzione Lambda per eseguire l'elaborazione automatica su batch non riusciti.

Il tuo ruolo di esecuzione deve avere le autorizzazioni per la destinazione:

È necessario implementare un endpoint VPC per il servizio di destinazione in errore all'interno del VPC del cluster Amazon MSK.

Inoltre, se hai configurato una chiave KMS sulla destinazione, Lambda necessita delle seguenti autorizzazioni, a seconda del tipo di destinazione:

  • Se hai abilitato la crittografia con la tua chiave KMS per una destinazione S3, devi specificare kms:GenerateDataKey. Se la chiave KMS e il bucket S3 di destinazione si trovano in un account diverso dalla funzione Lambda e dal ruolo di esecuzione, configura la chiave KMS in modo che consideri attendibile il ruolo di esecuzione per consentire kms:GenerateDataKey.

  • Se hai abilitato la crittografia con la tua chiave KMS per la destinazione SQS, devi specificare kms:Decrypt e kms:GenerateDataKey. Se la chiave KMS e la coda SQS di destinazione si trovano in un account diverso dalla funzione Lambda e dal ruolo di esecuzione, configura la chiave KMS in modo che consideri attendibile il ruolo di esecuzione per consentire kms:Decrypt, kms:GenerateDataKey, kms:DescribeKey e kms:ReEncrypt.

  • Se hai abilitato la crittografia con la tua chiave KMS per la destinazione SNS, devi specificare kms:Decrypt e kms:GenerateDataKey. Se la chiave KMS e l'argomento SNS di destinazione si trovano in un account diverso dalla funzione Lambda e dal ruolo di esecuzione, configura la chiave KMS in modo che consideri attendibile il ruolo di esecuzione per consentire kms:Decrypt, kms:GenerateDataKey, kms:DescribeKey e kms:ReEncrypt.

Configurazione delle destinazioni in errore per uno strumento di mappatura dell'origine degli eventi Amazon MSK

Per configurare una destinazione in caso di errore tramite la console, completa i seguenti passaggi:

  1. Aprire la pagina Functions (Funzioni) della console Lambda.

  2. Scegliere una funzione.

  3. In Function overview (Panoramica delle funzioni), scegliere Add destination (Aggiungi destinazione).

  4. Per Origine, scegli Chiamata allo strumento di mappatura dell'origine degli eventi.

  5. Per Strumento di mappatura dell'origine degli eventi, scegli un'origine dell'evento configurata per questa funzione.

  6. Per Condizione, seleziona In caso di errore. Per le chiamate allo strumento di mappatura dell'origine degli eventi, questa è l'unica condizione accettata.

  7. Per Tipo di destinazione, scegli il tipo di destinazione a cui Lambda deve inviare i record di chiamata.

  8. Per Destination (Destinazione), scegliere una risorsa.

  9. Seleziona Salva.

È inoltre possibile configurare una destinazione in errore utilizzando la AWS CLI. Ad esempio, il seguente comando create-event-source-mapping aggiunge uno strumento di mappatura dell'origine degli eventi con una destinazione SQS in errore a MyFunction:

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

Il seguente comando update-event-source-mapping aggiunge una destinazione S3 in errore all'origine eventi Kafka associata all'input uuid:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'

Per rimuovere una destinazione, fornisci una stringa vuota come argomento del parametro destination-config:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

Best practice di sicurezza per destinazioni Amazon S3

L'eliminazione di un bucket S3 configurato come destinazione senza rimuovere la destinazione dalla configurazione della funzione può creare un rischio per la sicurezza. Se un altro utente conosce il nome del bucket di destinazione, può ricreare il bucket nel proprio Account AWS. I record delle invocazioni non riuscite verranno inviati al relativo bucket, esponendo potenzialmente i dati della tua funzione.

avvertimento

Per garantire che i record di invocazione della tua funzione non possano essere inviati a un bucket S3 in un altro Account AWS, aggiungi una condizione al ruolo di esecuzione della funzione che limiti le autorizzazioni s3:PutObject ai bucket del tuo account.

Di seguito viene illustrato un esempio di policy IAM che limita le autorizzazioni s3:PutObject della funzione ai bucket presenti nell'account. Questa policy fornisce inoltre a Lambda l'autorizzazione s3:ListBucket necessaria per utilizzare un bucket S3 come destinazione.

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3BucketResourceAccountWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:ListBucket" ], "Resource": "arn:aws:s3:::*/*", "Condition": { "StringEquals": { "s3:ResourceAccount": "111122223333" } } } ] }

Per aggiungere una policy di autorizzazioni al ruolo di esecuzione della funzione utilizzando la AWS Management Console o la AWS CLI, consulta le istruzioni nelle seguenti procedure:

Console
Per aggiungere una policy di autorizzazioni al ruolo di esecuzione di una funzione (console)
  1. Aprire la pagina Functions (Funzioni) della console Lambda.

  2. Selezionare la funzione Lambda di cui si desidera modificare il ruolo di esecuzione.

  3. Nella scheda Configurazione scegli Autorizzazioni.

  4. Nella scheda Ruolo di esecuzione, seleziona il nome del ruolo della funzione per aprire la pagina della console IAM del ruolo.

  5. Aggiungi una policy di autorizzazioni di base al ruolo completando le seguenti operazioni:

    1. Nel riquadro Policy di autorizzazioni, scegli Aggiungi autorizzazioni, poi Crea policy in linea.

    2. Nell'editor delle policy, seleziona JSON.

    3. Incolla la policy che desideri aggiungere nell'editor (sostituendo il codice JSON esistente), quindi scegli Avanti.

    4. In Dettagli della policy, specifica un nome per la policy.

    5. Scegli Crea policy.

AWS CLI
Per aggiungere una policy di autorizzazioni al ruolo di esecuzione di una funzione (CLI)
  1. Crea un documento di policy JSON con le autorizzazioni richieste e salvalo in una directory locale.

  2. Utilizza il comando IAM di put-role-policy CLI per aggiungere le autorizzazioni per il ruolo di esecuzione di una funzione. Esegui il comando seguente dalla directory in cui hai salvato il documento di policy JSON e sostituisci il nome del ruolo, il nome della policy e il documento di policy con i tuoi valori.

    aws iam put-role-policy \ --role-name my_lambda_role \ --policy-name LambdaS3DestinationPolicy \ --policy-document file://my_policy.json

Record di invocazione SNS e SQS di esempio

L'esempio seguente mostra il contenuto che Lambda invia a un argomento SNS o una coda SQS di destinazione per una chiamata non riuscita all'origine dell'evento Kafka. Ciascuna delle chiavi in recordsInfo contiene sia l'argomento sia lo shard di Kafka, separati da un trattino. Ad esempio, per la chiave "Topic-0", Topic è l'argomento di Kafka e 0 è lo shard. Per ogni argomento e partizione, è possibile utilizzare i dati di offset e timestamp per individuare i record di chiamata originali.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }

Record di invocazione di esempio di destinazione S3

Se le destinazioni sono S3, Lambda invia alla destinazione l'intero record di chiamata insieme ai metadati. L'esempio seguente mostra ciò che Lambda invia a un bucket S3 di destinazione per una chiamata non riuscita all'origine dell'evento Kafka. Oltre a tutti i campi dell'esempio precedente per le destinazioni SQS e SNS, il campo payload contiene il record di chiamata originale come stringa JSON con escape.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
Suggerimento

Ti consigliamo di abilitare il controllo delle versioni S3 sul bucket di destinazione.