從 Amazon MSK 觸發程序叫用 Lambda 函數 - AWS SDK 程式碼範例

文件 AWS SDK AWS 範例 SDK 儲存庫中有更多可用的 GitHub 範例。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

從 Amazon MSK 觸發程序叫用 Lambda 函數

下列程式碼範例示範如何實作 Lambda 函數,該函數接收從 Amazon MSK 叢集接收記錄所觸發的事件。函數會擷取 MSK 承載並記錄記錄內容。

.NET
AWS SDK for .NET
注意

還有更多 on GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 .MSK 搭配 Lambda 使用 Amazon NET 事件。

using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.KafkaEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace MSKLambda; public class Function { /// <param name="input">The event for the Lambda function handler to process.</param> /// <param name="context">The ILambdaContext that provides methods for logging and describing the Lambda environment.</param> /// <returns></returns> public void FunctionHandler(KafkaEvent evnt, ILambdaContext context) { foreach (var record in evnt.Records) { Console.WriteLine("Key:" + record.Key); foreach (var eventRecord in record.Value) { var valueBytes = eventRecord.Value.ToArray(); var valueText = Encoding.UTF8.GetString(valueBytes); Console.WriteLine("Message:" + valueText); } } } }
Go
SDK for Go V2
注意

還有更多 on GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Go 搭配 Lambda 使用 Amazon MSK 事件。

package main import ( "encoding/base64" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(event events.KafkaEvent) { for key, records := range event.Records { fmt.Println("Key:", key) for _, record := range records { fmt.Println("Record:", record) decodedValue, _ := base64.StdEncoding.DecodeString(record.Value) message := string(decodedValue) fmt.Println("Message:", message) } } } func main() { lambda.Start(handler) }
Java
Java 2.x 的 SDK
注意

還有更多 on GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Java 搭配 Lambda 使用 Amazon MSK 事件。

import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.KafkaEvent; import com.amazonaws.services.lambda.runtime.events.KafkaEvent.KafkaEventRecord; import java.util.Base64; import java.util.Map; public class Example implements RequestHandler<KafkaEvent, Void> { @Override public Void handleRequest(KafkaEvent event, Context context) { for (Map.Entry<String, java.util.List<KafkaEventRecord>> entry : event.getRecords().entrySet()) { String key = entry.getKey(); System.out.println("Key: " + key); for (KafkaEventRecord record : entry.getValue()) { System.out.println("Record: " + record); byte[] value = Base64.getDecoder().decode(record.getValue()); String message = new String(value); System.out.println("Message: " + message); } } return null; } }
JavaScript
SDK for JavaScript (v3)
注意

還有更多 on GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Lambda usingMSK 使用 Amazon JavaScript 事件。

exports.handler = async (event) => { // Iterate through keys for (let key in event.records) { console.log('Key: ', key) // Iterate through records event.records[key].map((record) => { console.log('Record: ', record) // Decode base64 const msg = Buffer.from(record.value, 'base64').toString() console.log('Message:', msg) }) } }
PHP
適用於 PHP 的 SDK
注意

還有更多 on GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 MSK 搭配 Lambda 使用 Amazon PHP 事件。

<?php // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 // using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\Kafka\KafkaEvent; use Bref\Event\Handler as StdHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler implements StdHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handle(mixed $event, Context $context): void { $kafkaEvent = new KafkaEvent($event); $this->logger->info("Processing records"); $records = $kafkaEvent->getRecords(); foreach ($records as $record) { try { $key = $record->getKey(); $this->logger->info("Key: $key"); $values = $record->getValue(); $this->logger->info(json_encode($values)); foreach ($values as $value) { $this->logger->info("Value: $value"); } } catch (Exception $e) { $this->logger->error($e->getMessage()); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords records"); } } $logger = new StderrLogger(); return new Handler($logger);
Python
Python 的 SDK (Boto3)
注意

還有更多 on GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Python 搭配 Lambda 使用 Amazon MSK 事件。

import base64 def lambda_handler(event, context): # Iterate through keys for key in event['records']: print('Key:', key) # Iterate through records for record in event['records'][key]: print('Record:', record) # Decode base64 msg = base64.b64decode(record['value']).decode('utf-8') print('Message:', msg)
Ruby
Ruby 的 SDK
注意

還有更多 on GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Ruby 搭配 Lambda 使用 Amazon MSK 事件。

require 'base64' def lambda_handler(event:, context:) # Iterate through keys event['records'].each do |key, records| puts "Key: #{key}" # Iterate through records records.each do |record| puts "Record: #{record}" # Decode base64 msg = Base64.decode64(record['value']) puts "Message: #{msg}" end end end