

文档 AWS SDK 示例 GitHub 存储库中还有更多 [S AWS DK 示例](https://github.com/awsdocs/aws-doc-sdk-examples)。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 通过 Amazon MSK 触发器调用 Lambda 函数
<a name="lambda_example_serverless_MSK_Lambda_section"></a>

以下代码示例展示了如何实现 Lambda 函数，该函数接收通过接收来自 Amazon MSK 集群的记录而触发的事件。该函数检索 MSK 有效负载，并记录下记录内容。

------
#### [ .NET ]

**适用于 .NET 的 SDK**  
 还有更多相关信息 GitHub。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
通过 .NET 将 Amazon MSK 事件与 Lambda 结合使用。  

```
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.KafkaEvents;


// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace MSKLambda;

public class Function
{
    
    
    /// <param name="input">The event for the Lambda function handler to process.</param>
    /// <param name="context">The ILambdaContext that provides methods for logging and describing the Lambda environment.</param>
    /// <returns></returns>
    public void FunctionHandler(KafkaEvent evnt, ILambdaContext context)
    {

        foreach (var record in evnt.Records)
        {
            Console.WriteLine("Key:" + record.Key); 
            foreach (var eventRecord in record.Value)
            {
                var valueBytes = eventRecord.Value.ToArray();    
                var valueText = Encoding.UTF8.GetString(valueBytes);
                
                Console.WriteLine("Message:" + valueText);
            }
        }
    }
    

}
```

------
#### [ Go ]

**适用于 Go 的 SDK V2**  
 还有更多相关信息 GitHub。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
通过 Go 将 Amazon MSK 事件与 Lambda 结合使用。  

```
package main

import (
	"encoding/base64"
	"fmt"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(event events.KafkaEvent) {
	for key, records := range event.Records {
		fmt.Println("Key:", key)

		for _, record := range records {
			fmt.Println("Record:", record)

			decodedValue, _ := base64.StdEncoding.DecodeString(record.Value)
			message := string(decodedValue)
			fmt.Println("Message:", message)
		}
	}
}

func main() {
	lambda.Start(handler)
}
```

------
#### [ Java ]

**适用于 Java 的 SDK 2.x**  
 还有更多相关信息 GitHub。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
通过 Java 将 Amazon MSK 事件与 Lambda 结合使用。  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KafkaEvent;
import com.amazonaws.services.lambda.runtime.events.KafkaEvent.KafkaEventRecord;

import java.util.Base64;
import java.util.Map;

public class Example implements RequestHandler<KafkaEvent, Void> {

    @Override
    public Void handleRequest(KafkaEvent event, Context context) {
        for (Map.Entry<String, java.util.List<KafkaEventRecord>> entry : event.getRecords().entrySet()) {
            String key = entry.getKey();
            System.out.println("Key: " + key);

            for (KafkaEventRecord record : entry.getValue()) {
                System.out.println("Record: " + record);

                byte[] value = Base64.getDecoder().decode(record.getValue());
                String message = new String(value);
                System.out.println("Message: " + message);
            }
        }

        return null;
    }
}
```

------
#### [ JavaScript ]

**适用于 JavaScript (v3) 的软件开发工具包**  
 还有更多相关信息 GitHub。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 Lambda JavaScript 使用亚马逊 MSK 事件。  

```
exports.handler = async (event) => {
    // Iterate through keys
    for (let key in event.records) {
      console.log('Key: ', key)
      // Iterate through records
      event.records[key].map((record) => {
        console.log('Record: ', record)
        // Decode base64
        const msg = Buffer.from(record.value, 'base64').toString()
        console.log('Message:', msg)
      }) 
    }
}
```
使用 Lambda TypeScript 使用亚马逊 MSK 事件。  

```
import { MSKEvent, Context } from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "msk-handler-sample",
});

export const handler = async (
  event: MSKEvent,
  context: Context
): Promise<void> => {
  for (const [topic, topicRecords] of Object.entries(event.records)) {
    logger.info(`Processing key: ${topic}`);

    // Process each record in the partition
    for (const record of topicRecords) {
      try {
        // Decode the message value from base64
        const decodedMessage = Buffer.from(record.value, 'base64').toString();

        logger.info({
          message: decodedMessage
        });
      }
      catch (error) {
        logger.error('Error processing event', { error });
        throw error;
      }
    };
  }
}
```

------
#### [ PHP ]

**适用于 PHP 的 SDK**  
 还有更多相关信息 GitHub。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
通过 PHP 将 Amazon MSK 事件与 Lambda 结合使用。  

```
<?php
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

// using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\Kafka\KafkaEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler implements StdHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handle(mixed $event, Context $context): void
    {
        $kafkaEvent = new KafkaEvent($event);
        $this->logger->info("Processing records");
        $records = $kafkaEvent->getRecords();

        foreach ($records as $record) {
            try {
                $key = $record->getKey();
                $this->logger->info("Key: $key");

                $values = $record->getValue();
                $this->logger->info(json_encode($values));

                foreach ($values as $value) {
                    $this->logger->info("Value: $value");
                }
                
            } catch (Exception $e) {
                $this->logger->error($e->getMessage());
            }
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**适用于 Python 的 SDK（Boto3）**  
 还有更多相关信息 GitHub。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
通过 Python 将 Amazon MSK 事件与 Lambda 结合使用。  

```
import base64

def lambda_handler(event, context):
    # Iterate through keys
    for key in event['records']:
        print('Key:', key)
        # Iterate through records
        for record in event['records'][key]:
            print('Record:', record)
            # Decode base64
            msg = base64.b64decode(record['value']).decode('utf-8')
            print('Message:', msg)
```

------
#### [ Ruby ]

**适用于 Ruby 的 SDK**  
 还有更多相关信息 GitHub。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
通过 Ruby 将 Amazon MSK 事件与 Lambda 结合使用。  

```
require 'base64'

def lambda_handler(event:, context:)
  # Iterate through keys
  event['records'].each do |key, records|
    puts "Key: #{key}"

    # Iterate through records
    records.each do |record|
      puts "Record: #{record}"

      # Decode base64
      msg = Base64.decode64(record['value'])
      puts "Message: #{msg}"
    end
  end
end
```

------
#### [ Rust ]

**适用于 Rust 的 SDK**  
 还有更多相关信息 GitHub。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
通过 Rust 将 Amazon MSK 事件与 Lambda 结合使用。  

```
use aws_lambda_events::event::kafka::KafkaEvent;
use lambda_runtime::{run, service_fn, tracing, Error, LambdaEvent};
use base64::prelude::*;
use serde_json::{Value};
use tracing::{info};

/// Pre-Requisites:
/// 1. Install Cargo Lambda - see https://www.cargo-lambda.info/guide/getting-started.html
/// 2. Add packages tracing, tracing-subscriber, serde_json, base64
///
/// This is the main body for the function.
/// Write your code inside it.
/// There are some code example in the following URLs:
/// - https://github.com/awslabs/aws-lambda-rust-runtime/tree/main/examples
/// - https://github.com/aws-samples/serverless-rust-demo/

async fn function_handler(event: LambdaEvent<KafkaEvent>) -> Result<Value, Error> {

    let payload = event.payload.records;

    for (_name, records) in payload.iter() {

        for record in records {

         let record_text = record.value.as_ref().ok_or("Value is None")?;
         info!("Record: {}", &record_text);

         // perform Base64 decoding
         let record_bytes = BASE64_STANDARD.decode(record_text)?;
         let message = std::str::from_utf8(&record_bytes)?;
         
         info!("Message: {}", message);
        }

    }

    Ok(().into())
}

#[tokio::main]
async fn main() -> Result<(), Error> {

    // required to enable CloudWatch error logging by the runtime
    tracing::init_default_subscriber();
    info!("Setup CW subscriber!");

    run(service_fn(function_handler)).await
}
```

------