Uso di Lambda con Amazon MSK - AWS Lambda

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Uso di Lambda con Amazon MSK

Amazon Managed Streaming for Apache Kafka (Amazon MSK) è un servizio completamente gestito che consente di creare ed eseguire applicazioni che utilizzano Apache Kafka per elaborare i dati in streaming. Amazon MSK semplifica la configurazione, il dimensionamento e la gestione dei cluster che eseguono Kafka. Amazon MSK semplifica inoltre la configurazione dell'applicazione per più zone di disponibilità e per la sicurezza con AWS Identity and Access Management (IAM). Amazon MSK supporta più versioni open-source di Kafka.

Amazon MSK come origine eventi funziona in modo simile all'utilizzo di Amazon Simple Queue Service (Amazon SQS) o Amazon Kinesis. Lambda interroga internamente i nuovi messaggi dell'origine eventi, quindi richiama in modo sincrono la funzione Lambda di destinazione. Lambda legge i messaggi in batch e li fornisce alla funzione come payload di evento. La dimensione massima del batch è configurabile (l'impostazione predefinita è 100 messaggi). Per ulteriori informazioni, consulta Comportamento di batching.

Nota

Anche se le funzioni Lambda generalmente prevedono un timeout massimo di 15 minuti, gli strumenti di mappatura dell'origine degli eventi per Amazon MSK, Apache Kafka autogestito, Amazon DocumentDB e Amazon MQ per ActiveMQ e RabbitMQ supportano solo funzioni con timeout massimi di 14 minuti. Questa limitazione garantisce che lo strumento di mappatura dell'origine degli eventi possa gestire correttamente errori di funzioni e nuovi tentativi.

Lambda legge i messaggi in sequenza per ogni partizione. Un singolo payload Lambda può contenere messaggi provenienti da più partizioni. Dopo che Lambda ha elaborato ogni batch, esegue il commit degli offset dei messaggi in quel batch. Se la funzione restituisce un errore per uno qualsiasi dei messaggi di un batch, Lambda ritenta l'intero batch di messaggi fino a quando l'elaborazione non riesce o i messaggi scadono.

avvertimento

Le mappature delle sorgenti degli eventi Lambda elaborano ogni evento almeno una volta e può verificarsi un'elaborazione duplicata dei record. Per evitare potenziali problemi legati agli eventi duplicati, ti consigliamo vivamente di rendere idempotente il codice della funzione. Per ulteriori informazioni, consulta Come posso rendere idempotente la mia funzione Lambda nel Knowledge Center. AWS

Per un esempio di come configurare Amazon MSK come origine di eventi, consulta Using Amazon MSK come origine di eventi per AWS Lambda sul AWS Compute Blog. Consulta Integrazione di Amazon MSK Lambda nei laboratori Amazon MSK per un tutorial completo.

Esempio di evento

Lambda invia il batch di messaggi nel parametro evento quando richiama la funzione. Il payload evento contiene un array di messaggi. Ogni elemento dell'array contiene i dettagli dell'argomento e dell'identificatore della partizione Amazon MSK, insieme a una data/ora e a un messaggio con codifica base64.

{ "eventSource":"aws:kafka", "eventSourceArn":"arn:aws:kafka:sa-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

Autenticazione cluster MSK

Lambda ha bisogno dell'autorizzazione per accedere al cluster Amazon MSK, recuperare registri ed eseguire altri processi. Amazon MSK supporta diverse opzioni per il controllo dell'accesso del client al cluster MSK.

Accesso non autenticato

Se nessun client accede al cluster tramite Internet, è possibile utilizzare l'accesso non autenticato.

Autenticazione SASL/SCRAM

Amazon MSK supporta l'autenticazione SASL/SCRAM (Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism) con crittografia Transport Layer Security (TLS). Per consentire a Lambda di connettersi al cluster, è necessario archiviare le credenziali di autenticazione (nome utente e password) in un luogo segreto. AWS Secrets Manager

Per ulteriori informazioni sull'uso di Secrets Manager, consulta Autenticazione nome utente e password con AWS Secrets Manager nella Guida per gli sviluppatori di Amazon Managed Streaming for Apache Kafka.

Amazon MSK non supporta l'autenticazione SASL/PLAIN.

Autenticazione basata su ruoli IAM

È possibile utilizzare IAM per autenticare l'identità dei client che si connettono al cluster MSK. Se l'autenticazione IAM è attiva sul tuo cluster MSK e non fornisci un segreto per l'autenticazione, Lambda utilizza automaticamente l'autenticazione IAM. Per creare e implementare policy basate su utenti o ruoli, utilizza l'API o la console IAM. Per ulteriori informazioni, consulta il controllo accessi IAM nella Guida per sviluppatori Amazon Managed Streaming for Apache Kafka.

Per consentire a Lambda di connettersi al cluster MSK, leggere i registri ed eseguire altre operazioni richieste, aggiungere le seguenti autorizzazioni al ruolo di esecuzione della funzione.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", "kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:DescribeClusterDynamicConfiguration" ], "Resource": [ "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-uuid", "arn:aws:kafka:region:account-id:topic/cluster-name/cluster-uuid/topic-name", "arn:aws:kafka:region:account-id:group/cluster-name/cluster-uuid/consumer-group-id" ] } ] }

È possibile assegnare queste autorizzazioni a un cluster, un argomento e un gruppo specifici. Per ulteriori informazioni, consulta le operazioni Kafka di Amazon MSK nella Guida per sviluppatori Amazon Managed Streaming for Apache Kafka.

Autenticazione TLS reciproca

MTLS (Mutual TLS) fornisce l'autenticazione bidirezionale tra client e server. Il client invia un certificato al server affinché il server verifichi il client e il server invia un certificato al client affinché il client verifichi il server.

Per Amazon MSK, Lambda funge da cliente. È possibile configurare un certificato client (come segreto in Secrets Manager) per autenticare Lambda con i broker nel cluster MSK. Il certificato client deve essere firmato da una CA nell'archivio trust del server. Il cluster MSK invia un certificato server a Lambda per autenticare i broker con Lambda. Il certificato del server deve essere firmato da un'autorità di certificazione (CA) presente nel AWS trust store.

Per istruzioni su come generare un certificato client, consulta Introduzione dell'autenticazione TLS reciproca per Amazon MSK come origine eventi.

Amazon MSK non supporta i certificati server autofirmati poiché tutti i broker di Amazon MSK utilizzano certificati pubblici firmati dalle autorità di certificazione di Amazon Trust Services, su cui Lambda fa affidamento per impostazione predefinita.

Per ulteriori informazioni, su mTLS per Amazon MSK, consulta Autenticazione TLS reciproca nella Guida per sviluppatori Amazon Managed Streaming for Apache Kafka.

Configurazione del segreto mTLS

Il segreto CLIENT_CERTIFICATE_TLS_AUTH richiede un campo certificato e un campo chiave privata. Per una chiave privata crittografata, il segreto richiede una password per chiave privata. Il certificato e la chiave privata devono essere in formato PEM.

Nota

Lambda supporta il PBES1 (ma non PBES2) come algoritmi di crittografia a chiave privata.

Il campo certificato deve contenere un elenco di certificati, a partire dal certificato client, seguito da qualsiasi certificato intermedio, per finire con il certificato root. Ogni certificato deve iniziare su una nuova riga con la struttura seguente:

-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----

Secrets Manager supporta segreti fino a 65.536 byte, che è uno spazio sufficiente per lunghe catene di certificati.

La chiave privata deve essere in formato PKCS #8, con la struttura seguente:

-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----

Per una chiave privata crittografata, utilizza la struttura seguente:

-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----

Nell'esempio seguente viene mostrato il contenuto di un segreto per l'autenticazione mTLS utilizzando una chiave privata crittografata. Per una chiave privata crittografata, includi una password per chiave privata nel segreto.

{ "privateKeyPassword": "testpassword", "certificate": "-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }

Come Lambda sceglie un broker bootstrap

Lambda sceglie un broker bootstrap in base ai metodi di autenticazione disponibili nel tuo cluster e se fornisci un segreto per l'autenticazione. Se fornisci un segreto per mTLS o SASL/SCRAM, Lambda sceglie automaticamente quel metodo di autenticazione. Se non fornisci un segreto, Lambda seleziona il metodo di autenticazione più forte attivo sul tuo cluster. Di seguito è riportato l'ordine di priorità in cui Lambda seleziona un broker, dall'autenticazione più forte a quella più debole:

  • mTLS (segreto fornito per mTLS)

  • SASL/SCRAM (segreto fornito per SASL/SCRAM)

  • IAM SASL (nessun segreto fornito e autenticazione IAM attiva)

  • TLS non autenticato (nessun segreto fornito e autenticazione IAM non attiva)

  • Testo semplice (nessun segreto fornito e autenticazione IAM e TLS non autenticato non attivi)

Nota

Se Lambda non riesce a connettersi al tipo di broker più sicuro, non proverà a connettersi a un tipo di broker diverso (più debole). Se vuoi che Lambda scelga un tipo di broker più debole, disattiva tutti i metodi di autenticazione più forti sul tuo cluster.

Gestione dell'accesso e delle autorizzazioni API

Oltre ad accedere al cluster Amazon MSK, la funzione necessita di autorizzazioni per eseguire varie operazioni dell'API Amazon MSK. Aggiungi queste autorizzazioni al ruolo di esecuzione della funzione. Se gli utenti hanno bisogno di accedere a una qualsiasi delle operazioni dell'API Amazon MSK, aggiungi le autorizzazioni richieste alla policy di identità per l'utente o il ruolo.

Puoi aggiungere manualmente ciascuna delle seguenti autorizzazioni al tuo ruolo di esecuzione. In alternativa, puoi allegare la policy AWS gestita AWSLambdaMSKExecutionRoleal tuo ruolo di esecuzione. La policy AWSLambdaMSKExecutionRole contiene tutte le azioni API e le autorizzazioni VPC richieste elencate di seguito.

Autorizzazioni del ruolo di esecuzione della funzione Lambda necessarie

Per creare e archiviare i log in un gruppo di log in Amazon CloudWatch Logs, la funzione Lambda deve disporre delle seguenti autorizzazioni nel ruolo di esecuzione:

Affinché Lambda possa accedere al cluster Amazon MSK per tuo conto, la funzione Lambda deve disporre delle seguenti autorizzazioni per il ruolo di esecuzione:

Devi solo aggiungere uno dei seguenti: kafka:DescribeCluster o kafka:DescribeClusterV2. Sui cluster MSK con provisioning funzionano entrambe le autorizzazioni. Per i cluster MSK serverless è necessario utilizzare kafka:DescribeClusterV2.

Nota

Lambda alla fine prevede di rimuovere l'autorizzazione kafka:DescribeCluster dalla policy associata gestita da AWSLambdaMSKExecutionRole. Se utilizzi questa policy, sarebbe opportuno migrare tutte le applicazioni tramite kafka:DescribeCluster in modo da utilizzare kafka:DescribeClusterV2 al suo posto.

Autorizzazioni VPC

Se solo gli utenti all'interno di un VPC possono accedere al cluster Amazon MSK, la funzione Lambda deve disporre dell'autorizzazione ad accedere alle risorse di Amazon VPC. Queste risorse includono la VPC, le sottoreti, i gruppi di sicurezza e le interfacce di rete. Per accedere a queste risorse, il ruolo di esecuzione della funzione deve disporre delle seguenti autorizzazioni. Queste autorizzazioni sono incluse nella politica AWSLambdaMSKExecutionRole AWS gestita.

Autorizzazioni facoltative per la funzione Lambda

La funzione Lambda potrebbe richiedere autorizzazioni per:

  • Accedi al tuo segreto SCRAM, se utilizzi l'autenticazione SASL/SCRAM.

  • Descrivere il segreto di Secrets Manager.

  • Accedi alla tua AWS Key Management Service (AWS KMS) chiave gestita dal cliente.

  • Invia i record delle chiamate non riuscite a una destinazione.

Secrets Manager e AWS KMS autorizzazioni

A seconda del tipo di controllo degli accessi che stai configurando per i tuoi broker Amazon MSK, la tua funzione Lambda potrebbe aver bisogno dell'autorizzazione per accedere al tuo segreto SCRAM (se usi l'autenticazione SASL/SCRAM) o al segreto di Secrets Manager per decrittografare la chiave gestita dal cliente. AWS KMS Per accedere a queste risorse, il ruolo di esecuzione della funzione deve disporre delle seguenti autorizzazioni:

Aggiunta di autorizzazioni al ruolo di esecuzione

Segui questi passaggi per aggiungere la policy AWS gestita AWSLambdaMSKExecutionRoleal tuo ruolo di esecuzione utilizzando la console IAM.

Per aggiungere una policy AWS gestita
  1. Aprire la pagina Policies (Policy) nella console IAM.

  2. Nella casella di ricerca inserisci il nome della policy (AWSLambdaMSKExecutionRole).

  3. Seleziona la policy dall'elenco, quindi scegli Policy actions (Azioni delle policy), Attach (Allega).

  4. Alla pagina Attach policy (Allega policy), seleziona il ruolo di esecuzione dall'elenco, quindi scegli Attach policy (Allega policy).

Concessione di accesso agli utenti con una policy IAM

Per impostazione predefinita, gli utenti e i ruoli non sono autorizzati a eseguire le operazioni API Amazon MSK. Per concedere l'accesso agli utenti dell'organizzazione o dell'account, è possibile aggiungere una policy basata sull'identità. Per ulteriori informazioni, consulta Esempi di policy basate sull'identità Amazon MSK nella Guida per gli sviluppatori di Amazon Managed Streaming for Apache Kafka.

Errori di autenticazione e autorizzazione

Se manca una delle autorizzazioni necessarie per consumare i dati dal cluster Amazon MSK, Lambda visualizza uno dei seguenti messaggi di errore nella mappatura delle origini degli eventi in Risultato. LastProcessing

Il cluster non è riuscito ad autorizzare Lambda

Per SASL/SCRAM o mTLS, questo errore indica che l'utente fornito non dispone di tutte le seguenti autorizzazioni della lista di controllo accessi Kafka (ACL) richieste:

  • DescribeConfigs Cluster

  • Descrivi il gruppo

  • Leggi il gruppo

  • Descrivi l'argomento

  • Leggi l'argomento

Per il controllo accessi IAM, al ruolo di esecuzione della funzione manca una o più autorizzazioni necessarie per accedere al gruppo o all'argomento. Rivedi l'elenco delle autorizzazioni richieste in Autenticazione basata su ruoli IAM.

Quando si creano ACL Kafka o una policy IAM con le autorizzazioni del cluster Kafka richieste, è necessario specificare l'argomento e il gruppo come risorse. Il nome dell'argomento deve corrispondere all'argomento nella mappatura dell'origine eventi. Il nome del gruppo deve corrispondere all'UUID della mappatura dell'origine eventi.

Dopo avere aggiunto le autorizzazioni richieste al ruolo di esecuzione, potrebbero essere necessari alcuni minuti affinché le modifiche entrino in vigore.

Autenticazione SASL non riuscita

Per SASL/SCRAM, questo errore indica che il nome utente e la password forniti non sono validi.

Per il controllo accessi IAM, al ruolo di esecuzione manca l'autorizzazione kafka-cluster:Connect per il cluster MSK. Aggiungi questa autorizzazione al ruolo e specifica l'Amazon Resource Name (ARN) del cluster come risorsa.

Potresti visualizzare questo errore in modo intermittente. Il cluster rifiuta le connessioni dopo che il numero di connessioni TCP supera la quota del servizio Amazon MSK. Lambda cessa e ritenta finché una connessione non ha esito positivo. Dopo che Lambda si connette al cluster e ha eseguito il polling dei registri, l'ultimo risultato di elaborazione cambia in OK.

Il server non è riuscito ad autenticare Lambda

Questo errore indica che i broker Amazon MSK Kafka non sono riusciti ad autenticarsi con Lambda. Questo errore può verificarsi per uno dei seguenti motivi:

  • Non è stato fornito un certificato client per l'autenticazione mTLS.

  • È stato fornito un certificato client, ma i broker non sono configurati per l'utilizzo di mTLS.

  • Un certificato client non è attendibile per i broker.

Il certificato o la chiave privata forniti non sono validi

Questo errore indica che il consumatore Amazon MSK non ha potuto utilizzare il certificato o la chiave privata forniti. Assicurati che il certificato e la chiave utilizzino il formato PEM e che la crittografia della chiave privata utilizzi un algoritmo PBES1.

Configurazione della rete

Affinché Lambda utilizzi il cluster Kafka come origine di eventi, Lambda deve accedere all'Amazon VPC in cui risiede il cluster. Ti consigliamo di implementare endpoint AWS PrivateLink VPC per Lambda per accedere al tuo VPC. Distribuisci endpoint per Lambda e (). AWS Security Token Service AWS STS Se il broker utilizza l'autenticazione, implementa anche un endpoint VPC per Secrets Manager. Se hai configurato una destinazione in caso di errore, implementa anche un endpoint VPC per il servizio di destinazione.

In alternativa, verifica che il VPC associato al cluster Kafka includa un gateway NAT per sottorete pubblica. Per ulteriori informazioni, consulta Abilita l'accesso a Internet per le funzioni Lambda connesse a VPC.

Se utilizzi endpoint VPC, devi anche configurarli in modo da abilitare i nomi DNS privati.

Quando si crea una mappatura dell'origine degli eventi per un cluster MSK, Lambda verifica se le interfacce di rete elastiche (ENI) sono già presenti per le sottoreti e i gruppi di sicurezza del VPC del cluster. Se Lambda trova ENI esistenti, tenta di riutilizzarli. Altrimenti, Lambda crea nuovi ENI per connettersi all'origine dell'evento e richiamare la tua funzione.

Nota

Le funzioni Lambda vengono sempre eseguite all'interno di VPC di proprietà del servizio Lambda. Questi VPC vengono gestiti automaticamente dal servizio e non sono visibili ai clienti. Puoi anche collegare la tua funzione a un Amazon VPC. In entrambi i casi, la configurazione VPC della funzione non influisce sulla mappatura delle sorgenti degli eventi. Solo la configurazione del VPC dell'origine dell'evento determina il modo in cui Lambda si connette alla fonte dell'evento.

La configurazione di Amazon VPC è individuabile tramite l'API Amazon MSK. Non è necessario configurarlo durante l'installazione utilizzando il comando create-event-source-mapping.

Per ulteriori informazioni sulla configurazione della rete, consulta Configurazione AWS Lambda con un cluster Apache Kafka all'interno di un VPC sul blog di Compute. AWS

Regole del gruppo di sicurezza VPC

Configura i gruppi di sicurezza per l'Amazon VPC contenente il tuo cluster con le seguenti regole (come minimo):

  • Regole in entrata: consenti tutto il traffico sulla porta del broker Amazon MSK (9092 per testo normale, 9094 per TLS, 9096 per SASL, 9098 per IAM) per i gruppi di sicurezza specificati per l'origine eventi.

  • Regole in uscita: consenti tutto il traffico sulla porta 443 per tutte le destinazioni. Consenti tutto il traffico sulla porta del broker Amazon MSK (9092 per testo normale, 9094 per TLS, 9096 per SASL, 9098 per IAM) per i gruppi di sicurezza specificati per l'origine eventi.

  • Se si utilizzano endpoint VPC anziché gateway NAT, i gruppi di sicurezza associati agli endpoint VPC devono consentire tutto il traffico in entrata sulla porta 443 dai gruppi di sicurezza dell'origine eventi.

Uso di endpoint VPC

Quando utilizzi gli endpoint VPC, le chiamate API per richiamare la tua funzione vengono instradate attraverso questi endpoint utilizzando gli ENI. Il responsabile del servizio Lambda deve chiamare sts:AssumeRole e lambda:InvokeFunction attivare tutti i ruoli e le funzioni che utilizzano tali ENI.

Per impostazione predefinita, gli endpoint VPC dispongono di policy IAM aperte. La migliore pratica consiste nel limitare queste policy per consentire solo a soggetti specifici di eseguire le azioni necessarie utilizzando quell'endpoint. Per garantire che la mappatura delle sorgenti degli eventi sia in grado di richiamare la funzione Lambda, la policy degli endpoint VPC deve consentire al principio del servizio Lambda di chiamare e. sts:AssumeRole lambda:InvokeFunction La limitazione delle policy degli endpoint VPC per consentire solo le chiamate API provenienti dall'organizzazione impedisce il corretto funzionamento della mappatura delle sorgenti degli eventi.

I seguenti esempi di policy degli endpoint VPC mostrano come concedere l'accesso richiesto al principale del servizio Lambda per gli endpoint Lambda e Lambda. AWS STS

Esempio Politica degli endpoint VPC - endpoint AWS STS
{ "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
Esempio Politica degli endpoint VPC - Endpoint Lambda
{ "Statement": [ { "Action": "lambda:InvokeFunction", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }

Se il tuo broker Kafka utilizza l'autenticazione, puoi anche limitare la policy degli endpoint VPC per l'endpoint Secrets Manager. Per chiamare l'API Secrets Manager, Lambda utilizza il ruolo della funzione, non il responsabile del servizio Lambda. L'esempio seguente mostra una policy per gli endpoint di Secrets Manager.

Esempio Politica degli endpoint VPC - Endpoint Secrets Manager
{ "Statement": [ { "Action": "secretsmanager:GetSecretValue", "Effect": "Allow", "Principal": { "AWS": [ "customer_function_execution_role_arn" ] }, "Resource": "customer_secret_arn" } ] }

Se hai configurato una destinazione in caso di errore, Lambda utilizza anche il ruolo della tua funzione per chiamare una delle s3:PutObject due sqs:sendMessage o utilizzare gli sns:Publish ENI gestiti da Lambda.

Aggiunta di Amazon MSK come origine eventi

Per creare una mappatura dell'origine eventi, aggiungi il cluster Amazon MSK come trigger della funzione Lambda utilizzando la console Lambda, un SDK AWS o AWS Command Line Interface (AWS CLI). Tieni presente che quando aggiungi Amazon MSK come trigger, Lambda assume le impostazioni VPC del cluster Amazon MSK, non le impostazioni VPC della funzione Lambda.

Questa sezione descrive come creare una mappatura dell'origine eventi utilizzando la console Lambda e AWS CLI.

Prerequisiti

  • Un cluster Amazon MSK e un argomento Kafka. Per ulteriori informazioni, consulta Nozioni di base per l'uso di Amazon MSK nella Guida per gli sviluppatori di Amazon Managed Streaming for Apache Kafka.

  • Un ruolo di esecuzione con autorizzazione ad accedere alle AWS risorse utilizzate dal cluster MSK.

ID gruppo di consumer personalizzabile

Quando configuri Kafka come origine eventi, puoi specificare un ID gruppo di consumer. Questo ID gruppo di consumer è un identificatore esistente per il gruppo di consumer Kafka a cui desideri che la tua funzione Lambda aderisca. Puoi utilizzare questa funzione per migrare senza problemi qualsiasi configurazione di elaborazione dei record Kafka in corso da altri utenti a Lambda.

Se specifichi l'ID gruppo di consumer e sono presenti altri sondaggi attivi all'interno di quel gruppo di consumer, Kafka distribuisce i messaggi a tutti i consumer. In altre parole, Lambda non riceve tutti i messaggi relativi all'argomento Kafka. Se desideri che Lambda gestisca tutti i messaggi dell'argomento, disattiva tutti gli altri sondaggi in quel gruppo di consumer.

Inoltre, se specifichi un ID gruppo di consumer e Kafka trova un gruppo di consumer esistente valido con lo stesso ID, Lambda ignora il parametro StartingPosition per la mappatura dell'origine eventi. Inizia invece ad elaborare i record in base alla compensazione impegnata del gruppo di consumer. Se specifichi un ID gruppo di consumer e Kafka non riesce a trovare un gruppo di consumer esistente, Lambda configura l'origine eventi con la StartingPosition specificata.

L'ID gruppo di consumer deve essere univoco tra tutte le origini eventi Kafka. Dopo aver creato una mappatura dell'origine eventi Kafka con l'ID gruppo di consumer specificato, non sarà più possibile aggiornare questo valore.

Aggiunta di un trigger Amazon MSK (console)

Segui questi passaggi per aggiungere il cluster Amazon MSK e un argomento Kafka come trigger per la funzione Lambda.

Per aggiungere un trigger Amazon MSK alla funzione Lambda (console)
  1. Aprire la pagina Functions (Funzioni) della console Lambda.

  2. Scegliere il nome della funzione Lambda.

  3. In Panoramica delle funzioni, scegliere Aggiungi trigger.

  4. In Trigger configuration (Configurazione trigger), effettua le operazioni seguenti:

    1. Seleziona il tipo di trigger MSK.

    2. Per MSK cluster (Cluster MSK) seleziona il cluster.

    3. Per Batch Size (Dimensione batch), immettere il numero massimo di messaggi da recuperare in un singolo batch.

    4. Per Batch window (Finestra batch), immetti il tempo massimo in secondi per la raccolta dei registri da parte di Lambda prima di richiamare la funzione.

    5. Per Topic name (Nome argomento) immetti un nome per l'argomento Kafka.

    6. (Facoltativo) Per Consumer group ID (ID gruppo di consumer), inserisci l'ID di un gruppo di consumer Kafka a cui aderire.

    7. (Facoltativo) Per Posizione di inizio, scegli Più recente per iniziare a leggere il flusso dal record più recente, Orizzonte di taglio per iniziare dal primo record disponibile o In corrispondenza del timestamp per specificare un timestamp da cui iniziare la lettura.

    8. (Facoltativo) Per Authentication (Autenticazione), scegli la chiave segreta per l'autenticazione con i broker nel cluster MSK.

    9. Per creare il trigger in uno stato disabilitato per il test (scelta consigliata), deselezionare Enable trigger (Abilita trigger). Oppure, per attivare immediatamente il trigger, selezionareAbilita trigger.

  5. Per creare il trigger, scegli Add (Aggiungi).

Aggiunta di un trigger Amazon MSK (AWS CLI)

Usa i seguenti AWS CLI comandi di esempio per creare e visualizzare un trigger Amazon MSK per la tua funzione Lambda.

Creare un trigger utilizzando il AWS CLI

Esempio — Creazione di uno strumento di mappatura dell'origine degli eventi per cluster che utilizzano l'autenticazione IAM

L'esempio seguente utilizza il create-event-source-mapping AWS CLI comando per mappare una funzione Lambda denominata my-kafka-function a un argomento di Kafka denominato. AWSKafkaTopic La posizione iniziale dell'argomento è impostata su LATEST. Quando il cluster utilizza l'autenticazione basata sui ruoli IAM, non è necessario un oggetto di configurazione. SourceAccess Esempio:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
Esempio — Creazione di uno strumento di mappatura dell'origine degli eventi per cluster che utilizzano l'autenticazione SASL/SCRAM

Se il cluster utilizza l'autenticazione SASL/SCRAM, è necessario includere un oggetto di SourceAccessconfigurazione che specifichi e un ARN segreto di SASL_SCRAM_512_AUTH Secrets Manager.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
Esempio — Creazione di uno strumento di mappatura dell'origine degli eventi per cluster che utilizzano l'autenticazione mTLS

Se il cluster utilizza l'autenticazione MTLS, è necessario includere un oggetto di SourceAccessconfigurazione che specifichi CLIENT_CERTIFICATE_TLS_AUTH e un ARN segreto di Secrets Manager.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'

Per ulteriori informazioni, consulta la documentazione di riferimento dell'CreateEventSourceMappingAPI.

Visualizzazione dello stato utilizzando il AWS CLI

L'esempio seguente utilizza il get-event-source-mapping AWS CLI comando per descrivere lo stato della mappatura dell'origine degli eventi creata.

aws lambda get-event-source-mapping \ --uuid 6d9bce8e-836b-442c-8070-74e77903c815

Creazione di strumenti di mappatura dell'origine degli eventi multi-account

È possibile utilizzare la connettività privata multi-VPC per connettere una funzione Lambda a un cluster MSK assegnato in un altro Account AWS. Utilizza la connettività multi-VPC AWS PrivateLink, che mantiene tutto il traffico all'interno della AWS rete.

Nota

Non è possibile creare strumenti di mappatura dell'origine degli eventi multi-account per i cluster MSK serverless.

Per creare uno strumento di mappatura dell'origine degli eventi multi-account, è necessario innanzitutto configurare la connettività multi-VPC per il cluster MSK. Quando crei lo strumento di mappatura dell'origine degli eventi, utilizza l'ARN della connessione VPC gestita anziché l'ARN del cluster, come illustrato negli esempi seguenti. L'CreateEventSourceMappingoperazione varia anche a seconda del tipo di autenticazione utilizzato dal cluster MSK.

Esempio — Creazione di uno strumento di mappatura dell'origine degli eventi multi-account per cluster che utilizzano l'autenticazione IAM

Quando il cluster utilizza l'autenticazione basata sui ruoli IAM, non è necessario un oggetto di configurazione. SourceAccess Esempio:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
Esempio — Creazione di uno strumento di mappatura dell'origine degli eventi multi-account per cluster che utilizzano l'autenticazione SASL/SCRAM

Se il cluster utilizza l'autenticazione SASL/SCRAM, è necessario includere un oggetto di SourceAccessconfigurazione che specifichi e un ARN segreto di SASL_SCRAM_512_AUTH Secrets Manager.

Esistono due modi per utilizzare i segreti per lo strumento di mappatura dell'origine degli eventi Amazon MSK multi-account con l'autenticazione SASL/SCRAM:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
Esempio — Creazione di uno strumento di mappatura dell'origine degli eventi multi-account per cluster che utilizzano l'autenticazione mTLS

Se il cluster utilizza l'autenticazione MTLS, è necessario includere un oggetto di SourceAccessconfigurazione che specifichi CLIENT_CERTIFICATE_TLS_AUTH e un ARN segreto di Secrets Manager. Il segreto può essere archiviato nell'account del cluster o nell'account della funzione Lambda.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'

Destinazioni in caso di errore

Per mantenere i record delle chiamate non riuscite allo strumento di mappatura dell'origine degli eventi, aggiungi una destinazione allo strumento di mappatura dell'origine degli eventi della funzione. Ogni record inviato alla destinazione è un documento JSON con metadati sulla chiamata non riuscita. Puoi configurare qualsiasi argomento Amazon SNS, coda Amazon SQS o bucket S3 come destinazione. Il tuo ruolo di esecuzione deve avere le autorizzazioni per la destinazione:

Inoltre, se hai configurato una chiave KMS sulla destinazione, Lambda necessita delle seguenti autorizzazioni, a seconda del tipo di destinazione:

Per configurare una destinazione in caso di errore tramite la console, completa i seguenti passaggi:

  1. Aprire la pagina Funzioni della console Lambda.

  2. Scegliere una funzione.

  3. In Function overview (Panoramica delle funzioni), scegliere Add destination (Aggiungi destinazione).

  4. Per Origine, scegli Chiamata allo strumento di mappatura dell'origine degli eventi.

  5. Per Strumento di mappatura dell'origine degli eventi, scegli un'origine dell'evento configurata per questa funzione.

  6. Per Condizione, seleziona In caso di errore. Per le chiamate allo strumento di mappatura dell'origine degli eventi, questa è l'unica condizione accettata.

  7. Per Tipo di destinazione, scegli il tipo di destinazione a cui Lambda deve inviare i record di chiamata.

  8. Per Destination (Destinazione), scegliere una risorsa.

  9. Scegliere Save (Salva).

Puoi anche configurare una destinazione AWS CLI in caso di errore utilizzando. Ad esempio, il seguente comando create-event-source-mapping aggiunge una mappatura dell'origine degli eventi con una destinazione SQS in caso di errore a: MyFunction

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

Il seguente comando update-event-source-mapping aggiunge una destinazione S3 in caso di errore all'origine dell'evento associata all'input: uuid

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'

Per rimuovere una destinazione, fornisci una stringa vuota come argomento del parametro destination-config:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

Record di invocazione di esempio SNS e SQS

L'esempio seguente mostra il contenuto che Lambda invia a un argomento SNS o una coda SQS di destinazione per una chiamata non riuscita all'origine dell'evento Kafka. Ciascuna delle chiavi in recordsInfo contiene sia l'argomento sia la partizione di Kafka, separati da un trattino. Ad esempio, per la chiave "Topic-0", Topic è l'argomento di Kafka e 0 è la partizione. Per ogni argomento e partizione, è possibile utilizzare i dati di offset e timestamp per individuare i record di chiamata originali.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }

Record di invocazione di esempio di destinazione S3

Se le destinazioni sono S3, Lambda invia alla destinazione l'intero record di chiamata insieme ai metadati. L'esempio seguente mostra ciò che Lambda invia a un bucket S3 di destinazione per una chiamata non riuscita all'origine dell'evento Kafka. Oltre a tutti i campi dell'esempio precedente per le destinazioni SQS e SNS, il campo payload contiene il record di chiamata originale come stringa JSON con escape.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
Suggerimento

Ti consigliamo di abilitare il controllo delle versioni S3 sul bucket di destinazione.

Scalabilità automatica dell'origine eventi Amazon MSK

Quando si crea inizialmente una fonte evento Amazon MSK, Lambda assegna un consumatore per elaborare tutte le partizioni dell'argomento Kafka. Ogni consumatore ha più processori in esecuzione in parallelo per gestire carichi di lavoro più elevati. Inoltre, Lambda aumenta o diminuisce automaticamente il numero di consumatori in base al carico di lavoro. Per preservare l'ordinamento dei messaggi in ogni partizione, il numero massimo di consumatori è un consumatore per ogni partizione dell'argomento.

Ogni minuto, Lambda valuta il ritardo dell'offset del consumatore di tutte le partizioni dell'argomento. Se il ritardo è troppo alto, la partizione sta ricevendo messaggi più velocemente di quanto Lambda possa elaborarli. Se necessario, Lambda aggiunge o rimuove i consumer dall'argomento. Il processo di dimensionamento di aggiunta o rimozione dei consumatori avviene entro tre minuti dalla valutazione.

Se la funzione Lambda di destinazione è limitata, Lambda riduce il numero di consumer. Questa operazione riduce il carico di lavoro sulla funzione riducendo il numero di messaggi che i consumer possono recuperare e inviare alla funzione.

Per monitorare il throughput dell'argomento Kafka, visualizza il parametro del ritardo dell'offset che Lambda emette mentre la funzione elabora i registri.

Per controllare quante chiamate di funzioni si verificano in parallelo, è inoltre possibile monitorare i parametri di concorrenza per la funzione.

Posizioni di partenza di polling e flussi

Tieni presente che il polling dei flussi durante la creazione e gli aggiornamenti dello strumento di mappatura dell’origine degli eventi alla fine è coerente.

  • Durante la creazione dello strumento di mappatura dell'origine degli eventi, potrebbero essere necessari alcuni minuti per l'avvio degli eventi di polling dal flusso.

  • Durante gli aggiornamenti dello strumento di mappatura dell'origine degli eventi, potrebbero essere necessari alcuni minuti per l'avvio degli eventi di polling dal flusso.

Questo comportamento implica che se specifichi LATEST come posizione iniziale del flusso, lo strumento di mappatura dell'origine degli eventi potrebbe perdere eventi durante la creazione o gli aggiornamenti. Per non perdere alcun evento, specifica la posizione iniziale del flusso come TRIM_HORIZON o AT_TIMESTAMP.

CloudWatch Metriche Amazon

Lambda emette il parametro OffsetLag mentre la funzione elabora i registri. Il valore di questo parametro è la differenza di offset tra l'ultimo registro scritto nell'argomento dell'origine eventi Kafka e l'ultimo registro elaborato da Lambda. Puoi utilizzare OffsetLag per stimare la latenza tra il momento in cui un registro viene aggiunto e il momento in cui il gruppo di consumer lo elabora.

Una tendenza in aumento in OffsetLag può indicare problemi con i sondaggi nel gruppo di consumer della funzione. Per ulteriori informazioni, consulta Utilizzo dei parametri delle funzioni Lambda.

Parametri di configurazione di Amazon MSK

Tutti i tipi di sorgenti di eventi Lambda condividono le stesse operazioni CreateEventSourceMappinge quelle dell'UpdateEventSourceMappingAPI. Tuttavia, solo alcuni dei parametri si applicano ad Amazon MSK.

Parametri dell'origine eventi applicabili ad Amazon MSK
Parametro Obbligatorio Predefinito Note

AmazonManagedKafkaEventSourceConfig

N

Contiene il ConsumerGroupId campo, che per impostazione predefinita è un valore univoco.

Può essere impostato solo su Create

BatchSize

N

100

Massimo: 10.000

Abilitato

N

Abilitato

EventSourceArn

Y

Può essere impostato solo su Create

FunctionName

Y

FilterCriteria

N

Filtro eventi Lambda

MaximumBatchingWindowInSecondi

N

500 ms

Comportamento di batching

SourceAccessConfigurazioni

N

Nessuna credenziale

Credenziali di autenticazione SASL/SCRAM o CLIENT_CERTIFICATE_TLS_AUTH (MutualTLS) per la tua origine eventi

StartingPosition

Y

AT_TIMESTAMP, TRIM_HORIZON o LATEST

Può essere impostato solo su Create

StartingPositionTimestamp

N

Obbligatorio se StartingPosition è impostato su AT_TIMESTAMP

Argomenti

Y

Nome argomento Kafka

Può essere impostato solo su Create