Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Tutorial: Menggunakan Lambda dengan Kinesis Data Streams
Dalam tutorial ini, Anda membuat fungsi Lambda untuk menggunakan kejadian dari aliran data Amazon Kinesis.
-
Aplikasi kustom menulis rekaman ke aliran.
-
AWS Lambda melakukan polling terhadap aliran dan memanggil fungsi Lambda Anda ketika mendeteksi rekaman baru di aliran.
-
AWS Lambda menjalankan fungsi Lambda dengan mengasumsikan peran eksekusi yang Anda tetapkan pada saat membuat fungsi Lambda.
Prasyarat
Tutorial ini mengasumsikan bahwa Anda memiliki pengetahuan tentang operasi Lambda dan konsol Lambda dasar. Jika belum, ikuti petunjuk di Membuat fungsi Lambda dengan konsol untuk membuat fungsi Lambda pertama Anda.
Untuk menyelesaikan langkah-langkah berikut, Anda memerlukan AWS CLI versi 2. Perintah dan output yang diharapkan dicantumkan dalam blok terpisah:
aws --version
Anda akan melihat output berikut:
aws-cli/2.13.27 Python/3.11.6 Linux/4.14.328-248.540.amzn2.x86_64 exe/x86_64.amzn.2
Untuk perintah panjang, karakter escape (\
) digunakan untuk memisahkan perintah menjadi beberapa baris.
Di Linux dan macOS, gunakan shell dan manajer paket pilihan Anda.
Di Windows, beberapa CLI perintah Bash yang biasa Anda gunakan dengan Lambda (zip
seperti) tidak didukung oleh terminal bawaan sistem operasi. Untuk mendapatkan Ubuntu dan Bash versi terintegrasi Windows, instal Windows Subsystem for Linux. Contoh CLI perintah dalam panduan ini menggunakan pemformatan Linux. Perintah yang menyertakan JSON dokumen inline harus diformat ulang jika Anda menggunakan Windows. CLI
Buat peran eksekusi
Buat peran eksekusi yang memberikan izin kepada fungsi Anda untuk mengakses AWS sumber daya.
Untuk membuat peran eksekusi
-
Buka halaman peran di IAM konsol.
-
Pilih Buat peran.
-
Buat peran dengan properti berikut.
-
Entitas tepercaya – AWS Lambda.
-
Izin — AWSLambdaKinesisExecutionRole.
-
Nama peran – lambda-kinesis-role
.
AWSLambdaKinesisExecutionRoleKebijakan memiliki izin yang diperlukan oleh fungsi untuk membaca item dari Kinesis dan menulis log CloudWatch ke Logs.
Buat fungsi
Buat fungsi Lambda yang memproses pesan Kinesis Anda. Kode fungsi mencatat ID peristiwa dan data peristiwa dari catatan Kinesis ke CloudWatch Log.
Tutorial ini menggunakan runtime Node.js 18.x, tetapi kami juga menyediakan kode contoh dalam bahasa runtime lainnya. Anda dapat memilih tab di kotak berikut untuk melihat kode runtime yang Anda minati. JavaScript Kode yang akan Anda gunakan dalam langkah ini adalah pada contoh pertama yang ditunjukkan di JavaScripttab.
- .NET
-
- AWS SDK for .NET
-
Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori contoh Nirserver.
Mengkonsumsi acara Kinesis dengan menggunakan Lambda. NET.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace KinesisIntegrationSampleCode;
public class Function
{
// Powertools Logger requires an environment variables against your function
// POWERTOOLS_SERVICE_NAME
[Logging(LogEvent = true)]
public async Task FunctionHandler(KinesisEvent evnt, ILambdaContext context)
{
if (evnt.Records.Count == 0)
{
Logger.LogInformation("Empty Kinesis Event received");
return;
}
foreach (var record in evnt.Records)
{
try
{
Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
string data = await GetRecordDataAsync(record.Kinesis, context);
Logger.LogInformation($"Data: {data}");
// TODO: Do interesting work based on the new data
}
catch (Exception ex)
{
Logger.LogError($"An error occurred {ex.Message}");
throw;
}
}
Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
}
private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context)
{
byte[] bytes = record.Data.ToArray();
string data = Encoding.UTF8.GetString(bytes);
await Task.CompletedTask; //Placeholder for actual async work
return data;
}
}
- Go
-
- SDKuntuk Go V2
-
Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori contoh Nirserver.
Mengkonsumsi acara Kinesis dengan Lambda menggunakan Go.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main
import (
"context"
"log"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
)
func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error {
if len(kinesisEvent.Records) == 0 {
log.Printf("empty Kinesis event received")
return nil
}
for _, record := range kinesisEvent.Records {
log.Printf("processed Kinesis event with EventId: %v", record.EventID)
recordDataBytes := record.Kinesis.Data
recordDataText := string(recordDataBytes)
log.Printf("record data: %v", recordDataText)
// TODO: Do interesting work based on the new data
}
log.Printf("successfully processed %v records", len(kinesisEvent.Records))
return nil
}
func main() {
lambda.Start(handler)
}
- Java
-
- SDKuntuk Java 2.x
-
Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori contoh Nirserver.
Mengkonsumsi acara Kinesis dengan Lambda menggunakan Java.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package example;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
public class Handler implements RequestHandler<KinesisEvent, Void> {
@Override
public Void handleRequest(final KinesisEvent event, final Context context) {
LambdaLogger logger = context.getLogger();
if (event.getRecords().isEmpty()) {
logger.log("Empty Kinesis Event received");
return null;
}
for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
try {
logger.log("Processed Event with EventId: "+record.getEventID());
String data = new String(record.getKinesis().getData().array());
logger.log("Data:"+ data);
// TODO: Do interesting work based on the new data
}
catch (Exception ex) {
logger.log("An error occurred:"+ex.getMessage());
throw ex;
}
}
logger.log("Successfully processed:"+event.getRecords().size()+" records");
return null;
}
}
- JavaScript
-
- SDKuntuk JavaScript (v3)
-
Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori contoh Nirserver.
Mengkonsumsi acara Kinesis dengan menggunakan Lambda. JavaScript
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
for (const record of event.Records) {
try {
console.log(`Processed Kinesis Event - EventID: ${record.eventID}`);
const recordData = await getRecordDataAsync(record.kinesis);
console.log(`Record Data: ${recordData}`);
// TODO: Do interesting work based on the new data
} catch (err) {
console.error(`An error occurred ${err}`);
throw err;
}
}
console.log(`Successfully processed ${event.Records.length} records.`);
};
async function getRecordDataAsync(payload) {
var data = Buffer.from(payload.data, "base64").toString("utf-8");
await Promise.resolve(1); //Placeholder for actual async work
return data;
}
Mengkonsumsi acara Kinesis dengan menggunakan Lambda. TypeScript
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import {
KinesisStreamEvent,
Context,
KinesisStreamHandler,
KinesisStreamRecordPayload,
} from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";
const logger = new Logger({
logLevel: "INFO",
serviceName: "kinesis-stream-handler-sample",
});
export const functionHandler: KinesisStreamHandler = async (
event: KinesisStreamEvent,
context: Context
): Promise<void> => {
for (const record of event.Records) {
try {
logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
const recordData = await getRecordDataAsync(record.kinesis);
logger.info(`Record Data: ${recordData}`);
// TODO: Do interesting work based on the new data
} catch (err) {
logger.error(`An error occurred ${err}`);
throw err;
}
logger.info(`Successfully processed ${event.Records.length} records.`);
}
};
async function getRecordDataAsync(
payload: KinesisStreamRecordPayload
): Promise<string> {
var data = Buffer.from(payload.data, "base64").toString("utf-8");
await Promise.resolve(1); //Placeholder for actual async work
return data;
}
- PHP
-
- SDKuntuk PHP
-
Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori contoh Nirserver.
Mengkonsumsi acara Kinesis dengan menggunakan Lambda. PHP
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php
# using bref/bref and bref/logger for simplicity
use Bref\Context\Context;
use Bref\Event\Kinesis\KinesisEvent;
use Bref\Event\Kinesis\KinesisHandler;
use Bref\Logger\StderrLogger;
require __DIR__ . '/vendor/autoload.php';
class Handler extends KinesisHandler
{
private StderrLogger $logger;
public function __construct(StderrLogger $logger)
{
$this->logger = $logger;
}
/**
* @throws JsonException
* @throws \Bref\Event\InvalidLambdaEvent
*/
public function handleKinesis(KinesisEvent $event, Context $context): void
{
$this->logger->info("Processing records");
$records = $event->getRecords();
foreach ($records as $record) {
$data = $record->getData();
$this->logger->info(json_encode($data));
// TODO: Do interesting work based on the new data
// Any exception thrown will be logged and the invocation will be marked as failed
}
$totalRecords = count($records);
$this->logger->info("Successfully processed $totalRecords records");
}
}
$logger = new StderrLogger();
return new Handler($logger);
- Python
-
- SDKfor Python (Boto3)
-
Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori contoh Nirserver.
Mengkonsumsi acara Kinesis dengan Lambda menggunakan Python.
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import base64
def lambda_handler(event, context):
for record in event['Records']:
try:
print(f"Processed Kinesis Event - EventID: {record['eventID']}")
record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8')
print(f"Record Data: {record_data}")
# TODO: Do interesting work based on the new data
except Exception as e:
print(f"An error occurred {e}")
raise e
print(f"Successfully processed {len(event['Records'])} records.")
- Ruby
-
- SDKfor Ruby
-
Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori contoh Nirserver.
Mengkonsumsi acara Kinesis dengan Lambda menggunakan Ruby.
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'aws-sdk'
def lambda_handler(event:, context:)
event['Records'].each do |record|
begin
puts "Processed Kinesis Event - EventID: #{record['eventID']}"
record_data = get_record_data_async(record['kinesis'])
puts "Record Data: #{record_data}"
# TODO: Do interesting work based on the new data
rescue => err
$stderr.puts "An error occurred #{err}"
raise err
end
end
puts "Successfully processed #{event['Records'].length} records."
end
def get_record_data_async(payload)
data = Base64.decode64(payload['data']).force_encoding('UTF-8')
# Placeholder for actual async work
# You can use Ruby's asynchronous programming tools like async/await or fibers here.
return data
end
- Rust
-
- SDKuntuk Rust
-
Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori contoh Nirserver.
Mengkonsumsi acara Kinesis dengan Lambda menggunakan Rust.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::event::kinesis::KinesisEvent;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> {
if event.payload.records.is_empty() {
tracing::info!("No records found. Exiting.");
return Ok(());
}
event.payload.records.iter().for_each(|record| {
tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default());
let record_data = std::str::from_utf8(&record.kinesis.data);
match record_data {
Ok(data) => {
// log the record data
tracing::info!("Data: {}", data);
}
Err(e) => {
tracing::error!("Error: {}", e);
}
}
});
tracing::info!(
"Successfully processed {} records",
event.payload.records.len()
);
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
// disable printing the name of the module in every log line.
.with_target(false)
// disabling time is handy because CloudWatch will add the ingestion time.
.without_time()
.init();
run(service_fn(function_handler)).await
}
Untuk membuat fungsi
-
Buat direktori untuk proyek, dan kemudian beralih ke direktori itu.
mkdir kinesis-tutorial
cd kinesis-tutorial
-
Salin JavaScript kode sampel ke file baru bernamaindex.js
.
-
Buat paket deployment.
zip function.zip index.js
-
Buat fungsi Lambda dengan perintah create-function
.
aws lambda create-function --function-name ProcessKinesisRecords \
--zip-file fileb://function.zip --handler index.handler --runtime nodejs18.x \
--role arn:aws:iam::111122223333
:role/lambda-kinesis-role
Tes fungsi Lambda
Panggil fungsi Lambda Anda secara manual dengan menggunakan perintah dan invoke
AWS Lambda CLI kejadian Kinesis sampel berikut.
Untuk menguji fungsi Lambda
-
Salin berikut ini JSON ke dalam file dan simpan sebagaiinput.txt
.
{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "1",
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
"approximateArrivalTimestamp": 1545084650.987
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::111122223333:role/lambda-kinesis-role",
"awsRegion": "us-east-2",
"eventSourceARN": "arn:aws:kinesis:us-east-2:111122223333:stream/lambda-stream"
}
]
}
-
Gunakan perintah invoke
untuk mengirim kejadian ke fungsi.
aws lambda invoke --function-name ProcessKinesisRecords \
--cli-binary-format raw-in-base64-out \
--payload file://input.txt outputfile.txt
cli-binary-formatOpsi ini diperlukan jika Anda menggunakan AWS CLI versi 2. Untuk menjadikan ini pengaturan default, jalankanaws configure set cli-binary-format raw-in-base64-out
. Untuk informasi selengkapnya, lihat opsi baris perintah global yang AWS CLI didukung di Panduan AWS Command Line Interface Pengguna untuk Versi 2.
Respons disimpan ke out.txt
.
Gunakan perintah create-stream
untuk membuat aliran.
aws kinesis create-stream --stream-name lambda-stream --shard-count 1
Jalankan describe-stream
perintah berikut untuk mendapatkan aliranARN.
aws kinesis describe-stream --stream-name lambda-stream
Anda akan melihat output berikut:
{
"StreamDescription": {
"Shards": [
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "340282366920746074317682119384634633455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610"
}
}
],
"StreamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream",
"StreamName": "lambda-stream",
"StreamStatus": "ACTIVE",
"RetentionPeriodHours": 24,
"EnhancedMonitoring": [
{
"ShardLevelMetrics": []
}
],
"EncryptionType": "NONE",
"KeyId": null,
"StreamCreationTimestamp": 1544828156.0
}
}
Anda menggunakan aliran ARN di langkah berikutnya untuk Anda mengasosiasikan aliran dengan fungsi Lambda Anda.
Jalankan perintah AWS CLI add-event-source
berikut.
aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream \
--batch-size 100 --starting-position LATEST
Catat ID pemetaan untuk penggunaan selanjutnya. Anda bisa mendapatkan daftar pemetaan sumber kejadian dengan menjalankan perintah list-event-source-mappings
.
aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream
Dalam respons, Anda dapat memverifikasi nilai status adalah enabled
. Pemetaan sumber peristiwa dapat dinonaktifkan untuk menjeda polling sementara tanpa kehilangan catatan.
Untuk menguji pemetaan sumber kejadian, tambahkan rekaman kejadian ke aliran Kinesis Anda. --data
Nilai adalah string yang CLI dienkodekan ke base64 sebelum mengirimkannya ke Kinesis. Anda dapat menjalankan perintah yang sama lebih dari satu kali untuk menambahkan beberapa rekaman ke aliran.
aws kinesis put-record --stream-name lambda-stream --partition-key 1 \
--data "Hello, this is a test."
Lambda menggunakan peran eksekusi untuk membaca rekaman dari aliran. Kemudian, itu akan memanggil fungsi Lambda Anda, dengan mengirimkan batch rekaman. Fungsi mendekodekan data dari setiap rekaman dan memasukkannya ke log, dengan mengirimkan output ke CloudWatch Logs. Lihat log di konsol CloudWatch .
Bersihkan sumber daya Anda
Sekarang Anda dapat menghapus sumber daya yang Anda buat untuk tutorial ini, kecuali Anda ingin mempertahankannya. Dengan menghapus AWS sumber daya yang tidak lagi Anda gunakan, Anda mencegah biaya yang tidak perlu untuk Anda Akun AWS.
Untuk menghapus peran eksekusi
-
Buka halaman Peran IAM konsol.
-
Pilih peran eksekusi yang Anda buat.
-
Pilih Hapus.
-
Masukkan nama peran di kolom input teks dan pilih Hapus.
Untuk menghapus fungsi Lambda
-
Buka halaman Fungsi di konsol Lambda.
-
Pilih fungsi yang Anda buat.
-
Pilih Tindakan, Hapus.
-
Ketik delete
kolom input teks dan pilih Hapus.