

# Amazon MQ で Lambda を使用する
<a name="with-mq"></a>

**注記**  
Lambda 関数以外のターゲットにデータを送信したい、または送信する前にデータをエンリッチしたいという場合は、「[Amazon EventBridge Pipes](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html)」を参照してください。

Amazon MQ は、[Apache ActiveMQ](https://activemq.apache.org/) および [RabbitMQ](https://www.rabbitmq.com) 用のマネージドメッセージブローカーサービスです。*メッセージブローカー*を使用すると、ソフトウェアアプリケーションおよびコンポーネントは、さまざまなプログラミング言語、オペレーティングシステム、および、トピックまたはキューイベント送信先を介した正式なメッセージングプロトコルを使って、通信できるようになります。

また Amazon MQ は、ActiveMQ か RabbitMQ ブローカーをインストールすることにより、もしくは、異なるネットワークトポロジやその他のインフラストラクチャのニーズを提供することにより、ユーザーに代わって Amazon Elastic Compute Cloud (Amazon EC2)インスタンスを管理することもできます。

Lambda 関数を使用することで、Amazon MQ メッセージブローカーからのレコードを処理できます。Lambda は、ブローカーからメッセージを読み取り関数を[同期的に](invocation-sync.md)呼び出す Lambda リソース、[イベントソースマッピング](invocation-eventsourcemapping.md)によって関数を呼び出します。

**警告**  
Lambda イベントソースマッピングは各イベントを少なくとも 1 回処理し、レコードの重複処理が発生する可能性があります。重複するイベントに関連する潜在的な問題を避けるため、関数コードを冪等にすることを強くお勧めします。詳細については、AWS ナレッジセンターの「[Lambda 関数を冪等にするにはどうすればよいですか?](https://repost.aws/knowledge-center/lambda-function-idempotent)」を参照してください。

Amazon MQ イベントソースマッピングには、次の設定制限があります。
+ 同時実行数 — Amazon MQ イベントソースマッピングを使用する Lambda 関数には、デフォルトの最大[同時実行数](lambda-concurrency.md)設定があります。ActiveMQ の場合、Lambda サービスは同時実行環境の数を Amazon MQ イベントソースマッピングごとに 5 つに制限します。RabbitMQ の場合、同時実行環境の数は Amazon MQ イベントソースマッピングごとに 1 つに制限されます。関数の予約またはプロビジョニングされる同時実行数設定を変更しても、Lambda サービスはこれ以上実行環境を利用できるようにしません。単一の Amazon MQ イベントソースマッピングのデフォルトの最大同時実行数の引き上げをリクエストするには、イベントソースマッピング UUID とリージョンを サポート に連絡してください。引き上げはアカウントレベルやリージョンレベルではなく、特定のイベントソースマッピングレベルで適用されるため、イベントソースマッピングごとにスケーリングの引き上げを手動でリクエストする必要があります。
+ クロスアカウント - Lambda はクロスアカウント処理をサポートしていません。Lambda を使用して、別の AWS アカウント にある Amazon MQ メッセージブローカーからのレコードを処理することはできません。
+ 認証 - ActiveMQ では、ActiveMQ [SimpleAuthenticationPlugin](https://activemq.apache.org/security#simple-authentication-plugin) のみサポートされています。RabbitMQ の場合、[PLAIN](https://www.rabbitmq.com/access-control.html#mechanisms) 認証メカニズムのみサポートされています。ユーザーは、資格情報の管理には AWS Secrets Manager を使用します。ActiveMQ 認証の詳細については、*Amazon MQ デベロッパーガイド*の [Integrating ActiveMQ brokers with LDAP](https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/security-authentication-authorization.html) を参照してください。
+ 接続クォータ - ブローカーは、ワイヤレベルプロトコルごとに最大の接続可能数を持っています。このクォータは、ブローカーインスタンスタイプに基づいています。これらの制限の詳細については、*Amazon MQ デベロッパーガイド*の **Quotas in Amazon MQ** の [Brokers](https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/amazon-mq-limits.html#broker-limits) のセクションを参照してください。
+ 接続 - ブローカーをパブリックまたはプライベートの Virtual Private Cloud (VPC) に作成できます。プライベート VPC の場合、Lambda 関数が VPC にアクセスしてメッセージを受信する必要があります。詳細については、このセクションで後述する[ネットワークセキュリティを設定する](process-mq-messages-with-lambda.md#process-mq-messages-with-lambda-networkconfiguration)を参照してください。
+ イベント送信先 - キューの送信先のみがサポートされます。ただし、仮想トピックを使用することができます。仮想トピックは、内部的にトピックとして動作し、キューとして Lambda と対話しながら動作します。詳細については、Apache ActiveMQ ウェブサイトの [Virtual Destinations](https://activemq.apache.org/virtual-destinations) および RabbitMQ ウェブサイトの [Virtual Hosts](https://www.rabbitmq.com/vhosts.html)を参照してください。
+ ネットワークトポロジ - ActiveMQ の場合、イベントソースマッピングごとに、１つの単一インスタンスまたはスタンバイブローカーがサポートされます。RabbitMQ の場合、イベントソースマッピングごとに、単一インスタンスブローカーまたはクラスターデプロイメントがサポートされます。単一インスタンスブローカーには、フェイルオーバーエンドポイントが必要です。これらのブローカーデプロイメントモードの詳細については、*Amazon MQ デベロッパーガイド*の [Active MQ Broker Architecture](https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/amazon-mq-broker-architecture.html) および [Rabbit MQ Broker Architecture](https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/rabbitmq-broker-architecture.html) を参照してください。
+ プロトコル — サポートされるプロトコルは、Amazon MQ の統合のタイプによって異なります。
  + ActiveMQ 統合の場合、Lambda は OpenWire/Java Message Service (JMS) プロトコルを使用してメッセージを使用します。その他のプロトコルは、メッセージの使用をサポートしていません。JMS プロトコル内では、[https://activemq.apache.org/components/cms/api_docs/activemqcpp-3.6.0/html/classactivemq_1_1commands_1_1_active_m_q_text_message.html](https://activemq.apache.org/components/cms/api_docs/activemqcpp-3.6.0/html/classactivemq_1_1commands_1_1_active_m_q_text_message.html) および [https://activemq.apache.org/components/cms/api_docs/activemqcpp-3.9.0/html/classactivemq_1_1commands_1_1_active_m_q_bytes_message.html](https://activemq.apache.org/components/cms/api_docs/activemqcpp-3.9.0/html/classactivemq_1_1commands_1_1_active_m_q_bytes_message.html) のみがサポートされています。Lambda は、JMS カスタムプロパティもサポートしています。OpenWire プロトコルの詳細については、Apache ActiveMQ ウェブサイトの [ OpenWire ](https://activemq.apache.org/openwire.html) を参照してください。
  + RabbitMQ 統合の場合、Lambda は AMQP 0-9-1 プロトコルを使ってメッセージを使用します。その他のプロトコルは、メッセージの使用をサポートしていません。RabbitMQ による AMQP 0-9-1 プロトコルの実装の詳細については、RabbitMQ ウェブサイトの [AMQP 0-9-1 Complete Reference Guide](https://www.rabbitmq.com/amqp-0-9-1-reference.html) を参照してください。

Lambda は、Amazon MQ がサポートする ActiveMQ および RabbitMQ の最新バージョンを自動的にサポートします。サポートされている最新バージョンについては、*Amazon MQ デベロッパーガイド*の [Amazon MQ リリースノート](https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/amazon-mq-release-notes.html)を参照してください。

**注記**  
デフォルトでは、Amazon MQ には毎週、ブローカー用のメンテナンスウィンドウがあります。その期間中、ブローカーは利用できません。スタンバイのないブローカーの場合、Lambda はそのウィンドウ中にメッセージを処理できません。

**Topics**
+ [Amazon MQ の Lambda コンシューマーグループについて理解する](#services-mq-configure)
+ [Lambda での Amazon MQ イベントソースの設定](process-mq-messages-with-lambda.md)
+ [イベントソースマッピングパラメータ](services-mq-params.md)
+ [Amazon MQ イベントソースからのイベントをフィルタリングする](with-mq-filtering.md)
+ [Amazon MQ イベントソースマッピングエラーのトラブルシューティング](services-mq-errors.md)

## Amazon MQ の Lambda コンシューマーグループについて理解する
<a name="services-mq-configure"></a>

Amazon MQ と対話するため、Lambda は、Amazon MQ ブローカーから読み取ることができるコンシューマーグループを作成します。コンシューマーグループは、イベントソースマッピング UUID と同じ ID で作成されます。

Amazon MQ イベントソースの場合、Lambda はレコードをまとめてバッチ処理し、それらを単一のペイロードで関数に送信します。動作を制御するには、バッチ処理ウィンドウとバッチサイズを設定できます。Lambda は、最大 6 MB のペイロードサイズを処理する、バッチ処理ウィンドウの期限が切れる、またはレコード数が完全なバッチサイズに到達するまで、メッセージをプルします。詳細については、「[バッチ処理動作](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching)」を参照してください。

コンシューマーグループは、メッセージをバイトの BLOB として取得し、それらを base64 でエンコードして単一の JSON ペイロードに変換してから、関数を呼び出します。関数がバッチ内のいずれかのメッセージに対してエラーを返すと、Lambda は、処理が成功するかメッセージが期限切れになるまでメッセージのバッチ全体を再試行します。

**注記**  
Lambda 関数の最大タイムアウト制限は通常 15 分ですが、Amazon MSK、自己管理型 Apache Kafka、Amazon DocumentDB、および ActiveMQ と RabbitMQ 向け Amazon MQ のイベントソースマッピングでは、最大タイムアウト制限が 14 分の関数のみがサポートされます。この制約により、イベントソースマッピングは関数エラーと再試行を適切に処理できます。

Amazon CloudWatch の `ConcurrentExecutions` メトリクスを使用して、特定の関数の同時実行使用率を監視できます。同時実行の詳細については、「[関数に対する予約済み同時実行数の設定](configuration-concurrency.md)」を参照してください。

**Example Amazon MQ レコードイベント**  

```
{
   "eventSource": "aws:mq",
   "eventSourceArn": "arn:aws:mq:us-east-2:111122223333:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8",
   "messages": [
      { 
        "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1---mq---us-east-2.amazonaws.com.rproxy.goskope.com-37557-1234520418293-4:1:1:1:1", 
        "messageType": "jms/text-message",
        "deliveryMode": 1,
        "replyTo": null,
        "type": null,
        "expiration": "60000",
        "priority": 1,
        "correlationId": "myJMSCoID",
        "redelivered": false,
        "destination": { 
          "physicalName": "testQueue" 
        },
        "data":"QUJDOkFBQUE=",
        "timestamp": 1598827811958,
        "brokerInTime": 1598827811958, 
        "brokerOutTime": 1598827811959, 
        "properties": {
          "index": "1",
          "doAlarm": "false",
          "myCustomProperty": "value"
        }
      },
      { 
        "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1---mq---us-east-2.amazonaws.com.rproxy.goskope.com-37557-1234520418293-4:1:1:1:1",
        "messageType": "jms/bytes-message",
        "deliveryMode": 1,
        "replyTo": null,
        "type": null,
        "expiration": "60000",
        "priority": 2,
        "correlationId": "myJMSCoID1",
        "redelivered": false,
        "destination": { 
          "physicalName": "testQueue" 
        },
        "data":"LQaGQ82S48k=",
        "timestamp": 1598827811958,
        "brokerInTime": 1598827811958, 
        "brokerOutTime": 1598827811959, 
        "properties": {
          "index": "1",
          "doAlarm": "false",
          "myCustomProperty": "value"
        }
      }
   ]
}
```

```
{
  "eventSource": "aws:rmq",
  "eventSourceArn": "arn:aws:mq:us-east-2:111122223333:broker:pizzaBroker:b-9bcfa592-423a-4942-879d-eb284b418fc8",
  "rmqMessagesByQueue": {
    "pizzaQueue::/": [
      {
        "basicProperties": {
          "contentType": "text/plain",
          "contentEncoding": null,
          "headers": {
            "header1": {
              "bytes": [
                118,
                97,
                108,
                117,
                101,
                49
              ]
            },
            "header2": {
              "bytes": [
                118,
                97,
                108,
                117,
                101,
                50
              ]
            },
            "numberInHeader": 10
          },
          "deliveryMode": 1,
          "priority": 34,
          "correlationId": null,
          "replyTo": null,
          "expiration": "60000",
          "messageId": null,
          "timestamp": "Jan 1, 1970, 12:33:41 AM",
          "type": null,
          "userId": "AIDACKCEVSQ6C2EXAMPLE",
          "appId": null,
          "clusterId": null,
          "bodySize": 80
        },
        "redelivered": false,
        "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ=="
      }
    ]
  }
}
```
RabbitMQ の例では、`pizzaQueue` は RabbitMQ キューの名前、`/` は仮想ホストの名前です。メッセージを受信すると、イベントソースは `pizzaQueue::/` の下にメッセージを一覧表示します。

# Lambda での Amazon MQ イベントソースの設定
<a name="process-mq-messages-with-lambda"></a>

**Topics**
+ [ネットワークセキュリティを設定する](#process-mq-messages-with-lambda-networkconfiguration)
+ [イベントソースマッピングを作成するには](#services-mq-eventsourcemapping)

## ネットワークセキュリティを設定する
<a name="process-mq-messages-with-lambda-networkconfiguration"></a>

イベントソースマッピングを通じて Lambda に Amazon MQ へのフルアクセスを許可するには、ブローカーがパブリックエンドポイント (パブリック IP アドレス) を使用するか、ブローカーを作成した Amazon VPC へのアクセスを提供する必要があります。

Lambda で Amazon MQ を使用する場合は、関数に Amazon VPC 内のリソースへのアクセスを付与する [AWS PrivateLink VPC エンドポイント](https://docs.aws.amazon.com/vpc/latest/privatelink/create-interface-endpoint.html)を作成します。

**注記**  
イベントポーラーにデフォルト (オンデマンド) モードを使用するイベントソースマッピングを持つ関数には、AWS PrivateLink VPC エンドポイントが必要です。イベントソースマッピングが[プロビジョンドモード](invocation-eventsourcemapping.md#invocation-eventsourcemapping-provisioned-mode)を使用している場合は、AWS PrivateLink VPC エンドポイントを設定する必要はありません。

エンドポイントを作成して、次のリソースへのアクセスを提供します。
+  Lambda — Lambda サービスプリンシパルのエンドポイントを作成します。
+  AWS STS — サービスプリンシパルがユーザーに代わってロールを引き受けるために、AWS STS のエンドポイントを作成します。
+  Secrets Manager — ブローカーが Secrets Manager を使用して認証情報を保存する場合は、Secrets Manager のエンドポイントを作成します。

または、Amazon VPC の各パブリックサブネットに NAT ゲートウェイを設定します。詳細については、「[VPC に接続された Lambda 関数にインターネットアクセスを有効にする](configuration-vpc-internet.md)」を参照してください。

Amazon MQ のイベントソースマッピングを作成すると、Lambda は Amazon VPC に設定されたサブネットおよびセキュリティグループに Elastic Network Interface (ENI) が既に存在するかどうかを確認します。Lambda が既存の ENI を検出した場合、再利用しようとします。それ以外の場合、Lambda は新しい ENI を作成し、イベントソースに接続して関数を呼び出します。

**注記**  
Lambda 関数は、Lambda サービスが所有する VPC 内で常に実行されます。関数の VPC 設定はイベントソースマッピングに影響しません。Lambda がイベントソースに接続する方法を判定するのは、イベントソースのネットワーク設定のみです。

ブローカーを含む Amazon VPC のセキュリティグループを設定します。デフォルトでは、Amazon MQ は以下のポートを使用します。`61617` (Amazon MQ for ActiveMQ)、および `5671` (Amazon MQ for RabbitMQ)。
+ インバウンドルール – イベントソースに関連付けられたセキュリティグループに対してデフォルトのブローカーポート上のすべてのトラフィックを許可します。または、自己参照セキュリティグループルールを使用して、同じセキュリティグループ内のインスタンスからのアクセスを許可することもできます。
+ アウトバウンドルール – 関数が AWS サービスと通信する必要がある場合、外部送信先のポート `443` 上のすべてのトラフィックを許可します。または、自己参照セキュリティグループルールを使用して、他の AWS サービスと通信する必要がない場合は、ブローカーへのアクセスを制限することもできます。
+ Amazon VPC エンドポイントのインバウンドルール – Amazon VPC エンドポイントを使用している場合、Amazon VPC エンドポイントに関連付けられたセキュリティグループで、ブローカーのセキュリティグループからポート `443` でインバウンドトラフィックを許可する必要があります。

ブローカーが認証を使用する場合、Secrets Manager エンドポイントのエンドポイントポリシーを制限することもできます。Secrets Manager API を呼び出す場合、Lambda は Lambda サービスプリンシパルではなく、関数ロールを使用します。

**Example VPC エンドポイントポリシー – Secrets Manager エンドポイント**  

```
{
      "Statement": [
          {
              "Action": "secretsmanager:GetSecretValue",
              "Effect": "Allow",
              "Principal": {
                  "AWS": [
                      "arn:aws::iam::123456789012:role/my-role"
                  ]
              },
              "Resource": "arn:aws::secretsmanager:us-west-2:123456789012:secret:my-secret"
          }
      ]
  }
```

Amazon VPC エンドポイントを使用する場合、AWS は API コールをルーティングし、エンドポイントの Elastic Network Interface (ENI) を使用して関数を呼び出します。Lambda サービスプリンシパルは、これらの ENI を使用するすべてのロールおよび関数に対して `lambda:InvokeFunction` を呼び出す必要があります。

デフォルトでは、Amazon VPC エンドポイントには、リソースへの広範なアクセスを許可するオープンな IAM ポリシーが適用されています。そのエンドポイントを使用して必要なアクションを実行するためのベストプラクティスは、これらのポリシーを制限することです。イベントソースマッピングが Lambda 関数を呼び出せるようにするには、VPC エンドポイントポリシーで、Lambda サービスプリンシパルが `sts:AssumeRole` および `lambda:InvokeFunction` を呼び出すことを許可する必要があります。組織内で発生する API コールのみを許可するように VPC エンドポイントポリシーを制限すると、イベントソースマッピングが正しく機能しなくなるため、これらのポリシーには `"Resource": "*"` が必要です。

次の VPC エンドポイントポリシーの例では、AWS STS および Lambda エンドポイントに Lambda サービスプリンシパルの必要なアクセスを付与する方法について示しています。

**Example VPC エンドポイントポリシー - AWS STS エンドポイント**  

```
{
      "Statement": [
          {
              "Action": "sts:AssumeRole",
              "Effect": "Allow",
              "Principal": {
                  "Service": [
                      "lambda.amazonaws.com"
                  ]
              },
              "Resource": "*"
          }
      ]
    }
```

**Example VPC エンドポイントポリシー – Lambda エンドポイント**  

```
{
      "Statement": [
          {
              "Action": "lambda:InvokeFunction",
              "Effect": "Allow",
              "Principal": {
                  "Service": [
                      "lambda.amazonaws.com"
                  ]
              },
              "Resource": "*"
          }
      ]
  }
```

## イベントソースマッピングを作成するには
<a name="services-mq-eventsourcemapping"></a>

[イベントソースマッピング](invocation-eventsourcemapping.md)を作成し、Amazon MQ ブローカーから Lambda 関数にレコードを送信するよう Lambda に通知します。複数のイベントソースマッピングを作成することで、複数の関数で同じデータを処理したり、単一の関数で複数のソースから項目を処理したりできます。

Amazon MQ から読み取るように関数を設定するには、必要なアクセス許可を追加し Lambda コンソールで **MQ** トリガーを作成します。

Amazon MQ ブローカーからレコードを読み取るには、Lambda 関数に次のアクセス許可が必要です。関数[実行ロール](lambda-intro-execution-role.md)にアクセス許可ステートメントを追加して、Lambda に Amazon MQ ブローカーとその基盤となるリソースとやり取りするためのアクセス許可を付与します。
+ [ mq:DescribeBroker ](https://docs.aws.amazon.com/amazon-mq/latest/api-reference/brokers-broker-id.html#brokers-broker-id-http-methods)
+ [secretsmanager:GetSecretValue](https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html)
+ [ec2:CreateNetworkInterface](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_CreateNetworkInterface.html)
+ [ec2:DeleteNetworkInterface](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DeleteNetworkInterface.html)
+ [ec2:DescribeNetworkInterfaces](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeNetworkInterfaces.html)
+ [ec2:DescribeSecurityGroups](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSecurityGroups.html)
+ [ec2:DescribeSubnets](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSubnets.html)
+ [ec2:DescribeVpcs](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeVpcs.html)
+ [logs:CreateLogGroup](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogGroup.html)
+ [logs:CreateLogStream](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogStream.html)
+ [logs:PutLogEvents](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html)

**注記**  
暗号化されたカスタマー管理キーを使用する場合は、`[kms:Decrypt](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-bootstrap-brokers.html#clusters-clusterarn-bootstrap-brokersget)` 権限も追加します。

**アクセス許可を追加してトリガーを作成するには**

1. Lambda コンソールの[関数ページ](https://console.aws.amazon.com/lambda/home#/functions)を開きます。

1. 関数の名前を選択します。

1. **[設定]** タブを開き、次に **[アクセス権限]** をクリックします。

1. **[実行ロール]** で、実行ロールのリンクを選択します。このリンクを選択すると、IAM コンソールでロールが開きます。  
![\[\]](http://docs.aws.amazon.com/ja_jp/lambda/latest/dg/images/execution-role.png)

1. **[アクセス権限を追加]**、**[インラインポリシーを作成]** の順に選択します。  
![\[\]](http://docs.aws.amazon.com/ja_jp/lambda/latest/dg/images/inline-policy.png)

1. **[ポリシーエディター]** で、**[JSON]** を選択します。以下のポリシーを入力します。Amazon MQ ブローカーから読み取るには、関数にこれらのアクセス許可が必要です。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
         {
           "Effect": "Allow",
           "Action": [
             "mq:DescribeBroker",
             "secretsmanager:GetSecretValue",
             "ec2:CreateNetworkInterface",
             "ec2:DeleteNetworkInterface",
             "ec2:DescribeNetworkInterfaces", 
             "ec2:DescribeSecurityGroups",
             "ec2:DescribeSubnets",
             "ec2:DescribeVpcs",
             "logs:CreateLogGroup",
             "logs:CreateLogStream", 
             "logs:PutLogEvents"		
           ],
           "Resource": "*"
         }
       ]
     }
   ```

------
**注記**  
暗号化されたカスタマー管理キーを使用する場合は、`kms:Decrypt` アクセス許可も追加する必要があります。

1. [**次へ**] を選択します。ポリシー名を入力し、**[ポリシーの作成]** を選択します。

1. Lambda コンソールの関数に戻ります。**[関数の概要]** で **[トリガーを追加]** をクリックします。  
![\[\]](http://docs.aws.amazon.com/ja_jp/lambda/latest/dg/images/add-trigger.png)

1. **MQ**トリガータイプを選択します。

1. 必須のオプションを設定し、**[追加]** を選択します。

Lambda では、Amazon MQ イベントソースの以下のオプションがサポートされています。
+ **MQ ブローカー** — Amazon MQ ブローカーを選択します。
+ **バッチサイズ** - 単一のバッチで取得するメッセージの最大数を設定します。
+ **キュー名** - 使用する Amazon MQ キューを入力します。
+ **ソースアクセス設定** — 仮想ホスト情報とブローカーの認証情報を保存する Secrets Manager のシークレットを入力します。
+ **トリガーの有効化** - レコードの処理を停止するトリガーを無効にします。

トリガーを有効または無効にする（または削除する）には、 ** MQ ** トリガーをデザイナーで選択します。トリガーを再設定するには、イベントソースマッピング API 操作を使用します。

# イベントソースマッピングパラメータ
<a name="services-mq-params"></a>

すべての Lambda イベントソースタイプは、同じ[CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) および [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) API オペレーションを共有しています。ただし、Amazon MQ と RabbitMQ に適用されるのは一部のパラメータのみです。


| [Parameter] (パラメータ) | 必須 | デフォルト | メモ | 
| --- | --- | --- | --- | 
|  BatchSize  |  N  |  100  |  最大: 10,000  | 
|  有効  |  N  |  true  | なし | 
|  FunctionName  |  Y  | 該当なし  | なし | 
|  FilterCriteria  |  N  |  該当なし   |  [Lambda が関数に送信するイベントを制御する](invocation-eventfiltering.md)  | 
|  MaximumBatchingWindowInSeconds  |  N  |  500 ミリ秒  |  [バッチ処理動作](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching)  | 
|  キュー  |  N  | 該当なし |  消費する Amazon MQ ブローカーの送信先キューの名前。  | 
|  SourceAccessConfigurations  |  N  | 該当なし  |  ActiveMQ の場合、BASIC\$1AUTH 認証情報です。RabbitMQ の場合、BASIC\$1AUTH 認証情報と VIRTUAL\$1HOST 情報の両方を含めることができます。  | 

# Amazon MQ イベントソースからのイベントをフィルタリングする
<a name="with-mq-filtering"></a>

イベントフィルタリングを使用して、Lambda が関数に送信するストリームまたはキューからのレコードを制御することができます。イベントフィルタリングの仕組みに関する一般情報については、「[Lambda が関数に送信するイベントを制御する](invocation-eventfiltering.md)」を参照してください。

このセクションでは、Amazon MQ イベントソースのイベントフィルタリングに焦点を当てます。

**注記**  
Amazon MQ イベントソースマッピングは、 `data` キーでのフィルタリングのみをサポートします。

**Topics**
+ [Amazon MQ イベントフィルタリングの基本](#filtering-AMQ)

## Amazon MQ イベントフィルタリングの基本
<a name="filtering-AMQ"></a>

Amazon MQ メッセージキューには、有効な JSON 形式またはプレーン文字列でメッセージが含まれているとします。レコードの例は次のようになり、`data` フィールドでデータが Base64 でエンコードされた文字列に変換されます。

------
#### [ ActiveMQ ]

```
{ 
    "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1---mq---us-east-2.amazonaws.com.rproxy.goskope.com-37557-1234520418293-4:1:1:1:1", 
    "messageType": "jms/text-message",
    "deliveryMode": 1,
    "replyTo": null,
    "type": null,
    "expiration": "60000",
    "priority": 1,
    "correlationId": "myJMSCoID",
    "redelivered": false,
    "destination": { 
      "physicalName": "testQueue" 
    },
    "data":"QUJDOkFBQUE=",
    "timestamp": 1598827811958,
    "brokerInTime": 1598827811958, 
    "brokerOutTime": 1598827811959, 
    "properties": {
      "index": "1",
      "doAlarm": "false",
      "myCustomProperty": "value"
    }
}
```

------
#### [ RabbitMQ ]

```
{
    "basicProperties": {
        "contentType": "text/plain",
        "contentEncoding": null,
        "headers": {
            "header1": {
                "bytes": [
                  118,
                  97,
                  108,
                  117,
                  101,
                  49
                ]
            },
            "header2": {
                "bytes": [
                  118,
                  97,
                  108,
                  117,
                  101,
                  50
                ]
            },
            "numberInHeader": 10
        },
        "deliveryMode": 1,
        "priority": 34,
        "correlationId": null,
        "replyTo": null,
        "expiration": "60000",
        "messageId": null,
        "timestamp": "Jan 1, 1970, 12:33:41 AM",
        "type": null,
        "userId": "AIDACKCEVSQ6C2EXAMPLE",
        "appId": null,
        "clusterId": null,
        "bodySize": 80
        },
    "redelivered": false,
    "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ=="
}
```

------

Active MQ および Rabbit MQ ブローカーの両方では、イベントフィルタリングを使用して `data` キーを使用するレコードをフィルタリングできます。Amazon MQ キューに次の JSON 形式のメッセージが含まれているとします。

```
{
    "timeout": 0,
    "IPAddress": "203.0.113.254"
}
```

`timeout` フィールドが 0 より大きいレコードのみをフィルタリングするには、`FilterCriteria` オブジェクトは次のようになります。

```
{
    "Filters": [
        {
            "Pattern": "{ \"data\" : { \"timeout\" : [ { \"numeric\": [ \">\", 0] } } ] } }"
        }
    ]
}
```

以下は、わかりやすくするためにプレーン JSON で展開したフィルターの `Pattern` の値を記載しています。

```
{
    "data": {
        "timeout": [ { "numeric": [ ">", 0 ] } ]
        }
}
```

コンソール、AWS CLI、または AWS SAM テンプレートを使用してフィルターを追加できます。

------
#### [ Console ]

コンソールを使用してこのフィルターを追加するには、[イベントソースマッピングへのフィルター条件のアタッチ (コンソール)](invocation-eventfiltering.md#filtering-console) の指示に従い、**[フィルター条件]** に次の文字列を入力します。

```
{ "data" : { "timeout" : [ { "numeric": [ ">", 0 ] } ] } }
```

------
#### [ AWS CLI ]

AWS Command Line Interface (AWS CLI) を使用してこれらのフィルター条件を持つ新しいイベントソースマッピングを作成するには、以下のコマンドを実行します。

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:mq:us-east-2:123456789012:broker:my-broker:b-8ac7cc01-5898-482d-be2f-a6b596050ea8 \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"timeout\" : [ { \"numeric\": [ \">\", 0 ] } ] } }"}]}'
```

これらのフィルター条件を既存のイベントソースマッピングに追加するには、次のコマンドを実行します。

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"timeout\" : [ { \"numeric\": [ \">\", 0 ] } ] } }"}]}'
```

これらのフィルター条件を既存のイベントソースマッピングに追加するには、次のコマンドを実行します。

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"timeout\" : [ { \"numeric\": [ \">\", 0 ] } ] } }"}]}'
```

------
#### [ AWS SAM ]

AWS SAM を使用してこのフィルターを追加するには、イベントソースの YAML テンプレートに次のスニペットを追加します。

```
FilterCriteria:
  Filters:
    - Pattern: '{ "data" : { "timeout" : [ { "numeric": [ ">", 0 ] } ] } }'
```

------

Amazon MQ を使用すると、メッセージがプレーン文字列のレコードをフィルタリングすることもできます。メッセージが「Result:」で始まるレコードのみを処理するとします。`FilterCriteria` オブジェクトは次のようになります。

```
{
    "Filters": [
        {
            "Pattern": "{ \"data\" : [ { \"prefix\": \"Result: \" } ] }"
        }
    ]
}
```

以下は、わかりやすくするためにプレーン JSON で展開したフィルターの `Pattern` の値を記載しています。

```
{
    "data": [
        {
        "prefix": "Result: "
        }
    ]
}
```

コンソール、AWS CLI、または AWS SAM テンプレートを使用してフィルターを追加できます。

------
#### [ Console ]

コンソールを使用してこのフィルターを追加するには、[イベントソースマッピングへのフィルター条件のアタッチ (コンソール)](invocation-eventfiltering.md#filtering-console) の指示に従って **[フィルター条件]** に次の文字列を入力します。

```
{ "data" : [ { "prefix": "Result: " } ] }
```

------
#### [ AWS CLI ]

AWS Command Line Interface (AWS CLI) を使用してこれらのフィルター条件を持つ新しいイベントソースマッピングを作成するには、以下のコマンドを実行します。

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:mq:us-east-2:123456789012:broker:my-broker:b-8ac7cc01-5898-482d-be2f-a6b596050ea8 \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : [ { \"prefix\": \"Result: \" } ] }"}]}'
```

これらのフィルター条件を既存のイベントソースマッピングに追加するには、次のコマンドを実行します。

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : [ { \"prefix\": \"Result: \" } ] }"}]}'
```

------
#### [ AWS SAM ]

AWS SAM を使用してこのフィルターを追加するには、イベントソースの YAML テンプレートに次のスニペットを追加します。

```
FilterCriteria:
  Filters:
    - Pattern: '{ "data" : [ { "prefix": "Result " } ] }'
```

------

Amazon MQ メッセージは UTF-8 でエンコードされた文字列 (プレーン文字列または JSON 形式) である必要があります。これは、Lambda がフィルター条件を適用する前に Amazon MQ のバイト配列を UTF-8 にデコードするためです。メッセージが UTF-16 や ASCII などの別のエンコーディングを使用している場合、またはメッセージ形式が `FilterCriteria` 形式と一致しない場合、Lambda はメタデータフィルターのみを処理します。以下は、特定の動作を要約した表です。


| 着信メッセージの形式 | メッセージプロパティのフィルターパターン形式 | 結果として生じるアクション | 
| --- | --- | --- | 
|  プレーン文字列  |  プレーン文字列  |  Lambda がフィルター条件に基づいてフィルタリングを実行します。  | 
|  プレーン文字列  |  データプロパティのフィルターパターンがない  |  Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。  | 
|  プレーン文字列  |  有効な JSON  |  Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。  | 
|  有効な JSON  |  プレーン文字列  |  Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。  | 
|  有効な JSON  |  データプロパティのフィルターパターンがない  |  Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。  | 
|  有効な JSON  |  有効な JSON  |  Lambda がフィルター条件に基づいてフィルタリングを実行します。  | 
|  UTF-8 以外でエンコードされた文字  |  JSON、プレーン文字列、またはパターンなし  |  Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。  | 

# Amazon MQ イベントソースマッピングエラーのトラブルシューティング
<a name="services-mq-errors"></a>

Lambda 関数で回復不可能なエラーが発生すると、Amazon MQ コンシューマーはレコードの処理を停止します。他のコンシューマーは、同じエラーが発生しない限り、処理を続行できます。コンシューマーが停止した原因を特定するには、`StateTransitionReason` から返された詳細の `EventSourceMapping` フィールドをチェックし、以下のいずれかのコードを探します。

**`ESM_CONFIG_NOT_VALID`**  
イベントソースマッピングの設定が無効です。

**`EVENT_SOURCE_AUTHN_ERROR`**  
Lambda がイベントソースの認証に失敗しました。

**`EVENT_SOURCE_AUTHZ_ERROR`**  
Lambda にイベントソースへのアクセス許可がありません。

**`FUNCTION_CONFIG_NOT_VALID`**  
関数の設定が無効です。

レコードは、サイズが原因で Lambda が削除した場合も、未処理になります。Lambda レコードのサイズ上限は 6 MB です。関数エラー時にメッセージを再配信するには、デッドレターキュー（DLQ）を使用します。詳細については、Apache ActiveMQ ウェブサイトの [Message Redelivery and DLQ Handling](https://activemq.apache.org/message-redelivery-and-dlq-handling) および RabbitMQ ウェブサイトの [Reliability Guide](https://www.rabbitmq.com/reliability.html) を参照してください。

**注記**  
Lambda はカスタム再配信ポリシーをサポートしていません。代わりに Lambda は、Apache ActiveMQ ウェブサイトの [Redelivery Policy](https://activemq.apache.org/redelivery-policy) ページのデフォルト値のポリシーを使用し、`maximumRedeliveries` は 6 に設定します。