

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# SelfManagedKafka
<a name="sam-property-function-selfmanagedkafka"></a>

描述 `SelfManagedKafka` 事件源类型的对象。*有关更多信息，请参阅《开发人员指南》中的[AWS Lambda 与自行管理的 Apache Kafka 配合使用](https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html)。AWS Lambda *

AWS Serverless Application Model (AWS SAM) 在设置此事件类型时生成[https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html)资源。

要使用架构注册表，您需要为函数定义特定的 IAM 角色权限。有关所需配置的示例，请参阅[使用 IAM 角色完成设置](sam-property-function-msk.md#sam-property-function-msk-example-complete)。

## 语法
<a name="sam-property-function-selfmanagedkafka-syntax"></a>

要在 AWS SAM 模板中声明此实体，请使用以下语法。

### YAML
<a name="sam-property-function-selfmanagedkafka-syntax.yaml"></a>

```
  [BatchSize](#sam-function-selfmanagedkafka-batchsize): Integer
  [BisectBatchOnFunctionError](#sam-function-selfmanagedkafka-bisectbatchonfunctionerror): Boolean
  [ConsumerGroupId](#sam-function-selfmanagedkafka-consumergroupid): String
  DestinationConfig: [DestinationConfig](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-destinationconfig.html)
  [Enabled](#sam-function-selfmanagedkafka-enabled): Boolean
  [FilterCriteria](#sam-function-selfmanagedkafka-filtercriteria): [FilterCriteria](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-filtercriteria.html)
  [KafkaBootstrapServers](#sam-function-selfmanagedkafka-kafkabootstrapservers): List
  [FunctionResponseTypes](#sam-function-selfmanagedkafka-functionresponsetypes): List
  KmsKeyArn: String
  [LoggingConfig](#sam-function-selfmanagedkafka-loggingconfig): LoggingConfig
  [MaximumRecordAgeInSeconds](#sam-function-selfmanagedkafka-maximumrecordageinseconds): Integer
  [MaximumRetryAttempts](#sam-function-selfmanagedkafka-maximumretryattempts): Integer
  [MetricsConfig](#sam-function-selfmanagedkafka-metricsconfig): MetricsConfig
  [ProvisionedPollerConfig](#sam-function-selfmanagedkafka-provisionedpollerconfig): [ProvisionedPollerConfig](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-provisionedpollerconfig)
  [SchemaRegistryConfig](#sam-function-selfmanagedkafka-schemaregistryconfig): [SchemaRegistryConfig](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-schemaregistryconfig.html)
  [SourceAccessConfigurations](#sam-function-selfmanagedkafka-sourceaccessconfigurations): [SourceAccessConfigurations](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-sourceaccessconfigurations)
  StartingPosition: String
  StartingPositionTimestamp: Double
  [Topics](#sam-function-selfmanagedkafka-topics): List
```

## Properties
<a name="sam-property-function-selfmanagedkafka-properties"></a>

 `BatchSize`   <a name="sam-function-selfmanagedkafka-batchsize"></a>
Lambda 从流中提取并发送到函数的每个批处理中的最大记录数。  
*类型*：整数  
*必需*：否  
*默认值*：100  
*CloudFormation 兼容性*：此属性直接传递给`AWS::Lambda::EventSourceMapping`资源的`[BatchSize](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-batchsize)`属性。  
*最小值*：`1`  
*最大值*：`10000`

 `BisectBatchOnFunctionError`   <a name="sam-function-selfmanagedkafka-bisectbatchonfunctionerror"></a>
如果函数返回错误，则将批次拆分为两批并重试。  
*类型*：布尔值  
*必需*：否  
*CloudFormation 兼容性*：此属性直接传递给`AWS::Lambda::EventSourceMapping`资源的`[BisectBatchOnFunctionError](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-bisectbatchonfunctionerror)`属性。

 `ConsumerGroupId`   <a name="sam-function-selfmanagedkafka-consumergroupid"></a>
用于配置如何从 Kafka 主题中读取事件的字符串。  
*类型*：字符串  
*必需*：否  
*CloudFormation 兼容性*：此属性直接传递给`AWS::Lambda::EventSourceMapping`资源的`[SelfManagedKafkaEventSourceConfig](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-selfmanagedkafkaeventsourceconfig)`属性。

 `DestinationConfig`   <a name="sam-function-selfmanagedkafka-destinationconfig"></a>
一个配置对象，用于在 Lambda 处理事件后指定事件目的地。  
使用此属性指定来自管理 Kafka 事件源的失败调用的目的地。  
*类型*：[DestinationConfig](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-destinationconfig.html)  
*必需*：否  
*CloudFormation 兼容性*：此属性直接传递给`AWS::Lambda::EventSourceMapping`资源的`[ DestinationConfig](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-destinationconfig)`属性。

 `Enabled`   <a name="sam-function-selfmanagedkafka-enabled"></a>
禁用事件源映射以暂停轮询和调用。  
*类型*：布尔值  
*必需*：否  
*CloudFormation 兼容性*：此属性直接传递给`AWS::Lambda::EventSourceMapping`资源的`[Enabled](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-enabled)`属性。

 `FilterCriteria`   <a name="sam-function-selfmanagedkafka-filtercriteria"></a>
定义用于确定 Lambda 是否应处理事件的条件的对象。有关更多信息，请参阅《AWS Lambda 开发人员指南》**中的 [AWS Lambda 事件筛选](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html)。  
*类型*：[FilterCriteria](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-filtercriteria.html)  
*必需*：否  
*CloudFormation 兼容性*：此属性直接传递给`AWS::Lambda::EventSourceMapping`资源的`[FilterCriteria](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-filtercriteria)`属性。

 `KafkaBootstrapServers`   <a name="sam-function-selfmanagedkafka-kafkabootstrapservers"></a>
Kafka 代理的引导服务器列表。包括端口，例如 `broker.example.com:xxxx`  
*类型*：列表  
*必需*：否  
*CloudFormation 兼容性*：此属性是独有的 AWS SAM ，没有 CloudFormation 等效属性。

 `FunctionResponseTypes`   <a name="sam-function-selfmanagedkafka-functionresponsetypes"></a>
当前应用于事件源映射的响应类型的列表。有关详细信息，请参阅《AWS Lambda 开发人员指南》**中的[报告批处理项目失败](https://docs.aws.amazon.com/lambda/latest/dg/kafka-retry-configurations.html)。  
*有效值*：`ReportBatchItemFailures`  
*类型*：列表  
*必需*：否  
*CloudFormation 兼容性*：此属性直接传递给`AWS::Lambda::EventSourceMapping`资源的`[FunctionResponseTypes](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-functionresponsetypes)`属性。

 `KmsKeyArn`   <a name="sam-function-selfmanagedkafka-kmskeyarn"></a>
用于加密与此事件相关信息的密钥的 Amazon 资源名称（ARN）。  
*类型*：字符串  
*必需*：否  
*CloudFormation 兼容性*：此属性直接传递给`AWS::Lambda::EventSourceMapping`资源的`[KmsKeyArn](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-kmskeyarn)`属性。

 `LoggingConfig`   <a name="sam-function-selfmanagedkafka-loggingconfig"></a>
您的事件源的日志配置。  
*类型*：[LoggingConfig](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-loggingconfig)  
*必需*：否  
*CloudFormation 兼容性*：此属性直接传递给`AWS::Lambda::EventSourceMapping`资源的`[LoggingConfig](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-loggingconfig)`属性。

 `MaximumRecordAgeInSeconds`   <a name="sam-function-selfmanagedkafka-maximumrecordageinseconds"></a>
Lambda 发送到函数以进行处理的记录的最长期限。  
*类型*：整数  
*必需*：否  
*CloudFormation 兼容性*：此属性直接传递给`AWS::Lambda::EventSourceMapping`资源的`[MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumrecordageinseconds)`属性。

 `MetricsConfig`   <a name="sam-function-selfmanagedkafka-metricsconfig"></a>
您的事件源的指标配置。  
*类型*：[MetricsConfig](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-metricsconfig)  
*必需*：否  
*CloudFormation 兼容性*：此属性直接传递给`AWS::Lambda::EventSourceMapping`资源的`[MetricsConfig](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-metricsconfig)`属性。

 `MaximumRetryAttempts`   <a name="sam-function-selfmanagedkafka-maximumretryattempts"></a>
在函数返回错误时重试的最大次数。  
*类型*：整数  
*必需*：否  
*CloudFormation 兼容性*：此属性直接传递给`AWS::Lambda::EventSourceMapping`资源的`[MaximumRetryAttempts](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumretryattempts)`属性。

 `ProvisionedPollerConfig`   <a name="sam-function-selfmanagedkafka-provisionedpollerconfig"></a>
用于增加计算事件源映射所使用的轮询器数量的配置。此配置允许最少 1 个轮询器和最多 2000 个轮询器。有关具体示例，请参阅 [ProvisionedPollerConfig 示例](#sam-property-function-selfmanagedkafka-example-provisionedpollerconfig)。  
*类型*：[ProvisionedPollerConfig](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-provisionedpollerconfig)  
*必需*：否  
*CloudFormation 兼容性*：此属性直接传递给`AWS::Lambda::EventSourceMapping`资源的`[ProvisionedPollerConfig](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-provisionedpollerconfig)`属性。

`SchemaRegistryConfig`  <a name="sam-function-selfmanagedkafka-schemaregistryconfig"></a>
将架构注册表与自托管 Kafka 事件源配合使用的配置。  
此功能需要配置 `ProvisionedPollerConfig`。
*类型*：[SchemaRegistryConfig](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-schemaregistryconfig)  
*必需*：否  
*CloudFormation 兼容性：*此属性直接传递给`AWS::Lambda::EventSourceMapping`资源的`[SelfManagedKafkaEventSourceConfig](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-selfmanagedkafkaeventsourceconfig)`属性。

 `SourceAccessConfigurations`   <a name="sam-function-selfmanagedkafka-sourceaccessconfigurations"></a>
用于保护与定义事件源的身份验证协议数组 VPC 组件或虚拟化主机。  
*有效值*：`BASIC_AUTH | CLIENT_CERTIFICATE_TLS_AUTH | SASL_SCRAM_256_AUTH | SASL_SCRAM_512_AUTH | SERVER_ROOT_CA_CERTIFICATE`  
*类型*：[SourceAccessConfiguration](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-sourceaccessconfiguration) 列表  
*是否必需*：是  
*CloudFormation 兼容性：*此属性是`AWS::Lambda::EventSourceMapping`资源`[SelfManagedKafkaEventSourceConfig](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-selfmanagedkafkaeventsourceconfig)`属性的一部分。

 `StartingPosition`   <a name="sam-function-selfmanagedkafka-startingposition"></a>
在流中开始读取数据的位置。  
+ `AT_TIMESTAMP` – 指定开始读取记录的时间。
+ `LATEST` - 仅读取新记录。
+ `TRIM_HORIZON` - 处理所有可用的记录。
*有效值*：`AT_TIMESTAMP` \$1 `LATEST` \$1 `TRIM_HORIZON`  
*类型*：字符串  
*必需*：否  
*CloudFormation 兼容性*：此属性直接传递给`AWS::Lambda::EventSourceMapping`资源的`[StartingPosition](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-startingposition)`属性。

 `StartingPositionTimestamp`   <a name="sam-function-selfmanagedkafka-startingpositiontimestamp"></a>
开始读取的时间（以 Unix 时间秒为单位） 在 `StartingPosition` 被指定为 `AT_TIMESTAMP` 的情况下定义 `StartingPositionTimestamp`。  
*类型*：双精度  
*必需*：否  
*CloudFormation 兼容性*：此属性直接传递给`AWS::Lambda::EventSourceMapping`资源的`[StartingPositionTimestamp](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-startingpositiontimestamp)`属性。

 `Topics`   <a name="sam-function-selfmanagedkafka-topics"></a>
Kafka 主题的名称。  
*类型*：列表  
*是否必需*：是  
*CloudFormation 兼容性*：此属性直接传递给`AWS::Lambda::EventSourceMapping`资源的`[Topics](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-topics)`属性。

## 示例
<a name="sam-property-function-selfmanagedkafka--examples"></a>

### 通过 IAM 角色完成设置
<a name="sam-property-function-selfmanagedkafka-example-complete"></a>

以下示例展示了完整的配置，包括使用架构注册表所需的 IAM 角色配置：

```
Parameters:
  PreCreatedSubnetOne:
    Type: String
  PreCreatedSubnetTwo:
    Type: String

Resources:
  MyLambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17		 	 	 '
        Statement:
        - Action: [sts:AssumeRole]
          Effect: Allow
          Principal:
            Service: [lambda.amazonaws.com]
      Policies:
      - PolicyName: KafkaAuthPolicy
        PolicyDocument:
          Statement:
          - Action: [secretsmanager:GetSecretValue, kms:Decrypt]
            Effect: "Allow"
            Resource: ['arn:aws:secretsmanager:us-west-2:123456789012:secret:kafkaSecret-******',
                        'arn:aws:kms:us-west-2:123456789012:key/keyId']
      - PolicyName: ENIPolicy
        PolicyDocument:
          Statement:
          - Action: [ec2:CreateNetworkInterface,
              ec2:DescribeNetworkInterfaces, ec2:DescribeVpcs, ec2:DeleteNetworkInterface,
              ec2:DescribeSubnets, ec2:DescribeSecurityGroups]
            Effect: Allow
            Resource: '*'
      - PolicyName: SchemaRegistryPolicy
        PolicyDocument:
          Statement:
          - Action: [glue:GetRegistry]
            Effect: Allow
            Resource: 'arn:aws:glue:{region}:{account-id}:registry/{registry-name}'
      - PolicyName: SchemaVersionsPolicy
        PolicyDocument:
          Statement:
          - Action: [glue:GetSchemaVersions]
            Effect: Allow
            Resource: '*'
      ManagedPolicyArns:
      - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Tags:
      - {Value: SAM, Key: lambda:createdBy}

  MyKafkaProcessor:
    Type: AWS::Serverless::Function
    Properties:
      Runtime: nodejs18.x
      Handler: index.handler
      CodeUri: ${codeuri}
      Role:
        Fn::GetAtt: [MyLambdaExecutionRole, Arn]
      Events:
        SelfManagedKafkaEvent:
          Type: SelfManagedKafka
          Properties:
            KafkaBootstrapServers:
              - my-kafka-broker-1:9092
              - my-kafka-broker-2:9092
            Topics:
              - SchemaRegistryTestTopic
            StartingPosition: LATEST
            SourceAccessConfigurations:
              - Type: VPC_SUBNET
                URI: subnet:subnet-12345678
              - Type: VPC_SECURITY_GROUP
                URI: security_group:sg-12345678
              - Type: BASIC_AUTH
                URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c
            ProvisionedPollerConfig:
              MinimumPollers: 1
            SchemaRegistryConfig:
              AccessConfigs:
              - Type: BASIC_AUTH
                URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c
              SchemaValidationConfigs:
              - Attribute: KEY
              EventRecordFormat: JSON
              SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry
```

### ProvisionedPollerConfig 示例
<a name="sam-property-function-selfmanagedkafka-example-provisionedpollerconfig"></a>

```
ProvisionedPollerConfig:
  MinimumPollers: 1
  MaximumPollers: 200
```

### 自行管理的 Kafka 事件源
<a name="sam-property-function-selfmanagedkafka--examples--self-managed-kafka-event-source"></a>

以下是 `SelfManagedKafka` 事件源类型的示例。

#### YAML
<a name="sam-property-function-selfmanagedkafka--examples--self-managed-kafka-event-source--yaml"></a>

```
Events:
  SelfManagedKafkaEvent:
    Type: SelfManagedKafka
    Properties:
      BatchSize: 1000
      Enabled: true
      KafkaBootstrapServers:
        - abc.xyz.com:xxxx
      SourceAccessConfigurations:
        -  Type: BASIC_AUTH
           URI: arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c
      Topics:
        - MyKafkaTopic
```

### 带有 AWS Glue 架构注册表的自管理 Kafka 事件源
<a name="sam-property-function-selfmanagedkafka-example-schemaregistry"></a>

以下是使用 AWS Glue 架构注册表配置`SelfManagedKafka`的事件源类型的示例。

```
Events:
  SelfManagedKafkaEvent:
    Type: SelfManagedKafka
    Properties:
      KafkaBootstrapServers:
        - abc.xyz.com:9092
      Topics:
        - SchemaRegistryTestTopic
      StartingPosition: LATEST
      ProvisionedPollerConfig:
        MinimumPollers: 1
      SchemaRegistryConfig:
        SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry
        EventRecordFormat: JSON
        SchemaValidationConfigs:
          - Attribute: KEY
          - Attribute: VALUE
      SourceAccessConfigurations:
        - Type: VPC_SUBNET
          URI: subnet:subnet-12345678
        - Type: VPC_SECURITY_GROUP
          URI: security_group:sg-12345678
```

### 自托管 Kafka 事件源与 Confluent 架构注册表
<a name="sam-property-function-selfmanagedkafka-example-schemaregistry-confluent"></a>

以下是一个配置了 Confluent 架构注册表的 `SelfManagedKafka` 事件源类型的示例。

```
Events:
  SelfManagedKafkaEvent:
    Type: SelfManagedKafka
    Properties:
      KafkaBootstrapServers:
        - abc.xyz.com:9092
      Topics:
        - SchemaRegistryTestTopic
      StartingPosition: LATEST
      ProvisionedPollerConfig:
        MinimumPollers: 1
      SchemaRegistryConfig:
        SchemaRegistryURI: https://my-schema-registry.confluent.cloud
        AccessConfigs:
          - Type: BASIC_AUTH
            URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-secret
        EventRecordFormat: JSON
        SchemaValidationConfigs:
          - Attribute: KEY
          - Attribute: VALUE
      SourceAccessConfigurations:
        - Type: VPC_SUBNET
          URI: subnet:subnet-12345678
        - Type: VPC_SECURITY_GROUP
          URI: security_group:sg-12345678
        - Type: BASIC_AUTH
          URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:kafka-secret
```