Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Tutoriel : Utilisation de Lambda avec les flux de données Kinesis
Dans ce tutoriel, vous créez une fonction Lambda pour consommer des événements à partir d’un flux de données Amazon Kinesis.
-
L’application personnalisée écrit les enregistrements dans le flux.
-
AWS Lambda interroge le flux et invoque votre fonction Lambda quand il y détecte de nouveaux enregistrements.
-
AWS Lambda exécute la fonction Lambda en endossant le rôle d’exécution que vous avez spécifié au moment de sa création.
Prérequis
Ce didacticiel suppose que vous avez quelques connaissances des opérations Lambda de base et de la console Lambda. Si ce n’est déjà fait, suivez les instructions fournies dans Créer une fonction Lambda à l'aide de la console pour créer votre première fonction Lambda.
Pour effectuer les étapes suivantes, vous avez besoin de l’AWS CLI version 2. Les commandes et la sortie attendue sont répertoriées dans des blocs distincts :
aws --version
Vous devriez voir la sortie suivante:
aws-cli/2.13.27 Python/3.11.6 Linux/4.14.328-248.540.amzn2.x86_64 exe/x86_64.amzn.2
Pour les commandes longues, un caractère d’échappement (\
) est utilisé pour les fractionner en plusieurs lignes.
Sur Linux et macOS, utilisez votre gestionnaire de shell et de package préféré.
Sous Windows, certaines commandes CLI Bash que vous utilisez couramment avec Lambda (par exemple zip
) ne sont pas prises en charge par les terminaux intégrés du système d’exploitation. Installez le sous-système Windows pour Linux afin d’obtenir une version intégrée à Windows d’Ubuntu et Bash. Les exemples de commandes CLI de ce guide utilisent le formatage Linux. Les commandes qui incluent des documents JSON en ligne doivent être reformatées si vous utilisez la CLI Windows.
Créer le rôle d’exécution
Créez le rôle d’exécution qui donne à votre fonction l’autorisation d’accéder aux ressources AWS.
Pour créer un rôle d’exécution
-
Ouvrez la page Roles (Rôles) dans la console IAM.
-
Sélectionnez Créer un rôle.
-
Créez un rôle avec les propriétés suivantes :
-
Entité de confiance – AWS Lambda.
-
Autorisations – AWSLambdaKinesisExecutionRole.
-
Nom de rôle – lambda-kinesis-role
.
La stratégie AWSLambdaKinesisExecutionRole possède les autorisations dont la fonction a besoin pour lire les éléments dans Kinesis et écrire des journaux dans CloudWatch Logs.
Créer la fonction
Créez une fonction Lambda qui traite vos messages Kinesis. Le code de la fonction enregistre l’ID de l’événement et les données de l’événement de l’enregistrement Kinesis dans les journaux CloudWatch.
Ce didacticiel utilise l’exécution Node.js 18.x, mais nous avons également fourni des exemples de code dans d’autres langages d’exécution. Vous pouvez sélectionner l’onglet dans la zone suivante pour voir le code de l’exécution qui vous intéresse. Le code JavaScript que vous allez utiliser dans cette étape se trouve dans le premier exemple affiché dans l’onglet JavaScript.
- .NET
-
- AWS SDK for .NET
-
Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.
Consommation d’un événement Kinesis avec Lambda à l’aide de .NET.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace KinesisIntegrationSampleCode;
public class Function
{
// Powertools Logger requires an environment variables against your function
// POWERTOOLS_SERVICE_NAME
[Logging(LogEvent = true)]
public async Task FunctionHandler(KinesisEvent evnt, ILambdaContext context)
{
if (evnt.Records.Count == 0)
{
Logger.LogInformation("Empty Kinesis Event received");
return;
}
foreach (var record in evnt.Records)
{
try
{
Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
string data = await GetRecordDataAsync(record.Kinesis, context);
Logger.LogInformation($"Data: {data}");
// TODO: Do interesting work based on the new data
}
catch (Exception ex)
{
Logger.LogError($"An error occurred {ex.Message}");
throw;
}
}
Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
}
private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context)
{
byte[] bytes = record.Data.ToArray();
string data = Encoding.UTF8.GetString(bytes);
await Task.CompletedTask; //Placeholder for actual async work
return data;
}
}
- Go
-
- Kit SDK for Go V2
-
Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.
Consommation d’un événement Kinesis avec Lambda à l’aide de Go.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main
import (
"context"
"log"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
)
func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error {
if len(kinesisEvent.Records) == 0 {
log.Printf("empty Kinesis event received")
return nil
}
for _, record := range kinesisEvent.Records {
log.Printf("processed Kinesis event with EventId: %v", record.EventID)
recordDataBytes := record.Kinesis.Data
recordDataText := string(recordDataBytes)
log.Printf("record data: %v", recordDataText)
// TODO: Do interesting work based on the new data
}
log.Printf("successfully processed %v records", len(kinesisEvent.Records))
return nil
}
func main() {
lambda.Start(handler)
}
- Java
-
- SDK pour Java 2.x
-
Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.
Consommation d’un événement Kinesis avec Lambda à l’aide de Java.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package example;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
public class Handler implements RequestHandler<KinesisEvent, Void> {
@Override
public Void handleRequest(final KinesisEvent event, final Context context) {
LambdaLogger logger = context.getLogger();
if (event.getRecords().isEmpty()) {
logger.log("Empty Kinesis Event received");
return null;
}
for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
try {
logger.log("Processed Event with EventId: "+record.getEventID());
String data = new String(record.getKinesis().getData().array());
logger.log("Data:"+ data);
// TODO: Do interesting work based on the new data
}
catch (Exception ex) {
logger.log("An error occurred:"+ex.getMessage());
throw ex;
}
}
logger.log("Successfully processed:"+event.getRecords().size()+" records");
return null;
}
}
- JavaScript
-
- Kit SDK pour JavaScript (v3)
-
Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.
Consommation d’un événement Kinesis avec Lambda à l’aide de JavaScript.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
for (const record of event.Records) {
try {
console.log(`Processed Kinesis Event - EventID: ${record.eventID}`);
const recordData = await getRecordDataAsync(record.kinesis);
console.log(`Record Data: ${recordData}`);
// TODO: Do interesting work based on the new data
} catch (err) {
console.error(`An error occurred ${err}`);
throw err;
}
}
console.log(`Successfully processed ${event.Records.length} records.`);
};
async function getRecordDataAsync(payload) {
var data = Buffer.from(payload.data, "base64").toString("utf-8");
await Promise.resolve(1); //Placeholder for actual async work
return data;
}
Consommation d’un événement Kinesis avec Lambda à l’aide de TypeScript.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import {
KinesisStreamEvent,
Context,
KinesisStreamHandler,
KinesisStreamRecordPayload,
} from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";
const logger = new Logger({
logLevel: "INFO",
serviceName: "kinesis-stream-handler-sample",
});
export const functionHandler: KinesisStreamHandler = async (
event: KinesisStreamEvent,
context: Context
): Promise<void> => {
for (const record of event.Records) {
try {
logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
const recordData = await getRecordDataAsync(record.kinesis);
logger.info(`Record Data: ${recordData}`);
// TODO: Do interesting work based on the new data
} catch (err) {
logger.error(`An error occurred ${err}`);
throw err;
}
logger.info(`Successfully processed ${event.Records.length} records.`);
}
};
async function getRecordDataAsync(
payload: KinesisStreamRecordPayload
): Promise<string> {
var data = Buffer.from(payload.data, "base64").toString("utf-8");
await Promise.resolve(1); //Placeholder for actual async work
return data;
}
- PHP
-
- Kit SDK pour PHP
-
Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.
Consommation d’un événement Kinesis avec Lambda à l’aide de PHP.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php
# using bref/bref and bref/logger for simplicity
use Bref\Context\Context;
use Bref\Event\Kinesis\KinesisEvent;
use Bref\Event\Kinesis\KinesisHandler;
use Bref\Logger\StderrLogger;
require __DIR__ . '/vendor/autoload.php';
class Handler extends KinesisHandler
{
private StderrLogger $logger;
public function __construct(StderrLogger $logger)
{
$this->logger = $logger;
}
/**
* @throws JsonException
* @throws \Bref\Event\InvalidLambdaEvent
*/
public function handleKinesis(KinesisEvent $event, Context $context): void
{
$this->logger->info("Processing records");
$records = $event->getRecords();
foreach ($records as $record) {
$data = $record->getData();
$this->logger->info(json_encode($data));
// TODO: Do interesting work based on the new data
// Any exception thrown will be logged and the invocation will be marked as failed
}
$totalRecords = count($records);
$this->logger->info("Successfully processed $totalRecords records");
}
}
$logger = new StderrLogger();
return new Handler($logger);
- Python
-
- SDK pour Python (Boto3)
-
Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.
Consommation d’un événement Kinesis avec Lambda à l’aide de Python.
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import base64
def lambda_handler(event, context):
for record in event['Records']:
try:
print(f"Processed Kinesis Event - EventID: {record['eventID']}")
record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8')
print(f"Record Data: {record_data}")
# TODO: Do interesting work based on the new data
except Exception as e:
print(f"An error occurred {e}")
raise e
print(f"Successfully processed {len(event['Records'])} records.")
- Ruby
-
- Kit SDK pour Ruby
-
Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.
Consommation d’un événement Kinesis avec Lambda à l’aide de Ruby.
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'aws-sdk'
def lambda_handler(event:, context:)
event['Records'].each do |record|
begin
puts "Processed Kinesis Event - EventID: #{record['eventID']}"
record_data = get_record_data_async(record['kinesis'])
puts "Record Data: #{record_data}"
# TODO: Do interesting work based on the new data
rescue => err
$stderr.puts "An error occurred #{err}"
raise err
end
end
puts "Successfully processed #{event['Records'].length} records."
end
def get_record_data_async(payload)
data = Base64.decode64(payload['data']).force_encoding('UTF-8')
# Placeholder for actual async work
# You can use Ruby's asynchronous programming tools like async/await or fibers here.
return data
end
- Rust
-
- SDK pour Rust
-
Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.
Consommation d’un événement Kinesis avec Lambda à l’aide de Rust.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::event::kinesis::KinesisEvent;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> {
if event.payload.records.is_empty() {
tracing::info!("No records found. Exiting.");
return Ok(());
}
event.payload.records.iter().for_each(|record| {
tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default());
let record_data = std::str::from_utf8(&record.kinesis.data);
match record_data {
Ok(data) => {
// log the record data
tracing::info!("Data: {}", data);
}
Err(e) => {
tracing::error!("Error: {}", e);
}
}
});
tracing::info!(
"Successfully processed {} records",
event.payload.records.len()
);
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
// disable printing the name of the module in every log line.
.with_target(false)
// disabling time is handy because CloudWatch will add the ingestion time.
.without_time()
.init();
run(service_fn(function_handler)).await
}
Pour créer la fonction
-
Créez un répertoire pour le projet, puis passez à ce répertoire.
mkdir kinesis-tutorial
cd kinesis-tutorial
-
Copiez l’exemple de code JavaScript dans un nouveau fichier nommé index.js
.
-
Créez un package de déploiement.
zip function.zip index.js
-
Créez une fonction Lambda à l’aide de la commande create-function
.
aws lambda create-function --function-name ProcessKinesisRecords \
--zip-file fileb://function.zip --handler index.handler --runtime nodejs18.x \
--role arn:aws:iam::111122223333
:role/lambda-kinesis-role
Test de la fonction Lambda
Invoquez manuellement la fonction Lambda à l’aide de la commande CLI invoke
AWS Lambda et d’un exemple d’événement Kinesis.
Pour tester la fonction Lambda
-
Copiez le code JSON suivant dans un fichier et enregistrez-le sous le nom input.txt
.
{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "1",
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
"approximateArrivalTimestamp": 1545084650.987
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::111122223333:role/lambda-kinesis-role",
"awsRegion": "us-east-2",
"eventSourceARN": "arn:aws:kinesis:us-east-2:111122223333:stream/lambda-stream"
}
]
}
-
Utilisez la commande invoke
pour envoyer l’événement à la fonction.
aws lambda invoke --function-name ProcessKinesisRecords \
--cli-binary-format raw-in-base64-out \
--payload file://input.txt outputfile.txt
L’option cli-binary-format est obligatoire si vous utilisez AWS CLI version 2. Pour faire de ce paramètre le paramètre par défaut, exécutez aws configure set cli-binary-format raw-in-base64-out
. Pour plus d’informations, consultez les options de ligne de commande globales prises en charge par l’AWS CLI dans le Guide de l’utilisateur AWS Command Line Interface version 2.
La réponse est enregistrée dans out.txt
.
Pour créer un flux, utilisez la commande create-stream
.
aws kinesis create-stream --stream-name lambda-stream --shard-count 1
Exécutez la commande describe-stream
suivante pour obtenir l’ARN du flux.
aws kinesis describe-stream --stream-name lambda-stream
Vous devriez voir la sortie suivante:
{
"StreamDescription": {
"Shards": [
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "340282366920746074317682119384634633455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610"
}
}
],
"StreamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream",
"StreamName": "lambda-stream",
"StreamStatus": "ACTIVE",
"RetentionPeriodHours": 24,
"EnhancedMonitoring": [
{
"ShardLevelMetrics": []
}
],
"EncryptionType": "NONE",
"KeyId": null,
"StreamCreationTimestamp": 1544828156.0
}
}
Vous utilisez l’ARN du flux à l’étape suivante pour associer le flux à la fonction Lambda.
Exécutez la commande AWS CLI add-event-source
suivante.
aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream \
--batch-size 100 --starting-position LATEST
Notez l’ID de mappage pour une utilisation ultérieure. Pour obtenir une liste des mappages de source d’événement, exécutez la commande suivante list-event-source-mappings
.
aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream
Dans la réponse, vous pouvez vérifier que la valeur d’état indique enabled
. Les mappages de source d’événement peuvent être désactivés pour suspendre temporairement l’interrogation, ce qui entraîne la perte d’enregistrements.
Pour tester le mappage de source d’événement, ajoutez des enregistrements d’événements à votre flux Kinesis. La valeur --data
est une chaîne que la commande CLI encode en base 64 avant de l’envoyer à Kinesis. Vous pouvez exécuter la même commande plus d’une fois pour ajouter plusieurs enregistrements dans le flux.
aws kinesis put-record --stream-name lambda-stream --partition-key 1 \
--data "Hello, this is a test."
Lambda utilise le rôle d’exécution pour lire les enregistrements du flux. Ensuite, il invoque votre fonction Lambda en transmettant des lots d’enregistrements. La fonction décode les données de chaque enregistrement et les consigne, en envoyant le résultat à CloudWatch Logs. Affichez les journaux dans la console CloudWatch.
Nettoyage de vos ressources
Vous pouvez maintenant supprimer les ressources que vous avez créées pour ce didacticiel, sauf si vous souhaitez les conserver. En supprimant des ressources AWS que vous n’utilisez plus, vous évitez les frais superflus pour votre Compte AWS.
Pour supprimer le rôle d’exécution
-
Ouvrez la page Roles (Rôles) de la console IAM.
-
Sélectionnez le rôle d’exécution que vous avez créé.
-
Sélectionnez Delete (Supprimer).
-
Saisissez le nom du rôle dans le champ de saisie de texte et choisissez Delete (Supprimer).
Pour supprimer la fonction Lambda
-
Ouvrez la page Functions (Fonctions) de la console Lambda.
-
Sélectionnez la fonction que vous avez créée.
-
Sélectionnez Actions, Supprimer.
-
Saisissez delete
dans la zone de saisie de texte et choisissez Delete (Supprimer).
Pour supprimer le flux Kinesis
-
Connectez-vous à la AWS Management Console et ouvrez la console Kinesis à partir de l’adresse https://console.aws.amazon.com/kinesis.
-
Sélectionnez le flux que vous avez créé.
-
Sélectionnez Actions, Supprimer.
-
Saisissez delete
dans le champ de saisie de texte.
-
Sélectionnez Supprimer.