教學課程:搭配 Kinesis Data Streams 使用 Lambda - AWS Lambda

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

教學課程:搭配 Kinesis Data Streams 使用 Lambda

在本教學課程中,您會建立 Lambda 函數來取用來自 Amazon Kinesis 資料串流的事件。

  1. 自訂應用程式將記錄寫入串流。

  2. AWS Lambda 會輪詢串流,並在偵測到串流中的新記錄時,叫用您的 Lambda 函數。

  3. AWS Lambda 會執行 Lambda 函數,方法是假設您在建立 Lambda 函數時指定的執行角色。

必要條件

此教學課程假設您具備基本的 Lambda 操作知識並了解 Lambda 主控台。若您尚未了解,請遵循 使用主控台建立一個 Lambda 函數 中的指示,建立您的第一個 Lambda 函數。

若要完成下列步驟,您需要 AWS Command Line Interface (AWS CLI) 版本 2。命令和預期的輸出會列在不同的區塊中:

aws --version

您應該會看到下列輸出:

aws-cli/2.13.27 Python/3.11.6 Linux/4.14.328-248.540.amzn2.x86_64 exe/x86_64.amzn.2

對於長命令,逸出字元 (\) 用於將命令分割為多行。

在 Linux 和 macOS 上,使用您偏好的 shell 和套件軟體管理工具。

注意

在 Windows 中,作業系統的內建終端機不支援您常與 Lambda 搭配使用的某些 Bash CLI命令 (例如 zip)。若要取得 Ubuntu 和 Bash 的 Windows 整合版本,請安裝適用於 Linux 的 Windows 子系統。本指南中的範例CLI命令使用 Linux 格式。如果您使用 Windows ,則必須重新格式化包含內嵌JSON文件的命令CLI。

建立執行角色

建立 執行角色,讓您的函數有權存取 AWS 資源。

若要建立執行角色
  1. 在IAM主控台中開啟角色頁面

  2. 選擇 建立角色

  3. 建立具備下列屬性的角色。

    • 信任實體 - AWS Lambda

    • 許可AWSLambdaKinesisExecutionRole

    • 角色名稱 - lambda-kinesis-role

AWSLambdaKinesisExecutionRole政策具有 函數從 Kinesis 讀取項目並將日誌寫入 CloudWatch Logs 所需的許可。

建立函數

建立可處理 Kinesis 訊息的 Lambda 函數。函數程式碼會將 Kinesis 記錄的事件 ID 和事件資料記錄到 CloudWatch Logs。

本教學課程使用 Node.js 18.x 執行期,但我們也有提供其他執行期語言的範例程式碼。您可以在下列方塊中選取索引標籤,查看您感興趣的執行期程式碼。您將在此步驟中使用的 JavaScript 程式碼位於JavaScript索引標籤中顯示的第一個範例。

.NET
AWS SDK for .NET
注意

還有更多 。 GitHub尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 使用 Lambda 來使用 Kinesis 事件NET。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.KinesisEvents; using AWS.Lambda.Powertools.Logging; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace KinesisIntegrationSampleCode; public class Function { // Powertools Logger requires an environment variables against your function // POWERTOOLS_SERVICE_NAME [Logging(LogEvent = true)] public async Task FunctionHandler(KinesisEvent evnt, ILambdaContext context) { if (evnt.Records.Count == 0) { Logger.LogInformation("Empty Kinesis Event received"); return; } foreach (var record in evnt.Records) { try { Logger.LogInformation($"Processed Event with EventId: {record.EventId}"); string data = await GetRecordDataAsync(record.Kinesis, context); Logger.LogInformation($"Data: {data}"); // TODO: Do interesting work based on the new data } catch (Exception ex) { Logger.LogError($"An error occurred {ex.Message}"); throw; } } Logger.LogInformation($"Successfully processed {evnt.Records.Count} records."); } private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context) { byte[] bytes = record.Data.ToArray(); string data = Encoding.UTF8.GetString(bytes); await Task.CompletedTask; //Placeholder for actual async work return data; } }
Go
SDK for Go V2
注意

還有更多 。 GitHub尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Go 搭配 Lambda 來使用 Kinesis 事件。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "log" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error { if len(kinesisEvent.Records) == 0 { log.Printf("empty Kinesis event received") return nil } for _, record := range kinesisEvent.Records { log.Printf("processed Kinesis event with EventId: %v", record.EventID) recordDataBytes := record.Kinesis.Data recordDataText := string(recordDataBytes) log.Printf("record data: %v", recordDataText) // TODO: Do interesting work based on the new data } log.Printf("successfully processed %v records", len(kinesisEvent.Records)) return nil } func main() { lambda.Start(handler) }
Java
SDK 適用於 Java 2.x
注意

還有更多 。 GitHub尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Java 搭配 Lambda 來使用 Kinesis 事件。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package example; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.LambdaLogger; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; public class Handler implements RequestHandler<KinesisEvent, Void> { @Override public Void handleRequest(final KinesisEvent event, final Context context) { LambdaLogger logger = context.getLogger(); if (event.getRecords().isEmpty()) { logger.log("Empty Kinesis Event received"); return null; } for (KinesisEvent.KinesisEventRecord record : event.getRecords()) { try { logger.log("Processed Event with EventId: "+record.getEventID()); String data = new String(record.getKinesis().getData().array()); logger.log("Data:"+ data); // TODO: Do interesting work based on the new data } catch (Exception ex) { logger.log("An error occurred:"+ex.getMessage()); throw ex; } } logger.log("Successfully processed:"+event.getRecords().size()+" records"); return null; } }
JavaScript
SDK 適用於 JavaScript (v3)
注意

還有更多 。 GitHub尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 搭配 Lambda 使用 Kinesis 事件 JavaScript。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 exports.handler = async (event, context) => { for (const record of event.Records) { try { console.log(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); console.log(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { console.error(`An error occurred ${err}`); throw err; } } console.log(`Successfully processed ${event.Records.length} records.`); }; async function getRecordDataAsync(payload) { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }

使用 搭配 Lambda 使用 Kinesis 事件 TypeScript。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { KinesisStreamEvent, Context, KinesisStreamHandler, KinesisStreamRecordPayload, } from "aws-lambda"; import { Buffer } from "buffer"; import { Logger } from "@aws-lambda-powertools/logger"; const logger = new Logger({ logLevel: "INFO", serviceName: "kinesis-stream-handler-sample", }); export const functionHandler: KinesisStreamHandler = async ( event: KinesisStreamEvent, context: Context ): Promise<void> => { for (const record of event.Records) { try { logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); logger.info(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { logger.error(`An error occurred ${err}`); throw err; } logger.info(`Successfully processed ${event.Records.length} records.`); } }; async function getRecordDataAsync( payload: KinesisStreamRecordPayload ): Promise<string> { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }
PHP
適用於 PHP 的 SDK
注意

還有更多 。 GitHub尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 搭配 Lambda 使用 Kinesis 事件PHP。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 <?php # using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\Kinesis\KinesisEvent; use Bref\Event\Kinesis\KinesisHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler extends KinesisHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handleKinesis(KinesisEvent $event, Context $context): void { $this->logger->info("Processing records"); $records = $event->getRecords(); foreach ($records as $record) { $data = $record->getData(); $this->logger->info(json_encode($data)); // TODO: Do interesting work based on the new data // Any exception thrown will be logged and the invocation will be marked as failed } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords records"); } } $logger = new StderrLogger(); return new Handler($logger);
Python
SDK for Python (Boto3)
注意

還有更多 。 GitHub尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Python 搭配 Lambda 來使用 Kinesis 事件。

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import base64 def lambda_handler(event, context): for record in event['Records']: try: print(f"Processed Kinesis Event - EventID: {record['eventID']}") record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8') print(f"Record Data: {record_data}") # TODO: Do interesting work based on the new data except Exception as e: print(f"An error occurred {e}") raise e print(f"Successfully processed {len(event['Records'])} records.")
Ruby
SDK for Ruby
注意

還有更多 。 GitHub尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Ruby 搭配 Lambda 使用 Kinesis 事件。

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 require 'aws-sdk' def lambda_handler(event:, context:) event['Records'].each do |record| begin puts "Processed Kinesis Event - EventID: #{record['eventID']}" record_data = get_record_data_async(record['kinesis']) puts "Record Data: #{record_data}" # TODO: Do interesting work based on the new data rescue => err $stderr.puts "An error occurred #{err}" raise err end end puts "Successfully processed #{event['Records'].length} records." end def get_record_data_async(payload) data = Base64.decode64(payload['data']).force_encoding('UTF-8') # Placeholder for actual async work # You can use Ruby's asynchronous programming tools like async/await or fibers here. return data end
Rust
SDK for Rust
注意

還有更多 。 GitHub尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Rust 搭配 Lambda 使用 Kinesis 事件。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::event::kinesis::KinesisEvent; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> { if event.payload.records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(()); } event.payload.records.iter().for_each(|record| { tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default()); let record_data = std::str::from_utf8(&record.kinesis.data); match record_data { Ok(data) => { // log the record data tracing::info!("Data: {}", data); } Err(e) => { tracing::error!("Error: {}", e); } } }); tracing::info!( "Successfully processed {} records", event.payload.records.len() ); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }
建立函數
  1. 建立專案的目錄,然後切換至該目錄。

    mkdir kinesis-tutorial cd kinesis-tutorial
  2. 將範例 JavaScript 程式碼複製到名為 的新檔案中index.js

  3. 建立部署套件。

    zip function.zip index.js
  4. 使用 create-function 命令建立一個 Lambda 函數。

    aws lambda create-function --function-name ProcessKinesisRecords \ --zip-file fileb://function.zip --handler index.handler --runtime nodejs18.x \ --role arn:aws:iam::111122223333:role/lambda-kinesis-role

測試 Lambda 函數

使用 invoke AWS Lambda CLI命令和範例 Kinesis 事件手動叫用 Lambda 函數。

測試 Lambda 函數
  1. 將下列項目複製到 JSON 檔案,並將其儲存為 input.txt

    { "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::111122223333:role/lambda-kinesis-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:111122223333:stream/lambda-stream" } ] }
  2. 使用 invoke 命令來傳送事件到函數。

    aws lambda invoke --function-name ProcessKinesisRecords \ --cli-binary-format raw-in-base64-out \ --payload file://input.txt outputfile.txt

    如果您使用的是 第 2 AWS CLI 版,則需要 cli-binary-format選項。若要讓此成為預設的設定,請執行 aws configure set cli-binary-format raw-in-base64-out。若要取得更多資訊,請參閱《AWS Command Line Interface 使用者指南第 2 版》AWS CLI 支援的全域命令列選項

    回應已儲存至 out.txt

建立 Kinesis 串流

使用 create-stream 命令來建立串流。

aws kinesis create-stream --stream-name lambda-stream --shard-count 1

執行下列describe-stream命令以取得串流 ARN。

aws kinesis describe-stream --stream-name lambda-stream

您應該會看到下列輸出:

{ "StreamDescription": { "Shards": [ { "ShardId": "shardId-000000000000", "HashKeyRange": { "StartingHashKey": "0", "EndingHashKey": "340282366920746074317682119384634633455" }, "SequenceNumberRange": { "StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610" } } ], "StreamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream", "StreamName": "lambda-stream", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 24, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "KeyId": null, "StreamCreationTimestamp": 1544828156.0 } }

您可以在下一個步驟ARN中使用串流,將串流與 Lambda 函數建立關聯。

在 AWS Lambda中新增事件來源

執行下列 AWS CLI add-event-source 命令。

aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \ --event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream \ --batch-size 100 --starting-position LATEST

記下映射 ID 以供後續使用。您可以執行 list-event-source-mappings 命令來取得事件來源映射的清單。

aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \ --event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream

在回應中,您可以確認狀態值為 enabled。您可以停用事件來源映射來暫時暫停輪詢,而不會遺失任何記錄。

測試設定

若要測試事件來源映射,請新增事件記錄到您的 Kinesis 串流。此--data值是字串,在傳送至 Kinesis CLI 之前, 會編碼為 base64。您可以執行相同命令一次以上,以新增多筆記錄到串流中。

aws kinesis put-record --stream-name lambda-stream --partition-key 1 \ --data "Hello, this is a test."

Lambda 使用執行角色自串流讀取記錄。接著,它調用您的 Lambda 函數,以記錄批次來傳遞。函數會解碼每個記錄的資料並將其記錄下來,並將輸出傳送至 CloudWatch Logs。在 CloudWatch 主控台中檢視日誌。

清除您的資源

除非您想要保留為此教學課程建立的資源,否則您現在便可刪除。透過刪除不再使用 AWS 的資源,您可以避免不必要的 費用 AWS 帳戶。

刪除執行角色
  1. 開啟IAM主控台的角色頁面

  2. 選取您建立的執行角色。

  3. 選擇 刪除

  4. 在文字輸入欄位中輸入角色的名稱,然後選擇 刪除

若要刪除 Lambda 函數
  1. 開啟 Lambda 主控台中的 函數頁面

  2. 選擇您建立的函數。

  3. 選擇 Actions (動作)、Delete (刪除)。

  4. 在文字輸入欄位中輸入 delete,然後選擇 刪除

刪除 Kinesis 串流
  1. 登入 AWS Management Console 並在 https://console.aws.amazon.com/kinesis 開啟 Kinesis 主控台。

  2. 選取您建立的串流。

  3. 選擇 動作刪除

  4. 在文字輸入欄位中輸入 delete

  5. 選擇 刪除