教學課程:搭配 Kinesis Data Streams 使用 Lambda
在本教學課程中,您會建立 Lambda 函數,以取用 Amazon Kinesis 資料串流中的事件。
AWS Lambda 會輪詢串流,並在偵測到串流中的新記錄時,叫用您的 Lambda 函數。
AWS Lambda 會執行 Lambda 函數,方法是假設您在建立 Lambda 函數時指定的執行角色。
如果您尚未安裝 AWS Command Line Interface,請依照安裝或更新最新版本 AWS CLI中的步驟進行安裝。
本教學課程需使用命令列終端機或 Shell 來執行命令。在 Linux 和 macOS 中,使用您偏好的 Shell 和套件管理工具。
在 Windows 中,作業系統的內建終端不支援您常與 Lambda 搭配使用的某些 Bash CLI 命令 (例如 zip
)。若要取得 Ubuntu 和 Bash 的 Windows 整合版本,請安裝適用於 Linux 的 Windows 子系統。
建立 執行角色,讓您的 函數存取 AWS 資源。
在 IAM 主控台中開啟角色頁面。
AWSLambdaKinesisExecutionRole 政策具備函數自 Kinesis 讀取項目以及寫入日誌到 CloudWatch Logs 時所需的許可。
建立可處理 Kinesis 訊息的 Lambda 函數。函數程式碼會將 Kinesis 記錄的事件 ID 和事件資料記錄到 CloudWatch Logs 中。
本教學課程使用 Node.js 18.x 執行期,但我們也有提供其他執行期語言的範例程式碼。您可以在下列方塊中選取索引標籤,查看您感興趣的執行期程式碼。此步驟要使用的 JavaScript 程式碼,位於 JavaScript 索引標籤中顯示的第一個範例。
- .NET
- AWS SDK for .NET
GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用 .NET 搭配 Lambda 來使用 Kinesis 事件。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace KinesisIntegrationSampleCode;
public class Function
// Powertools Logger requires an environment variables against your function
[Logging(LogEvent = true)]
public async Task FunctionHandler(KinesisEvent evnt, ILambdaContext context)
if (evnt.Records.Count == 0)
Logger.LogInformation("Empty Kinesis Event received");
foreach (var record in evnt.Records)
Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
string data = await GetRecordDataAsync(record.Kinesis, context);
Logger.LogInformation($"Data: {data}");
// TODO: Do interesting work based on the new data
catch (Exception ex)
Logger.LogError($"An error occurred {ex.Message}");
Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context)
byte[] bytes = record.Data.ToArray();
string data = Encoding.UTF8.GetString(bytes);
await Task.CompletedTask; //Placeholder for actual async work
return data;
- Go
- SDK for Go V2
GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用 Go 搭配 Lambda 來使用 Kinesis 事件。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main
import (
func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error {
if len(kinesisEvent.Records) == 0 {
log.Printf("empty Kinesis event received")
return nil
for _, record := range kinesisEvent.Records {
log.Printf("processed Kinesis event with EventId: %v", record.EventID)
recordDataBytes := record.Kinesis.Data
recordDataText := string(recordDataBytes)
log.Printf("record data: %v", recordDataText)
// TODO: Do interesting work based on the new data
log.Printf("successfully processed %v records", len(kinesisEvent.Records))
return nil
func main() {
- Java
- SDK for Java 2.x
GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用 Java 搭配 Lambda 來使用 Kinesis 事件。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package example;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
public class Handler implements RequestHandler<KinesisEvent, Void> {
public Void handleRequest(final KinesisEvent event, final Context context) {
LambdaLogger logger = context.getLogger();
if (event.getRecords().isEmpty()) {
logger.log("Empty Kinesis Event received");
return null;
for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
try {
logger.log("Processed Event with EventId: "+record.getEventID());
String data = new String(record.getKinesis().getData().array());
logger.log("Data:"+ data);
// TODO: Do interesting work based on the new data
catch (Exception ex) {
logger.log("An error occurred:"+ex.getMessage());
throw ex;
logger.log("Successfully processed:"+event.getRecords().size()+" records");
return null;
- JavaScript
- SDK for JavaScript (v3)
GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用 JavaScript 搭配 Lambda 來使用 Kinesis 事件。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
for (const record of event.Records) {
try {
console.log(`Processed Kinesis Event - EventID: ${record.eventID}`);
const recordData = await getRecordDataAsync(record.kinesis);
console.log(`Record Data: ${recordData}`);
// TODO: Do interesting work based on the new data
} catch (err) {
console.error(`An error occurred ${err}`);
throw err;
console.log(`Successfully processed ${event.Records.length} records.`);
async function getRecordDataAsync(payload) {
var data = Buffer.from(payload.data, "base64").toString("utf-8");
await Promise.resolve(1); //Placeholder for actual async work
return data;
使用 TypeScript 搭配 Lambda 來使用 Kinesis 事件。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import {
} from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";
const logger = new Logger({
logLevel: "INFO",
serviceName: "kinesis-stream-handler-sample",
export const functionHandler: KinesisStreamHandler = async (
event: KinesisStreamEvent,
context: Context
): Promise<void> => {
for (const record of event.Records) {
try {
logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
const recordData = await getRecordDataAsync(record.kinesis);
logger.info(`Record Data: ${recordData}`);
// TODO: Do interesting work based on the new data
} catch (err) {
logger.error(`An error occurred ${err}`);
throw err;
logger.info(`Successfully processed ${event.Records.length} records.`);
async function getRecordDataAsync(
payload: KinesisStreamRecordPayload
): Promise<string> {
var data = Buffer.from(payload.data, "base64").toString("utf-8");
await Promise.resolve(1); //Placeholder for actual async work
return data;
- SDK for PHP
GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用 PHP 搭配 Lambda 來使用 Kinesis 事件。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
# using bref/bref and bref/logger for simplicity
use Bref\Context\Context;
use Bref\Event\Kinesis\KinesisEvent;
use Bref\Event\Kinesis\KinesisHandler;
use Bref\Logger\StderrLogger;
require __DIR__ . '/vendor/autoload.php';
class Handler extends KinesisHandler
private StderrLogger $logger;
public function __construct(StderrLogger $logger)
$this->logger = $logger;
* @throws JsonException
* @throws \Bref\Event\InvalidLambdaEvent
public function handleKinesis(KinesisEvent $event, Context $context): void
$this->logger->info("Processing records");
$records = $event->getRecords();
foreach ($records as $record) {
$data = $record->getData();
// TODO: Do interesting work based on the new data
// Any exception thrown will be logged and the invocation will be marked as failed
$totalRecords = count($records);
$this->logger->info("Successfully processed $totalRecords records");
$logger = new StderrLogger();
return new Handler($logger);
- Python
- SDK for Python (Boto3)
GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用 Python 搭配 Lambda 來使用 Kinesis 事件。
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import base64
def lambda_handler(event, context):
for record in event['Records']:
print(f"Processed Kinesis Event - EventID: {record['eventID']}")
record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8')
print(f"Record Data: {record_data}")
# TODO: Do interesting work based on the new data
except Exception as e:
print(f"An error occurred {e}")
raise e
print(f"Successfully processed {len(event['Records'])} records.")
- Ruby
- SDK for Ruby
GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用 Ruby 搭配 Lambda 來使用 Kinesis 事件。
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'aws-sdk'
def lambda_handler(event:, context:)
event['Records'].each do |record|
puts "Processed Kinesis Event - EventID: #{record['eventID']}"
record_data = get_record_data_async(record['kinesis'])
puts "Record Data: #{record_data}"
# TODO: Do interesting work based on the new data
rescue => err
$stderr.puts "An error occurred #{err}"
raise err
puts "Successfully processed #{event['Records'].length} records."
def get_record_data_async(payload)
data = Base64.decode64(payload['data']).force_encoding('UTF-8')
# Placeholder for actual async work
# You can use Ruby's asynchronous programming tools like async/await or fibers here.
return data
- Rust
- SDK for Rust
GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用 Rust 搭配 Lambda 來使用 Kinesis 事件。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::event::kinesis::KinesisEvent;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> {
if event.payload.records.is_empty() {
tracing::info!("No records found. Exiting.");
return Ok(());
event.payload.records.iter().for_each(|record| {
tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default());
let record_data = std::str::from_utf8(&record.kinesis.data);
match record_data {
Ok(data) => {
// log the record data
tracing::info!("Data: {}", data);
Err(e) => {
tracing::error!("Error: {}", e);
"Successfully processed {} records",
async fn main() -> Result<(), Error> {
// disable printing the name of the module in every log line.
// disabling time is handy because CloudWatch will add the ingestion time.
mkdir kinesis-tutorial
cd kinesis-tutorial
將範例 JavaScript 程式碼複製到名為 index.js
zip function.zip index.js
使用 create-function
命令建立一個 Lambda 函數。
aws lambda create-function --function-name ProcessKinesisRecords \
--zip-file fileb://function.zip --handler index.handler --runtime nodejs18.x \
--role arn:aws:iam::111122223333
測試 Lambda 函數
使用 invoke
AWS Lambda CLI 命令和範例 Kinesis 事件手動叫用 Lambda 函數。
測試 Lambda 函數
將以下 JSON 複製到一個檔案中並儲存為 input.txt
"Records": [
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "1",
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
"approximateArrivalTimestamp": 1545084650.987
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::111122223333:role/lambda-kinesis-role",
"awsRegion": "us-east-2",
"eventSourceARN": "arn:aws:kinesis:us-east-2:111122223333:stream/lambda-stream"
使用 invoke
aws lambda invoke --function-name ProcessKinesisRecords \
--cli-binary-format raw-in-base64-out \
--payload file://input.txt outputfile.txt
如果您使用的是第 2 AWS CLI 版,則 cli-binary-format選項為必要項目。若要讓此成為預設的設定,請執行 aws configure set cli-binary-format raw-in-base64-out
。若要取得更多資訊,請參閱《AWS Command Line Interface 使用者指南第 2 版》中 AWS CLI 支援的全域命令列選項。
回應已儲存至 out.txt
使用 create-stream
aws kinesis create-stream --stream-name lambda-stream --shard-count 1
執行下列 describe-stream
命令以取得串流 ARN。
aws kinesis describe-stream --stream-name lambda-stream
"StreamDescription": {
"Shards": [
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "340282366920746074317682119384634633455"
"SequenceNumberRange": {
"StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610"
"StreamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream",
"StreamName": "lambda-stream",
"StreamStatus": "ACTIVE",
"RetentionPeriodHours": 24,
"EnhancedMonitoring": [
"ShardLevelMetrics": []
"EncryptionType": "NONE",
"KeyId": null,
"StreamCreationTimestamp": 1544828156.0
在下一個步驟中,您會使用串流 ARN 與您的 Lambda 函數的串流建立關聯。
執行下列 AWS CLI add-event-source
aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream \
--batch-size 100 --starting-position LATEST
記下映射 ID 以供後續使用。您可以執行 list-event-source-mappings
aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream
在回應中,您可以確認狀態值為 enabled
若要測試事件來源映射,請新增事件記錄到您的 Kinesis 串流。--data
值是一個字串,CLI 會先將該字串編碼為 base64,再將它傳送到 Kinesis。您可以執行相同命令一次以上,以新增多筆記錄到串流中。
aws kinesis put-record --stream-name lambda-stream --partition-key 1 \
--data "Hello, this is a test."
Lambda 使用執行角色自串流讀取記錄。接著,它調用您的 Lambda 函數,以記錄批次來傳遞。函數會解碼來自每個記錄的資料並加以記錄,將輸出傳送至 CloudWatch Logs。在 CloudWatch 主控台中檢視日誌。
除非您想要保留為此教學課程建立的資源,否則您現在便可刪除。透過刪除不再使用 AWS 的資源,您可以避免不必要的費用 AWS 帳戶。
開啟 IAM 主控台中的 角色頁面 。
在文字輸入欄位中輸入角色的名稱,然後選擇 刪除 。
若要刪除 Lambda 函數
開啟 Lambda 主控台中的 函數頁面。
選擇 Actions (動作)、Delete (刪除)。
在文字輸入欄位中輸入 confirm
,然後選擇 刪除 。