翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Pipes のソースとしての Amazon Managed Streaming for Apache Kafka EventBridge トピック
EventBridge Pipes を使用して、Amazon Managed Streaming for Apache Kafka (Amazon MSK) トピックからレコードを受信できます。オプションでこれらのレコードをフィルタリングまたは拡張してから、処理可能な送信先のいずれかに送信できます。パイプを設定するときに選択MSKできる Amazon 固有の設定があります。 EventBridge Pipes は、そのデータを宛先に送信するときに、メッセージブローカーからのレコードの順序を維持します。
Amazon MSK は、Apache Kafka を使用してストリーミングデータを処理するためのアプリケーションを構築および実行するために使用できるフルマネージドサービスです。Amazon は、Apache Kafka を実行しているクラスターのセットアップ、スケーリング、管理MSKを簡素化します。Amazon ではMSK、複数のアベイラビリティーゾーンと AWS Identity and Access Management () によるセキュリティのためにアプリケーションを設定できますIAM。Amazon MSKは、複数のオープンソースバージョンの Kafka をサポートしています。
ソースMSKとしての Amazon は、Amazon Simple Queue Service (Amazon SQS) または Amazon Kinesis の使用と同様に動作します。 EventBridge は、ソースからの新しいメッセージを内部的にポーリングし、ターゲットを同期的に呼び出します。 EventBridge はメッセージをバッチで読み取り、これらをイベントペイロードとして関数に提供します。最大バッチサイズは調整可能です。(デフォルト値は 100 メッセージ)。
Apache Kafka ベースのソースの場合、 はバッチ処理ウィンドウやバッチサイズなどの処理制御パラメータ EventBridge をサポートします。
EventBridge は、パーティションごとにメッセージを順番に読み取ります。が各バッチ EventBridge を処理すると、そのバッチ内のメッセージのオフセットがコミットされます。パイプのターゲットがバッチ内のいずれかのメッセージに対してエラーを返した場合、 は処理が成功するか、メッセージが期限切れになるまで、メッセージのバッチ全体を EventBridge 再試行します。
EventBridge は、ターゲットを呼び出すときに、イベントのメッセージのバッチを送信します。イベントペイロードにはメッセージの配列が含まれています。各配列項目には、Amazon MSKトピックとパーティション識別子の詳細と、タイムスタンプと base64 でエンコードされたメッセージが含まれます。
イベントの例
次のサンプルイベントは、パイプが受信した情報を示しています。このイベントを使用して、イベントパターンを作成およびフィルタリングしたり、入力変換を定義したりできます。すべてのフィールドをフィルタリングできるわけではありません。フィルターできるフィールドの詳細については、「Amazon EventBridge Pipes でのイベントフィルタリング」を参照してください。
[ { "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:sa-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "eventSourceKey": "mytopic-0", "topic": "mytopic", "partition": "0", "offset": 15, "timestamp": 1545084650987, "timestampType": "CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers": [ { "headerKey": [ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ]
ポーリングとストリームの開始位置
パイプの作成時と更新時のストリームソースポーリングは、最終的に一貫性があることに注意してください。
パイプ作成中、ストリームからのイベントのポーリングが開始されるまでに数分かかること場合があります。
ソースのポーリング構成をパイプで更新している間、ストリームのポーリングイベントを停止して再開するまでに数分かかることがあります。
つまり、LATEST
をストリームの開始位置として指定すると、パイプ作成または更新中に送信されるイベントをパイプが見逃す可能性があります。イベントを見逃さないようにするには、ストリームの開始位置を TRIM_HORIZON
として指定します。
MSK クラスター認証
EventBridge には、Amazon MSKクラスターへのアクセス、レコードの取得、その他のタスクの実行のためのアクセス許可が必要です。Amazon MSK はMSK、クラスターへのクライアントアクセスを制御するためのいくつかのオプションをサポートしています。どの認証方法がいつ使用されるかについての詳細は、「がブートストラップブローカー EventBridge を選択する方法」を参照してください。
クラスターアクセスオプション
非認証アクセス
開発には非認証アクセスのみを使用することをお勧めします。認証されていないアクセスは、クラスターでIAMロールベースの認証が無効になっている場合にのみ機能します。
SASL/SCRAM 認証
Amazon MSKは、Transport Layer Security (SCRAM) 暗号化を使用した Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/TLS) 認証をサポートしています。がクラスターに接続する EventBridge には、認証情報 (サインイン認証情報) を AWS Secrets Manager シークレットに保存します。
Secrets Manager の使用に関する詳細については、「Amazon Managed Streaming for Apache Kafka デベロッパーガイド」の「AWS Secrets Managerを使用したユーザーネームとパスワードの認証」を参照してください。
Amazon MSKは SASL/PLAIN 認証をサポートしていません。
IAM ロールベースの認証
を使用してIAM、MSKクラスターに接続するクライアントの ID を認証できます。MSK クラスターでIAM認証がアクティブで、認証用のシークレットを指定しない場合、 EventBridge は自動的にデフォルトでIAM認証を使用します。IAM ユーザーまたはロールベースのポリシーを作成してデプロイするには、 IAMコンソールまたは を使用しますAPI。詳細については、「Amazon Managed Streaming for Apache Kafka デベロッパーガイド」のIAM「アクセスコントロール」を参照してください。
EventBridge がMSKクラスターに接続し、レコードを読み取り、その他の必要なアクションを実行できるようにするには、パイプの実行ロールに次のアクセス許可を追加します。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", "kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:DescribeClusterDynamicConfiguration" ], "Resource": [ "arn:aws:kafka:
region
:account-id
:cluster/cluster-name
/cluster-uuid
", "arn:aws:kafka:region
:account-id
:topic/cluster-name
/cluster-uuid
/topic-name
", "arn:aws:kafka:region
:account-id
:group/cluster-name
/cluster-uuid
/consumer-group-id
" ] } ] }
これらの許可は、特定のクラスター、トピック、およびグループにスコープできます。詳細については、MSK「Amazon Managed Streaming for Apache Kafka デベロッパーガイド」の「Amazon Kafka アクション」を参照してください。
相互TLS認証
相互 TLS (m TLS) は、クライアントとサーバー間の双方向認証を提供します。クライアントは、サーバーによるクライアントの検証のためにサーバーに証明書を送信し、サーバーは、クライアントによるサーバーの検証のためにクライアントに証明書を送信します。
Amazon の場合MSK、 EventBridge はクライアントとして機能します。(Secrets Manager のシークレットとして) MSKクラスター内のブローカー EventBridge で認証するようにクライアント証明書を設定します。クライアント証明書は、 サーバーのトラストストア内の認証局 (CA) によって署名される必要があります。MSK クラスターはサーバー証明書を に送信 EventBridge して、 でブローカーを認証します EventBridge。サーバー証明書は、 AWS 信頼ストアにある CA によって署名される必要があります。
Amazon MSKでは、自己署名サーバー証明書はサポートされていません。Amazon のすべてのブローカーは、デフォルトで が EventBridge 信頼する Amazon Trust Services CAs
Amazon の mTLS の詳細についてはMSK、「Amazon Managed Streaming for Apache Kafka デベロッパーガイド」の「相互TLS認証」を参照してください。
mTLS シークレットの設定
CLIENT_CERTIFICATE_TLS_AUTH シークレットには、証明書フィールドとプライベートキーフィールドが必要です。暗号化されたプライベートキーの場合、シークレットはプライベートキーのパスワードを必要とします。証明書とプライベートキーはどちらも PEM形式である必要があります。
注記
EventBridge は、PBES1
証明書フィールドには、クライアント証明書で始まり、その後に中間証明書が続き、ルート証明書で終わる証明書のリストが含まれている必要があります。各証明書は、以下の構造を使用した新しい行で始める必要があります。
-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----
Secrets Manager は最大 65,536 バイトのシークレットをサポートします。これは、長い証明書チェーンにも十分な領域です。
プライベートキーは PKCS #8
-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----
暗号化されたプライベートキーには、以下の構造を使用します。
-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----
次の例は、暗号化されたプライベートキーを使用した mTLS 認証用のシークレットの内容を示しています。暗号化されたプライベートキーの場合は、シークレットにプライベートキーのパスワードを含めます。
{ "privateKeyPassword": "testpassword", "certificate": "-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }
がブートストラップブローカー EventBridge を選択する方法
EventBridge は、クラスターで使用できる認証方法と、認証用のシークレットを提供するかどうかに基づいて、ブートストラップブローカーを選択します。mTLS または SASL/ のシークレットを指定するとSCRAM、 はその認証方法 EventBridge を自動的に選択します。シークレットを指定しない場合、 EventBridge はクラスターでアクティブな最も強力な認証方法を選択します。がブローカー EventBridge を選択する際の優先度の順序を、最も強力な認証から最も弱い認証まで次に示します。
-
mTLS (m にシークレットを提供TLS)
-
SASL/SCRAM (SASL/ にシークレットを提供SCRAM)
-
SASL IAM (シークレットが指定されておらず、IAM認証がアクティブ)
-
認証されていない TLS (シークレットが指定されておらず、IAM認証がアクティブではない)
-
プレーンテキスト (シークレットが提供されておらず、IAM認証と未認証の両方TLSがアクティブではない)
注記
が最も安全なブローカータイプに接続 EventBridge できない場合、 は別の (弱い) ブローカータイプに接続しようとしません。より弱いブローカータイプ EventBridge を選択する場合は、クラスターでより強力な認証方法をすべて無効にします。
ネットワーク構成
EventBridge は、Amazon クラスターに関連付けられた Amazon Virtual Private Cloud (Amazon VPC) リソースにアクセスできる必要があります。 MSK
-
Amazon MSKクラスターVPCの にアクセスするには、ソースのサブネットにアウトバウンドインターネットアクセス EventBridge を使用できます。プライベートサブネットの場合、NATゲートウェイでも独自の でもかまいませんNAT。にパブリック IP アドレスNATがあり、インターネットに接続できることを確認します。パブリックサブネットの場合は、VPCエンドポイントを使用する必要があります (以下で説明)。
-
EventBridge Pipes は を介したイベント配信もサポートしているためAWS PrivateLink
、 Amazon Virtual Private Cloud (Amazon VPC) にあるイベントソースから Pipes ターゲットにイベントを送信できます。パブリックインターネットを経由する必要はありません。Pipes を使用すると、インターネットゲートウェイをデプロイしたり、ファイアウォールルールを設定したり、プロキシサーバーを設定したりすることなく、 Amazon Managed Streaming for Apache Kafka (Amazon MSK)、セルフマネージド Apache Kafka、およびプライベートサブネットに存在する Amazon MQ ソースからポーリングできます。VPC エンドポイントを使用して、パブリックサブネットの Kafka クラスターからの配信をサポートすることもできます。 VPC エンドポイントを設定するには、「 ユーザーガイド」のVPC「エンドポイントの作成AWS PrivateLink 」を参照してください。サービス名で、 を選択します
com.amazonaws.
。region
.pipes-data
Amazon VPC セキュリティグループを次のルールで設定します (最低)。
-
インバウンドルール – ソースに指定されたセキュリティグループの Amazon MSKブローカーポート上のすべてのトラフィックを許可します。
-
アウトバウンドルール – すべての送信先に対して、ポート 443 上のすべてのトラフィックを許可します。ソースに指定されたセキュリティグループの Amazon MSKブローカーポート上のすべてのトラフィックを許可します。
ブローカーポートには以下が含まれます。
プレーンテキストの場合は 9092
の 9094 TLS
の 9096 SASL
の 9098 IAM
注記
Amazon VPC設定は、Amazon MSK APIを通じて検出できます。セットアップ中に設定する必要はありません。
カスタマイズ可能なコンシューマーグループ ID
Apache Kafka をソースとして設定する場合、コンシューマーグループIDを指定できます。このコンシューマーグループ ID は、パイプを結合したい Apache Kafka コンシューマーグループの既存の識別子です。この機能を使用して、進行中の Apache Kafka レコード処理設定を他のコンシューマーから に移行できます EventBridge。
コンシューマーグループ ID を指定し、そのコンシューマーグループ内に他のアクティブなポーラーが存在する場合、Apache Kafka はすべてのコンシューマーにメッセージを配信します。つまり、Apache Kafka EventBridge トピックのすべてのメッセージを受信しません。トピック内のすべてのメッセージを EventBridge 処理したい場合は、そのコンシューマーグループ内の他のポーラーをオフにします。
さらに、コンシューマーグループ ID を指定し、Apache Kafka が同じ ID を持つ有効な既存のコンシューマーグループを検索すると、 はパイプの StartingPosition
パラメータ EventBridge を無視します。代わりに、 はコンシューマーグループのコミットされたオフセットに従ってレコードの処理 EventBridge を開始します。コンシューマーグループ ID を指定し、Apache Kafka が既存のコンシューマーグループを見つけられない場合、 は指定された を使用してソース EventBridge を設定しますStartingPosition
。
指定するコンシューマーグループ ID は、すべての Apache Kafka イベントソースの中で一意でなければなりません。コンシューマーグループ ID を指定してパイプを作成した後は、この値を更新することはできません。
Amazon MSKソースの自動スケーリング
最初に Amazon MSKソースを作成すると、 は Apache Kafka トピック内のすべてのパーティションを処理するために 1 つのコンシューマーを EventBridge 割り当てます。各コンシューマーには、増加したワークロードを処理するために同時実行される複数のプロセッサがあります。さらに、 は、ワークロードに基づいてコンシューマーの数 EventBridge を自動的にスケールアップまたはスケールダウンします。各パーティションでメッセージの順序を保つため、コンシューマーの最大数は、トピック内のパーティションあたり 1 つとなっています。
1 分間隔で、 はトピック内のすべてのパーティションのコンシューマーオフセットラグ EventBridge を評価します。遅延が高すぎる場合、パーティションはメッセージを処理 EventBridge できるよりも速くメッセージを受信しています。必要に応じて、コンシューマーをトピック EventBridge に追加または削除します。コンシューマーを追加または削除するスケーリングプロセスは、評価から 3 分以内に行われます。
ターゲットが過負荷の場合、 はコンシューマーの数 EventBridge を減らします。このアクションにより、コンシューマーが取得しパイプに送信するメッセージの数が減り、パイプへのワークロードが軽減されます。