本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
教學課程:搭配 Kinesis Data Streams 使用 Lambda
在本教學課程中,您會建立 Lambda 函數來取用來自 Amazon Kinesis 資料串流的事件。
-
自訂應用程式將記錄寫入串流。
-
AWS Lambda 會輪詢串流,並在偵測到串流中的新記錄時,叫用您的 Lambda 函數。
-
AWS Lambda 會執行 Lambda 函數,方法是假設您在建立 Lambda 函數時指定的執行角色。
必要條件
此教學課程假設您具備基本的 Lambda 操作知識並了解 Lambda 主控台。若您尚未了解,請遵循 使用主控台建立一個 Lambda 函數 中的指示,建立您的第一個 Lambda 函數。
若要完成下列步驟,您需要 AWS Command Line Interface (AWS CLI) 版本 2。命令和預期的輸出會列在不同的區塊中:
aws --version
您應該會看到下列輸出:
aws-cli/2.13.27 Python/3.11.6 Linux/4.14.328-248.540.amzn2.x86_64 exe/x86_64.amzn.2
對於長命令,逸出字元 (\
) 用於將命令分割為多行。
在 Linux 和 macOS 上,使用您偏好的 shell 和套件軟體管理工具。
在 Windows 中,作業系統的內建終端機不支援您常與 Lambda 搭配使用的某些 Bash CLI命令 (例如 zip
)。若要取得 Ubuntu 和 Bash 的 Windows 整合版本,請安裝適用於 Linux 的 Windows 子系統。本指南中的範例CLI命令使用 Linux 格式。如果您使用 Windows ,則必須重新格式化包含內嵌JSON文件的命令CLI。
建立執行角色
建立 執行角色,讓您的函數有權存取 AWS 資源。
若要建立執行角色
-
在IAM主控台中開啟角色頁面。
-
選擇 建立角色。
-
建立具備下列屬性的角色。
此AWSLambdaKinesisExecutionRole政策具有 函數從 Kinesis 讀取項目並將日誌寫入 CloudWatch Logs 所需的許可。
建立函數
建立可處理 Kinesis 訊息的 Lambda 函數。函數程式碼會將 Kinesis 記錄的事件 ID 和事件資料記錄到 CloudWatch Logs。
本教學課程使用 Node.js 18.x 執行期,但我們也有提供其他執行期語言的範例程式碼。您可以在下列方塊中選取索引標籤,查看您感興趣的執行期程式碼。您將在此步驟中使用的 JavaScript 程式碼位於JavaScript索引標籤中顯示的第一個範例。
- .NET
-
- AWS SDK for .NET
-
還有更多 。 GitHub尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用 使用 Lambda 來使用 Kinesis 事件NET。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace KinesisIntegrationSampleCode;
public class Function
{
// Powertools Logger requires an environment variables against your function
// POWERTOOLS_SERVICE_NAME
[Logging(LogEvent = true)]
public async Task FunctionHandler(KinesisEvent evnt, ILambdaContext context)
{
if (evnt.Records.Count == 0)
{
Logger.LogInformation("Empty Kinesis Event received");
return;
}
foreach (var record in evnt.Records)
{
try
{
Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
string data = await GetRecordDataAsync(record.Kinesis, context);
Logger.LogInformation($"Data: {data}");
// TODO: Do interesting work based on the new data
}
catch (Exception ex)
{
Logger.LogError($"An error occurred {ex.Message}");
throw;
}
}
Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
}
private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context)
{
byte[] bytes = record.Data.ToArray();
string data = Encoding.UTF8.GetString(bytes);
await Task.CompletedTask; //Placeholder for actual async work
return data;
}
}
- Go
-
- SDK for Go V2
-
還有更多 。 GitHub尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用 Go 搭配 Lambda 來使用 Kinesis 事件。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main
import (
"context"
"log"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
)
func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error {
if len(kinesisEvent.Records) == 0 {
log.Printf("empty Kinesis event received")
return nil
}
for _, record := range kinesisEvent.Records {
log.Printf("processed Kinesis event with EventId: %v", record.EventID)
recordDataBytes := record.Kinesis.Data
recordDataText := string(recordDataBytes)
log.Printf("record data: %v", recordDataText)
// TODO: Do interesting work based on the new data
}
log.Printf("successfully processed %v records", len(kinesisEvent.Records))
return nil
}
func main() {
lambda.Start(handler)
}
- Java
-
- SDK 適用於 Java 2.x
-
還有更多 。 GitHub尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用 Java 搭配 Lambda 來使用 Kinesis 事件。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package example;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
public class Handler implements RequestHandler<KinesisEvent, Void> {
@Override
public Void handleRequest(final KinesisEvent event, final Context context) {
LambdaLogger logger = context.getLogger();
if (event.getRecords().isEmpty()) {
logger.log("Empty Kinesis Event received");
return null;
}
for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
try {
logger.log("Processed Event with EventId: "+record.getEventID());
String data = new String(record.getKinesis().getData().array());
logger.log("Data:"+ data);
// TODO: Do interesting work based on the new data
}
catch (Exception ex) {
logger.log("An error occurred:"+ex.getMessage());
throw ex;
}
}
logger.log("Successfully processed:"+event.getRecords().size()+" records");
return null;
}
}
- JavaScript
-
- SDK 適用於 JavaScript (v3)
-
還有更多 。 GitHub尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用 搭配 Lambda 使用 Kinesis 事件 JavaScript。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
for (const record of event.Records) {
try {
console.log(`Processed Kinesis Event - EventID: ${record.eventID}`);
const recordData = await getRecordDataAsync(record.kinesis);
console.log(`Record Data: ${recordData}`);
// TODO: Do interesting work based on the new data
} catch (err) {
console.error(`An error occurred ${err}`);
throw err;
}
}
console.log(`Successfully processed ${event.Records.length} records.`);
};
async function getRecordDataAsync(payload) {
var data = Buffer.from(payload.data, "base64").toString("utf-8");
await Promise.resolve(1); //Placeholder for actual async work
return data;
}
使用 搭配 Lambda 使用 Kinesis 事件 TypeScript。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import {
KinesisStreamEvent,
Context,
KinesisStreamHandler,
KinesisStreamRecordPayload,
} from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";
const logger = new Logger({
logLevel: "INFO",
serviceName: "kinesis-stream-handler-sample",
});
export const functionHandler: KinesisStreamHandler = async (
event: KinesisStreamEvent,
context: Context
): Promise<void> => {
for (const record of event.Records) {
try {
logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
const recordData = await getRecordDataAsync(record.kinesis);
logger.info(`Record Data: ${recordData}`);
// TODO: Do interesting work based on the new data
} catch (err) {
logger.error(`An error occurred ${err}`);
throw err;
}
logger.info(`Successfully processed ${event.Records.length} records.`);
}
};
async function getRecordDataAsync(
payload: KinesisStreamRecordPayload
): Promise<string> {
var data = Buffer.from(payload.data, "base64").toString("utf-8");
await Promise.resolve(1); //Placeholder for actual async work
return data;
}
- PHP
-
- 適用於 PHP 的 SDK
-
還有更多 。 GitHub尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用 搭配 Lambda 使用 Kinesis 事件PHP。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php
# using bref/bref and bref/logger for simplicity
use Bref\Context\Context;
use Bref\Event\Kinesis\KinesisEvent;
use Bref\Event\Kinesis\KinesisHandler;
use Bref\Logger\StderrLogger;
require __DIR__ . '/vendor/autoload.php';
class Handler extends KinesisHandler
{
private StderrLogger $logger;
public function __construct(StderrLogger $logger)
{
$this->logger = $logger;
}
/**
* @throws JsonException
* @throws \Bref\Event\InvalidLambdaEvent
*/
public function handleKinesis(KinesisEvent $event, Context $context): void
{
$this->logger->info("Processing records");
$records = $event->getRecords();
foreach ($records as $record) {
$data = $record->getData();
$this->logger->info(json_encode($data));
// TODO: Do interesting work based on the new data
// Any exception thrown will be logged and the invocation will be marked as failed
}
$totalRecords = count($records);
$this->logger->info("Successfully processed $totalRecords records");
}
}
$logger = new StderrLogger();
return new Handler($logger);
- Python
-
- SDK for Python (Boto3)
-
還有更多 。 GitHub尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用 Python 搭配 Lambda 來使用 Kinesis 事件。
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import base64
def lambda_handler(event, context):
for record in event['Records']:
try:
print(f"Processed Kinesis Event - EventID: {record['eventID']}")
record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8')
print(f"Record Data: {record_data}")
# TODO: Do interesting work based on the new data
except Exception as e:
print(f"An error occurred {e}")
raise e
print(f"Successfully processed {len(event['Records'])} records.")
- Ruby
-
- SDK for Ruby
-
還有更多 。 GitHub尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用 Ruby 搭配 Lambda 使用 Kinesis 事件。
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'aws-sdk'
def lambda_handler(event:, context:)
event['Records'].each do |record|
begin
puts "Processed Kinesis Event - EventID: #{record['eventID']}"
record_data = get_record_data_async(record['kinesis'])
puts "Record Data: #{record_data}"
# TODO: Do interesting work based on the new data
rescue => err
$stderr.puts "An error occurred #{err}"
raise err
end
end
puts "Successfully processed #{event['Records'].length} records."
end
def get_record_data_async(payload)
data = Base64.decode64(payload['data']).force_encoding('UTF-8')
# Placeholder for actual async work
# You can use Ruby's asynchronous programming tools like async/await or fibers here.
return data
end
- Rust
-
- SDK for Rust
-
還有更多 。 GitHub尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用 Rust 搭配 Lambda 使用 Kinesis 事件。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::event::kinesis::KinesisEvent;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> {
if event.payload.records.is_empty() {
tracing::info!("No records found. Exiting.");
return Ok(());
}
event.payload.records.iter().for_each(|record| {
tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default());
let record_data = std::str::from_utf8(&record.kinesis.data);
match record_data {
Ok(data) => {
// log the record data
tracing::info!("Data: {}", data);
}
Err(e) => {
tracing::error!("Error: {}", e);
}
}
});
tracing::info!(
"Successfully processed {} records",
event.payload.records.len()
);
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
// disable printing the name of the module in every log line.
.with_target(false)
// disabling time is handy because CloudWatch will add the ingestion time.
.without_time()
.init();
run(service_fn(function_handler)).await
}
建立函數
-
建立專案的目錄,然後切換至該目錄。
mkdir kinesis-tutorial
cd kinesis-tutorial
-
將範例 JavaScript 程式碼複製到名為 的新檔案中index.js
。
-
建立部署套件。
zip function.zip index.js
-
使用 create-function
命令建立一個 Lambda 函數。
aws lambda create-function --function-name ProcessKinesisRecords \
--zip-file fileb://function.zip --handler index.handler --runtime nodejs18.x \
--role arn:aws:iam::111122223333
:role/lambda-kinesis-role
測試 Lambda 函數
使用 invoke
AWS Lambda CLI命令和範例 Kinesis 事件手動叫用 Lambda 函數。
測試 Lambda 函數
-
將下列項目複製到 JSON 檔案,並將其儲存為 input.txt
。
{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "1",
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
"approximateArrivalTimestamp": 1545084650.987
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::111122223333:role/lambda-kinesis-role",
"awsRegion": "us-east-2",
"eventSourceARN": "arn:aws:kinesis:us-east-2:111122223333:stream/lambda-stream"
}
]
}
-
使用 invoke
命令來傳送事件到函數。
aws lambda invoke --function-name ProcessKinesisRecords \
--cli-binary-format raw-in-base64-out \
--payload file://input.txt outputfile.txt
如果您使用的是 第 2 AWS CLI 版,則需要 cli-binary-format選項。若要讓此成為預設的設定,請執行 aws configure set cli-binary-format raw-in-base64-out
。若要取得更多資訊,請參閱《AWS Command Line Interface 使用者指南第 2 版》中 AWS CLI 支援的全域命令列選項。
回應已儲存至 out.txt
。
使用 create-stream
命令來建立串流。
aws kinesis create-stream --stream-name lambda-stream --shard-count 1
執行下列describe-stream
命令以取得串流 ARN。
aws kinesis describe-stream --stream-name lambda-stream
您應該會看到下列輸出:
{
"StreamDescription": {
"Shards": [
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "340282366920746074317682119384634633455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610"
}
}
],
"StreamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream",
"StreamName": "lambda-stream",
"StreamStatus": "ACTIVE",
"RetentionPeriodHours": 24,
"EnhancedMonitoring": [
{
"ShardLevelMetrics": []
}
],
"EncryptionType": "NONE",
"KeyId": null,
"StreamCreationTimestamp": 1544828156.0
}
}
您可以在下一個步驟ARN中使用串流,將串流與 Lambda 函數建立關聯。
執行下列 AWS CLI add-event-source
命令。
aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream \
--batch-size 100 --starting-position LATEST
記下映射 ID 以供後續使用。您可以執行 list-event-source-mappings
命令來取得事件來源映射的清單。
aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream
在回應中,您可以確認狀態值為 enabled
。您可以停用事件來源映射來暫時暫停輪詢,而不會遺失任何記錄。
若要測試事件來源映射,請新增事件記錄到您的 Kinesis 串流。此--data
值是字串,在傳送至 Kinesis CLI 之前, 會編碼為 base64。您可以執行相同命令一次以上,以新增多筆記錄到串流中。
aws kinesis put-record --stream-name lambda-stream --partition-key 1 \
--data "Hello, this is a test."
Lambda 使用執行角色自串流讀取記錄。接著,它調用您的 Lambda 函數,以記錄批次來傳遞。函數會解碼每個記錄的資料並將其記錄下來,並將輸出傳送至 CloudWatch Logs。在 CloudWatch 主控台中檢視日誌。
清除您的資源
除非您想要保留為此教學課程建立的資源,否則您現在便可刪除。透過刪除不再使用 AWS 的資源,您可以避免不必要的 費用 AWS 帳戶。
刪除執行角色
-
開啟IAM主控台的角色頁面。
-
選取您建立的執行角色。
-
選擇 刪除 。
-
在文字輸入欄位中輸入角色的名稱,然後選擇 刪除 。
若要刪除 Lambda 函數
-
開啟 Lambda 主控台中的 函數頁面。
-
選擇您建立的函數。
-
選擇 Actions (動作)、Delete (刪除)。
-
在文字輸入欄位中輸入 delete
,然後選擇 刪除 。