

# 结合 Amazon MQ 使用 Lambda
<a name="with-mq"></a>

**注意**  
如果想要将数据发送到 Lambda 函数以外的目标，或要在发送数据之前丰富数据，请参阅 [Amazon EventBridge Pipes](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html)（Amazon EventBridge 管道）。

Amazon MQ 是一项托管消息代理服务，用于 [Apache ActiveMQ](https://activemq.apache.org/) 和 [RabbitMQ](https://www.rabbitmq.com)。*消息代理*允许软件应用程序和组件使用各种编程语言、操作系统和正式消息收发协议，通过主题或队列事件目标进行通信。

Amazon MQ 还可以通过安装 ActiveMQ 代理或 RabbitMQ 代理以及提供不同的网络拓扑和其他基础设施需求来代表您管理 Amazon Elastic Compute Cloud (Amazon EC2) 实例。

您可使用 Lambda 函数处理来自 Amazon MQ 消息代理的记录。Lambda 通过[事件源映射](invocation-eventsourcemapping.md)调用您的函数，事件源映射是从您的代理读取消息并[同步](invocation-sync.md)调用函数的一种 Lambda 资源。

**警告**  
Lambda 事件源映射至少处理每个事件一次，有可能出现重复处理记录的情况。为避免与重复事件相关的潜在问题，我们强烈建议您将函数代码设为幂等性。要了解更多信息，请参阅 AWS 知识中心的[如何让我的 Lambda 函数保持幂等性](https://repost.aws/knowledge-center/lambda-function-idempotent)。

Amazon MQ 事件源映射有以下配置限制：
+ 并发 – 使用 Amazon MQ 事件源映射的 Lambda 函数具有默认的最大[并发](lambda-concurrency.md)设置。对于 ActiveMQ，Lambda 服务将每个 Amazon MQ 事件源映射的并发执行环境数量限制为 5 个。对于 RabbitMQ，每个 Amazon MQ 事件源映射的并发执行环境数量限制为 1 个。即使您更改了函数的预留或预调配并发设置，Lambda 服务也不会提供更多的执行环境。要请求增加单个 Amazon MQ 事件源映射的默认最大并发数，请联系 支持 并提供事件源映射 UUID 和区域。因为是在特定的事件源映射级别而不是账户或区域级别增加，所以需要为每个事件源映射手动请求按比例增加。
+ 跨账户 – Lambda 不支持跨账户处理。您不能使用 Lambda 处理来自不同 AWS 账户 账户中的 Amazon MQ 消息代理的记录。
+ 身份验证 – 对于 ActiveMQ，仅支持 ActiveMQ [SimpleAuthenticationPlugin](https://activemq.apache.org/security#simple-authentication-plugin)。对于 RabbitMQ，仅支持 [PLAIN](https://www.rabbitmq.com/access-control.html#mechanisms) 身份验证机制。用户必须使用 AWS Secrets Manager 来管理凭据。有关 ActiveMQ 身份验证的更多信息，请参阅 *Amazon MQ 开发人员指南*中的[使用 LDAP 集成 ActiveMQ 代理](https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/security-authentication-authorization.html)。
+ 连接配额 – 代理具有每个有线级协议允许的最大连接数。此配额基于代理实例类型。有关更多信息，请参阅 *Amazon MQ 开发人员指南*中的 **Amazon MQ 中的配额**的[代理](https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/amazon-mq-limits.html#broker-limits)部分。
+ 连接 – 您可以在公有或私有虚拟私有云（VPC）中创建代理。对于私有 VPC，您的 Lambda 函数需要具备对 VPC 的访问权限才能接收消息。有关更多信息，请参阅此部分后面的[配置网络安全](process-mq-messages-with-lambda.md#process-mq-messages-with-lambda-networkconfiguration)。
+ 事件目标 – 仅支持队列目标。但是，您可以使用虚拟主题，虚拟主题在内部与主题行为一致，在与 Lambda 交互时与队列行为一致。有关更多信息，请参阅 Apache ActiveMQ 网站上的[虚拟目标](https://activemq.apache.org/virtual-destinations)和 RabbitMQ 网站上的[虚拟主机](https://www.rabbitmq.com/vhosts.html)。
+ 网络拓扑 – 对于 ActiveMQ，每个事件源映射仅支持一个单实例或备用代理。对于 RabbitMQ，每个事件源映射只支持一个单实例代理或集群部署。单实例代理需要一个失效转移端点。有关这些代理部署模式的更多信息，请参阅 *Amazon MQ 开发人员指南*中的 [Active MQ 代理架构](https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/amazon-mq-broker-architecture.html)和[RabbitMQ 代理架构](https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/rabbitmq-broker-architecture.html)。
+ 协议 – 支持的协议取决于 Amazon MQ 集成的类型。
  + 对于 ActiveMQ 集成，Lambda 使用 OpenWire/Java Message Service (JMS) 协议来使用消息。消息的使用不支持任何其他协议。在 JMS 协议中，仅支持 [https://activemq.apache.org/components/cms/api_docs/activemqcpp-3.6.0/html/classactivemq_1_1commands_1_1_active_m_q_text_message.html](https://activemq.apache.org/components/cms/api_docs/activemqcpp-3.6.0/html/classactivemq_1_1commands_1_1_active_m_q_text_message.html) 和 [https://activemq.apache.org/components/cms/api_docs/activemqcpp-3.9.0/html/classactivemq_1_1commands_1_1_active_m_q_bytes_message.html](https://activemq.apache.org/components/cms/api_docs/activemqcpp-3.9.0/html/classactivemq_1_1commands_1_1_active_m_q_bytes_message.html)。Lambda 还支持 JMS 自定义属性。有关 OpenWire 协议的更多信息，请参阅 Apache ActiveMQ 网站上的 [OpenWire](https://activemq.apache.org/openwire.html)。
  + 对于 RabbitMQ 集成，Lambda 使用 AMQP 0-9-1 协议来使用消息。消息的使用不支持任何其他协议。有关 RabbitMQ 的 AMQP 0-9-1 协议实施的详细信息，请参阅 RabbitMQ 网站上的 [AMQP 0-9-1 完整参考指南](https://www.rabbitmq.com/amqp-0-9-1-reference.html)。

Lambda 自动支持 Amazon MQ 支持的最新版本的 ActiveMQ 和 RabbitMQ。有关受支持的最新版本，请参阅 *Amazon MQ 开发人员指南*中的 [Amazon MQ 发布说明](https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/amazon-mq-release-notes.html)。

**注意**  
默认情况下，Amazon MQ 代理有一个每周维护时段。代理在该时段内无法使用。对于没有备用代理的代理，Lambda 将无法在该时段处理任何消息。

**Topics**
+ [了解 Amazon MQ 的 Lambda 使用者组](#services-mq-configure)
+ [为 Lambda 配置 Amazon MQ 事件源](process-mq-messages-with-lambda.md)
+ [事件源映射 API](services-mq-params.md)
+ [筛选来自 Amazon MQ 事件源的事件](with-mq-filtering.md)
+ [Amazon MQ 事件源映射错误排查](services-mq-errors.md)

## 了解 Amazon MQ 的 Lambda 使用者组
<a name="services-mq-configure"></a>

为了与 Amazon MQ 进行交互，Lambda 会创建一个可以从 Amazon MQ 代理中读取的使用者组。使用与事件源映射 UUID 相同的 ID 创建使用者组。

对于 Amazon MQ 事件源，Lambda 会将记录合并为批处理，然后通过单个有效负载中将其发送到您的函数。要控制行为，您可以配置批处理时段和批处理大小。Lambda 会持续提取消息，直到达到 6 MB 的最大有效负载大小、批处理时段过期或记录数达到完整批处理大小时为止。有关更多信息，请参阅 [批处理行为](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching)。

使用者组将消息作为字节 BLOB 进行检索，然后以 base64 格式编码为单个 JSON 有效负载，接下来调用您的函数。如果函数为批处理中的任何消息返回错误，Lambda 将重试整批消息，直到处理成功或消息过期为止。

**注意**  
尽管 Lambda 函数的最大超时限制通常为 15 分钟，但 Amazon MSK、自行管理的 Apache Kafka、Amazon DocumentDB、Amazon MQ for ActiveMQ 和 RabbitMQ 的事件源映射，仅支持最大超时限制为 14 分钟的函数。此约束可确保事件源映射可以正确处理函数错误和重试。

您可以使用 Amazon CloudWatch 中的 `ConcurrentExecutions` 指标监控给定函数的并发使用情况。有关并发的更多信息，请参阅 [为函数配置预留并发](configuration-concurrency.md)。

**Example Amazon MQ 记录事件**  

```
{
   "eventSource": "aws:mq",
   "eventSourceArn": "arn:aws:mq:us-east-2:111122223333:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8",
   "messages": [
      { 
        "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1---mq---us-east-2.amazonaws.com.rproxy.goskope.com-37557-1234520418293-4:1:1:1:1", 
        "messageType": "jms/text-message",
        "deliveryMode": 1,
        "replyTo": null,
        "type": null,
        "expiration": "60000",
        "priority": 1,
        "correlationId": "myJMSCoID",
        "redelivered": false,
        "destination": { 
          "physicalName": "testQueue" 
        },
        "data":"QUJDOkFBQUE=",
        "timestamp": 1598827811958,
        "brokerInTime": 1598827811958, 
        "brokerOutTime": 1598827811959, 
        "properties": {
          "index": "1",
          "doAlarm": "false",
          "myCustomProperty": "value"
        }
      },
      { 
        "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1---mq---us-east-2.amazonaws.com.rproxy.goskope.com-37557-1234520418293-4:1:1:1:1",
        "messageType": "jms/bytes-message",
        "deliveryMode": 1,
        "replyTo": null,
        "type": null,
        "expiration": "60000",
        "priority": 2,
        "correlationId": "myJMSCoID1",
        "redelivered": false,
        "destination": { 
          "physicalName": "testQueue" 
        },
        "data":"LQaGQ82S48k=",
        "timestamp": 1598827811958,
        "brokerInTime": 1598827811958, 
        "brokerOutTime": 1598827811959, 
        "properties": {
          "index": "1",
          "doAlarm": "false",
          "myCustomProperty": "value"
        }
      }
   ]
}
```

```
{
  "eventSource": "aws:rmq",
  "eventSourceArn": "arn:aws:mq:us-east-2:111122223333:broker:pizzaBroker:b-9bcfa592-423a-4942-879d-eb284b418fc8",
  "rmqMessagesByQueue": {
    "pizzaQueue::/": [
      {
        "basicProperties": {
          "contentType": "text/plain",
          "contentEncoding": null,
          "headers": {
            "header1": {
              "bytes": [
                118,
                97,
                108,
                117,
                101,
                49
              ]
            },
            "header2": {
              "bytes": [
                118,
                97,
                108,
                117,
                101,
                50
              ]
            },
            "numberInHeader": 10
          },
          "deliveryMode": 1,
          "priority": 34,
          "correlationId": null,
          "replyTo": null,
          "expiration": "60000",
          "messageId": null,
          "timestamp": "Jan 1, 1970, 12:33:41 AM",
          "type": null,
          "userId": "AIDACKCEVSQ6C2EXAMPLE",
          "appId": null,
          "clusterId": null,
          "bodySize": 80
        },
        "redelivered": false,
        "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ=="
      }
    ]
  }
}
```
在 RabbitMQ 示例中，`pizzaQueue` 是 RabbitMQ 队列的名称，`/` 是虚拟主机的名称。接收消息时，事件源会将消息列在 `pizzaQueue::/` 下。

# 为 Lambda 配置 Amazon MQ 事件源
<a name="process-mq-messages-with-lambda"></a>

**Topics**
+ [配置网络安全](#process-mq-messages-with-lambda-networkconfiguration)
+ [创建事件源映射](#services-mq-eventsourcemapping)

## 配置网络安全
<a name="process-mq-messages-with-lambda-networkconfiguration"></a>

要通过事件源映射向 Lambda 提供对 Amazon MQ 的完全访问权限，代理必须使用公有端点（公有 IP 地址），或者您必须提供对您在其中创建了代理的 Amazon VPC 的访问权限。

将 Amazon MQ 与 Lambda 配合使用时，创建 [AWS PrivateLink VPC 端点](https://docs.aws.amazon.com/vpc/latest/privatelink/create-interface-endpoint.html)，以便您的函数可以访问 Amazon VPC 中的资源。

**注意**  
对于使用事件轮询器的默认（按需）模式的事件源映射函数，需要 AWS PrivateLink VPC 端点。如果事件源映射使用[预调配模式](invocation-eventsourcemapping.md#invocation-eventsourcemapping-provisioned-mode)，则无需配置 AWS PrivateLink VPC 端点。

创建端点以提供对以下资源的访问权限：
+  Lambda：为 Lambda 服务主体创建端点。
+  AWS STS：为 AWS STS 创建端点，以便服务主体代您代入角色。
+  Secrets Manager：如果代理使用 Secrets Manager 来存储凭证，请为 Secrets Manager 创建端点。

也可以在 Amazon VPC 中的每个公有子网上配置 NAT 网关。有关更多信息，请参阅 [为连接到 VPC 的 Lambda 函数启用互联网访问权限](configuration-vpc-internet.md)。

为 Amazon MQ 创建事件源映射时，Lambda 会检查为 Amazon VPC 配置的子网和安全组是否已经存在弹性网络接口（ENI）。如果 Lambda 发现现有 ENI，则会尝试重用这些 ENI。否则，Lambda 会创建新的 ENI 来连接到事件源并调用函数。

**注意**  
Lambda 函数始终在 Lambda 服务拥有的 Amazon VPC 中运行。函数的 VPC 配置不会影响事件源映射。只有事件源的网络配置才能决定 Lambda 连接到事件源的方式。

为包含代理的 Amazon VPC 配置安全组。默认情况下，Amazon MQ 使用以下端口：`61617`（Amazon MQ for ActiveMQ）和 `5671`（Amazon MQ for RabbitMQ）。
+ 入站规则：允许与事件源关联安全组的默认代理端口的所有流量。或者，您可以使用自引用安全组规则允许来自同一安全组内的实例进行访问。
+ 出站规则 – 如果您的函数需要与 AWS 服务进行通信，则允许端口 `443` 上的所有流量向外部目标传输。或者，如果您不需要与其他 AWS 服务通信，也可以使用自引用的安全组规则来限制对代理的访问权限。
+ Amazon VPC 端点入站规则：如果您正在使用 Amazon VPC 端点，则与 Amazon VPC 端点关联的安全组，必须允许来自代理安全组的端口 `443` 上的入站流量。

如果代理使用身份验证，您还可以限制 Secrets Manager 端点的端点策略。要调用 Secrets Manager API，Lambda 会使用函数角色而非 Lambda 服务主体。

**Example VPC 端点策略：Secrets Manager 端点**  

```
{
      "Statement": [
          {
              "Action": "secretsmanager:GetSecretValue",
              "Effect": "Allow",
              "Principal": {
                  "AWS": [
                      "arn:aws::iam::123456789012:role/my-role"
                  ]
              },
              "Resource": "arn:aws::secretsmanager:us-west-2:123456789012:secret:my-secret"
          }
      ]
  }
```

当您使用 Amazon VPC 端点时，AWS 会使用端点的弹性网络接口（ENI）路由 API 调用来调用函数。Lambda 服务主体需要针对使用这些 ENI 的任何角色和函数调用 `lambda:InvokeFunction`。

默认情况下，Amazon VPC 端点具有开放的 IAM 策略，允许对资源进行广泛访问。最佳实践是，将这些策略限制为使用该端点执行所需的操作。为确保事件源映射能够调用 Lambda 函数，VPC 端点策略必须允许 Lambda 服务主体调用 `sts:AssumeRole` 和 `lambda:InvokeFunction`。将 VPC 端点策略限制为仅允许来自组织内部的 API 调用，会导致事件源映射无法正常运行，因此这些策略中需要 `"Resource": "*"`。

以下 VPC 端点策略示例展示了如何向 AWS STS 的 Lambda 服务主体和 Lambda 端点授予所需的访问权限。

**Example VPC 端点策略 – AWS STS 端点**  

```
{
      "Statement": [
          {
              "Action": "sts:AssumeRole",
              "Effect": "Allow",
              "Principal": {
                  "Service": [
                      "lambda.amazonaws.com"
                  ]
              },
              "Resource": "*"
          }
      ]
    }
```

**Example VPC 端点策略 – Lambda 端点**  

```
{
      "Statement": [
          {
              "Action": "lambda:InvokeFunction",
              "Effect": "Allow",
              "Principal": {
                  "Service": [
                      "lambda.amazonaws.com"
                  ]
              },
              "Resource": "*"
          }
      ]
  }
```

## 创建事件源映射
<a name="services-mq-eventsourcemapping"></a>

创建[事件源映射](invocation-eventsourcemapping.md)以指示 Lambda 将 Amazon MQ 代理中的记录发送到 Lambda 函数。您可以创建多个事件源映射，以使用多个函数处理相同的数据，或使用单个函数处理来自多个源的项目。

要将您的函数配置为从 Amazon MQ 中读取，请添加所需权限并在 Lambda 控制台中创建 **MQ** 触发器。

要从 Amazon MQ 代理读取记录，Lambda 函数需要以下权限：通过向函数[执行角色](lambda-intro-execution-role.md)添加权限语句，可以授予 Lambda 与 Amazon MQ 代理及其底层资源交互的权限：
+ [mq:DescribeBroker](https://docs.aws.amazon.com/amazon-mq/latest/api-reference/brokers-broker-id.html#brokers-broker-id-http-methods)
+ [secretsmanager:GetSecretValue](https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html)
+ [ec2:CreateNetworkInterface](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_CreateNetworkInterface.html)
+ [ec2:DeleteNetworkInterface](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DeleteNetworkInterface.html)
+ [ec2:DescribeNetworkInterfaces](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeNetworkInterfaces.html)
+ [ec2:DescribeSecurityGroups](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSecurityGroups.html)
+ [ec2:DescribeSubnets](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSubnets.html)
+ [ec2:DescribeVpcs](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeVpcs.html)
+ [logs:CreateLogGroup](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogGroup.html)
+ [logs:CreateLogStream](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogStream.html)
+ [logs:PutLogEvents](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html)

**注意**  
使用加密的客户托管密钥时，也可以添加 `[kms:Decrypt](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-bootstrap-brokers.html#clusters-clusterarn-bootstrap-brokersget)` 权限。

**要添加权限并创建触发器**

1. 打开 Lamba 控制台的[函数](https://console.aws.amazon.com/lambda/home#/functions)页面。

1. 选择一个函数的名称。

1. 选择 **Configuration**（配置）选项卡，然后选择 **Permissions**（权限）。

1. 在**角色名称**下，选择至执行角色的链接。此角色将在 IAM 控制台中打开角色。  
![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/execution-role.png)

1. 选择**添加权限**，然后选择**创建内联策略**。  
![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/inline-policy.png)

1. 在**策略编辑器**中，选择 **JSON**。输入以下策略。您的函数需要这些权限才能从 Amazon MQ 代理读取数据。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
         {
           "Effect": "Allow",
           "Action": [
             "mq:DescribeBroker",
             "secretsmanager:GetSecretValue",
             "ec2:CreateNetworkInterface",
             "ec2:DeleteNetworkInterface",
             "ec2:DescribeNetworkInterfaces", 
             "ec2:DescribeSecurityGroups",
             "ec2:DescribeSubnets",
             "ec2:DescribeVpcs",
             "logs:CreateLogGroup",
             "logs:CreateLogStream", 
             "logs:PutLogEvents"		
           ],
           "Resource": "*"
         }
       ]
     }
   ```

------
**注意**  
使用加密的客户托管密钥时，还必须添加 `kms:Decrypt` 权限。

1. 选择**下一步**。输入策略名称，然后选择**创建策略**。

1. 在 Lambda 控制台中返回您的函数。在 **Function overview**（函数概览）下，选择 **Add trigger**（添加触发器）。  
![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/add-trigger.png)

1. 选择 **MQ** 触发器类型。

1. 配置必填选项，然后选择 **Add**（添加）。

Lambda 支持对 Amazon MQ 事件源使用以下选项：
+ **MQ broker**（MQ 代理）– 选择 Amazon MQ 代理。
+ **Batch size**（批处理大小）– 设置要在单个批次中检索的最大消息数。
+ **Queue name**（队列名称）– 输入要使用的 Amazon MQ 队列。
+ **Source access configuration**（源访问配置）– 输入虚拟主机信息和 Secret Secrets Manager 密钥，用于存储您的代理凭证。
+ **Enable trigger**（启用触发器）– 禁用触发器以停止处理记录。

要启用或禁用触发器（或删除触发器），请在设计器中选择 **MQ** 触发器。要重新配置触发器，请使用事件源映射 API 操作。

# 事件源映射 API
<a name="services-mq-params"></a>

所有 Lambda 事件源类型共享相同的 [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) 和 [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) API 操作。但是，只有部分参数适用于 Amazon MQ 和 RabbitMQ。


| 参数 | 必需 | 默认值 | 备注 | 
| --- | --- | --- | --- | 
|  BatchSize  |  否  |  100  |  最大值：10000  | 
|  已启用  |  否  |  真实  | none | 
|  FunctionName  |  是  | 不适用  | none | 
|  FilterCriteria  |  否  |  不适用   |  [控制 Lambda 向您的函数发送的事件](invocation-eventfiltering.md)  | 
|  MaximumBatchingWindowInSeconds  |  否  |  500 毫秒  |  [批处理行为](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching)  | 
|  队列  |  否  | 不适用 |  要使用的 Amazon MQ 代理目标队列的名称。  | 
|  SourceAccessConfigurations  |  否  | 不适用  |  对于 ActiveMQ 为 BASIC\$1AUTH 凭证。对于 RabbitMQ，可以同时包含 BASIC\$1AUTH 凭证和 VIRTUAL\$1HOST 信息。  | 

# 筛选来自 Amazon MQ 事件源的事件
<a name="with-mq-filtering"></a>

您可以使用事件筛选，控制 Lambda 将流或队列中的哪些记录发送给函数。有关事件筛选工作原理的一般信息，请参阅 [控制 Lambda 向您的函数发送的事件](invocation-eventfiltering.md)。

本节重点介绍 Amazon MQ 事件源的事件筛选。

**注意**  
Amazon MQ 事件源映射仅支持对 `data` 键进行筛选。

**Topics**
+ [Amazon MQ 事件筛选基础知识](#filtering-AMQ)

## Amazon MQ 事件筛选基础知识
<a name="filtering-AMQ"></a>

假设 Amazon MQ 消息队列包含有效 JSON 格式或纯字符串的消息。示例记录如下所示，`data` 字段中的数据会转换为 Base64 编码字符串。

------
#### [ ActiveMQ ]

```
{ 
    "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1---mq---us-east-2.amazonaws.com.rproxy.goskope.com-37557-1234520418293-4:1:1:1:1", 
    "messageType": "jms/text-message",
    "deliveryMode": 1,
    "replyTo": null,
    "type": null,
    "expiration": "60000",
    "priority": 1,
    "correlationId": "myJMSCoID",
    "redelivered": false,
    "destination": { 
      "physicalName": "testQueue" 
    },
    "data":"QUJDOkFBQUE=",
    "timestamp": 1598827811958,
    "brokerInTime": 1598827811958, 
    "brokerOutTime": 1598827811959, 
    "properties": {
      "index": "1",
      "doAlarm": "false",
      "myCustomProperty": "value"
    }
}
```

------
#### [ RabbitMQ ]

```
{
    "basicProperties": {
        "contentType": "text/plain",
        "contentEncoding": null,
        "headers": {
            "header1": {
                "bytes": [
                  118,
                  97,
                  108,
                  117,
                  101,
                  49
                ]
            },
            "header2": {
                "bytes": [
                  118,
                  97,
                  108,
                  117,
                  101,
                  50
                ]
            },
            "numberInHeader": 10
        },
        "deliveryMode": 1,
        "priority": 34,
        "correlationId": null,
        "replyTo": null,
        "expiration": "60000",
        "messageId": null,
        "timestamp": "Jan 1, 1970, 12:33:41 AM",
        "type": null,
        "userId": "AIDACKCEVSQ6C2EXAMPLE",
        "appId": null,
        "clusterId": null,
        "bodySize": 80
        },
    "redelivered": false,
    "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ=="
}
```

------

对于 Active MQ 和 Rabbit MQ 代理，您可以使用 `data` 键，通过事件筛选来筛选记录。假设 Amazon MQ 队列包含以下 JSON 格式的消息。

```
{
    "timeout": 0,
    "IPAddress": "203.0.113.254"
}
```

要仅筛选 `timeout` 字段大于 0 的记录，`FilterCriteria` 对象将如下所示。

```
{
    "Filters": [
        {
            "Pattern": "{ \"data\" : { \"timeout\" : [ { \"numeric\": [ \">\", 0] } } ] } }"
        }
    ]
}
```

为了更清楚起见，以下是在纯 JSON 中展开的筛选条件 `Pattern` 的值。

```
{
    "data": {
        "timeout": [ { "numeric": [ ">", 0 ] } ]
        }
}
```

您可以使用控制台、AWS CLI 或 AWS SAM 模板添加筛选条件。

------
#### [ Console ]

要使用控制台添加此筛选条件，请按照 [将筛选条件附加到事件源映射（控制台）](invocation-eventfiltering.md#filtering-console) 中的说明，为**筛选条件**输入以下字符串。

```
{ "data" : { "timeout" : [ { "numeric": [ ">", 0 ] } ] } }
```

------
#### [ AWS CLI ]

要使用 AWS Command Line Interface（AWS CLI）创建包含这些筛选条件的新事件源映射，请运行以下命令。

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:mq:us-east-2:123456789012:broker:my-broker:b-8ac7cc01-5898-482d-be2f-a6b596050ea8 \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"timeout\" : [ { \"numeric\": [ \">\", 0 ] } ] } }"}]}'
```

要将这些筛选条件添加到现有事件源映射中，请运行以下命令。

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"timeout\" : [ { \"numeric\": [ \">\", 0 ] } ] } }"}]}'
```

要将这些筛选条件添加到现有事件源映射中，请运行以下命令。

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"timeout\" : [ { \"numeric\": [ \">\", 0 ] } ] } }"}]}'
```

------
#### [ AWS SAM ]

要使用 AWS SAM 添加此筛选条件，请将以下代码段添加到事件源的 YAML 模板中。

```
FilterCriteria:
  Filters:
    - Pattern: '{ "data" : { "timeout" : [ { "numeric": [ ">", 0 ] } ] } }'
```

------

使用 Amazon MQ，您还可以筛选消息为纯字符串的记录。假设您只想处理消息以“结果：”开头的记录。`FilterCriteria` 对象将如下所示。

```
{
    "Filters": [
        {
            "Pattern": "{ \"data\" : [ { \"prefix\": \"Result: \" } ] }"
        }
    ]
}
```

为了更清楚起见，以下是在纯 JSON 中展开的筛选条件 `Pattern` 的值。

```
{
    "data": [
        {
        "prefix": "Result: "
        }
    ]
}
```

您可以使用控制台、AWS CLI 或 AWS SAM 模板添加筛选条件。

------
#### [ Console ]

要使用控制台添加此筛选条件，请按照 [将筛选条件附加到事件源映射（控制台）](invocation-eventfiltering.md#filtering-console) 中的说明，为**筛选条件**输入以下字符串。

```
{ "data" : [ { "prefix": "Result: " } ] }
```

------
#### [ AWS CLI ]

要使用 AWS Command Line Interface（AWS CLI）创建包含这些筛选条件的新事件源映射，请运行以下命令。

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:mq:us-east-2:123456789012:broker:my-broker:b-8ac7cc01-5898-482d-be2f-a6b596050ea8 \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : [ { \"prefix\": \"Result: \" } ] }"}]}'
```

要将这些筛选条件添加到现有事件源映射中，请运行以下命令。

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : [ { \"prefix\": \"Result: \" } ] }"}]}'
```

------
#### [ AWS SAM ]

要使用 AWS SAM 添加此筛选条件，请将以下代码段添加到事件源的 YAML 模板中。

```
FilterCriteria:
  Filters:
    - Pattern: '{ "data" : [ { "prefix": "Result " } ] }'
```

------

Amazon MQ 消息必须是 UTF-8 编码的字符串，可以是纯字符串或 JSON 格式。这是因为 Lambda 在应用筛选条件之前将 Amazon MQ 字节数组解码为 UTF-8。如果您的消息使用另一种编码，例如 UTF-16 或 ASCII，或者消息格式与 `FilterCriteria` 格式不匹配，则 Lambda 仅处理元数据筛选条件。下表汇总了具体行为：


| 传入消息格式 | 消息属性的筛选条件模式格式 | 导致的操作 | 
| --- | --- | --- | 
|  纯字符串  |  纯字符串  |  Lambda 根据您的筛选条件进行筛选。  | 
|  纯字符串  |  数据属性中没有筛选条件模式  |  Lambda 根据您的筛选条件进行筛选（仅限其他元数据属性）。  | 
|  纯字符串  |  有效 JSON  |  Lambda 根据您的筛选条件进行筛选（仅限其他元数据属性）。  | 
|  有效 JSON  |  纯字符串  |  Lambda 根据您的筛选条件进行筛选（仅限其他元数据属性）。  | 
|  有效 JSON  |  数据属性中没有筛选条件模式  |  Lambda 根据您的筛选条件进行筛选（仅限其他元数据属性）。  | 
|  有效 JSON  |  有效 JSON  |  Lambda 根据您的筛选条件进行筛选。  | 
|  非 UTF-8 编码字符串  |  JSON、纯字符串或无模式  |  Lambda 根据您的筛选条件进行筛选（仅限其他元数据属性）。  | 

# Amazon MQ 事件源映射错误排查
<a name="services-mq-errors"></a>

当 Lambda 函数遇到不可恢复的错误时，您的 Amazon MQ 使用者将停止处理记录。任何其他使用者如果没有遇到相同的错误，都可以继续处理。要确定使用者停止的潜在原因，请检查 `StateTransitionReason` 返回的详细信息中的 `EventSourceMapping` 字段中是否有以下代码：

**`ESM_CONFIG_NOT_VALID`**  
事件源映射配置无效。

**`EVENT_SOURCE_AUTHN_ERROR`**  
Lambda 验证事件源失败。

**`EVENT_SOURCE_AUTHZ_ERROR`**  
Lambda 没有访问事件源所需的权限。

**`FUNCTION_CONFIG_NOT_VALID`**  
函数的配置无效。

如果记录由于其大小而被 Lambda 丢弃，也将处于未处理状态。Lambda 记录的大小限制为 6 MB。要在函数出错时重新传递消息，您可以使用死信队列 (DLQ)。有关更多信息，请参阅 Apache ActiveMQ 网站上的[消息重新传递和 DLQ 处理](https://activemq.apache.org/message-redelivery-and-dlq-handling)和 RabbitMQ 网站上的[可靠性指南](https://www.rabbitmq.com/reliability.html)。

**注意**  
Lambda 不支持自定义重新传递策略。相反，Lambda 使用一个策略，其默认值来自 Apache ActiveMQ 网站上的[重新传递策略](https://activemq.apache.org/redelivery-policy)页面。其中，`maximumRedeliveries` 设置为 6。