Amazon EventBridge パイプの作成 - Amazon EventBridge

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon EventBridge パイプの作成

EventBridge Pipes を使用すると、高度なイベント変換やエンリッチメントなど、ソースとターゲット間の point-to-point 統合を作成できます。 EventBridge パイプを作成するには、次の手順を実行します。

を使用してパイプを作成する方法については AWS CLI、 AWS CLI コマンドリファレンス「create-pipe」を参照してください。

ソースの指定

まず、パイプでイベントを受信するソースを指定します。

コンソールを使用してパイプソースを指定するには
  1. で Amazon EventBridge コンソールを開きますhttps://console.aws.amazon.com/events/

  2. ナビゲーションペインで、[パイプ] を選択します。

  3. [Create pipe] (パイプの作成) を選択します。

  4. パイプの名前を入力します。

  5. (オプション) パイプの説明を追加します。

  6. [パイプを構築] タブの [ソース] で、このパイプに指定するソースのタイプを選択し、ソースを設定します。

    設定のプロパティは、選択するソースの種類によって異なります。

    Confluent
    コンソールを使用して Confluent Cloud ストリームをソースとして設定するには
    1. ソース で、Confluent Cloud を選択します。

    2. [Bootstrap servers] (ブートストラップサーバー) には、ブローカーの host:port ペアアドレスを入力します。

    3. [Topic name] (トピック名) には、読み取り元のトピック名を入力します。

    4. (オプション) でVPC、VPC必要な を選択します。次に、VPCサブネット で、目的のサブネットを選択します。VPC セキュリティグループ では、セキュリティグループを選択します。

    5. 認証 - オプション の場合認証の使用 をオンにし、以下を実行します。

      1. [Authentication method] (認証方法) で、認証タイプを選択します。

      2. [Secret key] (シークレットキー) で、シークレットキーを選択します。

      詳細については、Confluent ドキュメントの「Confluent Cloud リソースへの認証」を参照してください。

    6. (オプション) [Additional settings - optional] (追加設定 - オプション) では、以下を実行します。

      1. [Starting position] (開始位置) で、次のいずれかを選択します。

        • Latest (最新) — シャード内の最新のレコードでストリームの読み取りを開始します。

        • Trim horizon (水平トリム) — シャード内の最後のトリミングされていないレコードからストリームの読み取りを開始します。これはシャード内の最も古いレコードです。

      2. [Batch size - optional] (バッチサイズ - オプション) に、各バッチの最大レコード数を入力します。デフォルト値は 100 です。

      3. [Batch window - optional] (バッチウィンドウ - オプション) の場合、次に進む前にレコードを収集する最大秒数を入力します。

    DynamoDB
    1. [ソース] で、[DynamoDB] を選択します。

    2. DynamoDB ストリームの場合は、ソースとして使用するストリームを選択します。

    3. [Starting position] (開始位置) で、次のいずれかを選択します。

      • Latest (最新) — シャード内の最新のレコードでストリームの読み取りを開始します。

      • Trim horizon (水平トリム) — シャード内の最後のトリミングされていないレコードからストリームの読み取りを開始します。これはシャード内の最も古いレコードです。

    4. (オプション) [Additional settings - optional] (追加設定 - オプション) では、以下を実行します。

      1. [Batch size - optional] (バッチサイズ - オプション) に、各バッチの最大レコード数を入力します。デフォルト値は 100 です。

      2. [Batch window - optional] (バッチウィンドウ - オプション) の場合、次に進む前にレコードを収集する最大秒数を入力します。

      3. [Concurrent batches per shard - optional] (シャードあたりの同時バッチ数 - オプション) の場合、同じシャードから同時に読み取ることができるバッチの数を入力します。

      4. [On partial batch item failure] (バッチ項目の一部に障害が発生した場合) では、次の項目を選択します。

        • AUTOMATIC_BISECT – すべてのレコードが処理されるか、バッチに 1 つの失敗したメッセージが残るまで、各バッチを半分にし、半分ずつ再試行します。

        注記

        AUTOMATIC_BISECT を選択しない場合、特定の失敗したレコードを返すことができ、それらのみが再試行されます。

    Kinesis
    コンソールを使用して Kinesis ソースを設定するには
    1. [ソース] で、[Kinesis] を選択します。

    2. [Kinesis stream] (Kinesis ストリーム) には、ソースとして使用するストリームを選択します。

    3. [Starting position] (開始位置) で、次のいずれかを選択します。

      • Latest (最新) — シャード内の最新のレコードでストリームの読み取りを開始します。

      • Trim horizon (水平トリム) — シャード内の最後のトリミングされていないレコードからストリームの読み取りを開始します。これはシャード内の最も古いレコードです。

      • [At timestamp] (タイムスタンプ時) — 指定した時刻からストリームの読み取りを開始します。タイムスタンプ で、YYYY/MM/DD および hh:mm:ss 形式を使用してデータと時刻を入力します。

    4. (オプション) [Additional settings - optional] (追加設定 - オプション) では、以下を実行します。

      1. [Batch size - optional] (バッチサイズ - オプション) に、各バッチの最大レコード数を入力します。デフォルト値は 100 です。

      2. (オプション) [Batch window - optional] (バッチウィンドウ - オプション) の場合、次に進む前にレコードを収集する最大秒数を入力します。

      3. [Concurrent batches per shard - optional] (シャードあたりの同時バッチ数 - オプション) の場合、同じシャードから同時に読み取ることができるバッチの数を入力します。

      4. [On partial batch item failure] (バッチ項目の一部に障害が発生した場合) では、次の項目を選択します。

        • AUTOMATIC_BISECT – 各バッチを半分にし、すべてのレコードが処理されるか、バッチに 1 つの失敗したメッセージが残るまで、半分ずつ再試行します。

        注記

        AUTOMATIC_BISECT を選択しない場合、特定の失敗したレコードを返すことができ、それらのみが再試行されます。

    Amazon MQ
    コンソールを使用して Amazon MQ ソースを設定するには
    1. [ソース] で、[Amazon MQ] を選択します。

    2. Amazon MQ ブローカーの場合は、ソースとして使用するストリームを選択します。

    3. [Queue name] (キュー名) には、読み取り元のトピック名を入力します。

    4. 認証方法 では、BASIC_AUTH を選択します。

    5. [Secret key] (シークレットキー) で、シークレットキーを選択します。

    6. (オプション) [Additional settings - optional] (追加設定 - オプション) では、以下を実行します。

      1. [Batch size - optional] (バッチサイズ - オプション) に、各バッチの最大メッセージ数を入力します。デフォルト値は 100 です。

      2. [Batch window - optional] (バッチウィンドウ - オプション) の場合、次に進む前にレコードを収集する最大秒数を入力します。

    Amazon MSK
    コンソールを使用して Amazon MSKソースを設定するには
    1. ソース で、Amazon MSKを選択します。

    2. Amazon MSKクラスター では、使用するクラスターを選択します。

    3. [Topic name] (トピック名) には、読み取り元のトピック名を入力します。

    4. (オプション) [Consumer Group ID - optional] (コンシューマーグループ ID) で、パイプが参加するコンシューマーグループの ID を入力します。

    5. (オプション) [Authentication - optional] (認証 - オプション) で、[Use Authentication] (認証を使用) をオンにして次の操作を行います。

      1. [Authentication method] (認証方法) で、認証タイプを選択します。

      2. [Secret key] (シークレットキー) で、シークレットキーを選択します。

    6. (オプション) [Additional settings - optional] (追加設定 - オプション) では、以下を実行します。

      1. [Batch size - optional] (バッチサイズ - オプション) に、各バッチの最大レコード数を入力します。デフォルト値は 100 です。

      2. [Batch window - optional] (バッチウィンドウ - オプション) の場合、次に進む前にレコードを収集する最大秒数を入力します。

      3. [Starting position] (開始位置) で、次のいずれかを選択します。

        • Latest (最新) — シャード内の最新のレコードでトピックの読み取りを開始します。

        • Trim horizon (水平トリム) — シャード内の最後のトリミングされていないレコードからトピックの読み取りを開始します。これはシャード内の最も古いレコードです。

          注記

          Trim horizon (水平トリム) はApache Kafka の[Earliest] (最も早い時間) と同じです。

    Self managed Apache Kafka
    コンソールを使用してセルフマネージド型の Apache Kafka ソースを設定するには
    1. [ソース] で、[セルフマネージド型の Apache Kafka] を選択します。

    2. [Bootstrap servers] (ブートストラップサーバー) には、ブローカーの host:port ペアアドレスを入力します。

    3. [Topic name] (トピック名) には、読み取り元のトピック名を入力します。

    4. (オプション) でVPC、VPC必要な を選択します。次に、VPCサブネット で、目的のサブネットを選択します。VPC セキュリティグループ では、セキュリティグループを選択します。

    5. (オプション) [Authentication - optional] (認証 - オプション) で、[Use Authentication] (認証を使用) をオンにして次の操作を行います。

      1. [Authentication method] (認証方法) で、認証タイプを選択します。

      2. [Secret key] (シークレットキー) で、シークレットキーを選択します。

    6. (オプション) [Additional settings - optional] (追加設定 - オプション) では、以下を実行します。

      1. [Starting position] (開始位置) で、次のいずれかを選択します。

        • Latest (最新) — シャード内の最新のレコードでストリームの読み取りを開始します。

        • Trim horizon (水平トリム) — シャード内の最後のトリミングされていないレコードからストリームの読み取りを開始します。これはシャード内の最も古いレコードです。

      2. [Batch size - optional] (バッチサイズ - オプション) に、各バッチの最大レコード数を入力します。デフォルト値は 100 です。

      3. [Batch window - optional] (バッチウィンドウ - オプション) の場合、次に進む前にレコードを収集する最大秒数を入力します。

    Amazon SQS
    コンソールを使用して Amazon SQSソースを設定するには
    1. [Source] (ソース) で、SQS を選択します。

    2. SQS キュー では、使用するキューを選択します。

    3. (オプション) [Additional settings - optional] (追加設定 - オプション) では、以下を実行します。

      1. [Batch size - optional] (バッチサイズ - オプション) に、各バッチの最大レコード数を入力します。デフォルト値は 100 です。

      2. [Batch window - optional] (バッチウィンドウ - オプション) の場合、次に進む前にレコードを収集する最大秒数を入力します。

イベントフィルタリングの設定 (オプション)

パイプにフィルタリングを追加して、ソースからイベントのサブセットのみをターゲットに送信できます。

コンソールを使用してフィルタリングを設定するには
  1. [Filtering] (フィルタリング) を選択します。

  2. [サンプルイベント - オプション] には、イベントパターンの作成に使用できるサンプルイベントが表示されます。また、[独自のイベントを入力] を選択して独自のイベントを入力することもできます。

  3. [イベントパターン] に、イベントのフィルタリングに使用するイベントパターンを入力します。フィルターの構築の詳細については、「」を参照してくださいAmazon EventBridge Pipes でのイベントフィルタリング

    以下は、[City] (都市) フィールドの値が [Seattle] のイベントのみを送信するイベントパターンの例です。

    { "data": { "City": ["Seattle"] } }

イベントがフィルタリングされたので、オプションのエンリッチメントとパイプのターゲットを追加できます。

イベントのエンリッチメントの定義 (オプション)

Lambda 関数、 AWS Step Functions ステートマシン、Amazon API Gateway、またはAPI送信先にエンリッチメント用のイベントデータを送信できます。

エンリッチメントを選択するには
  1. [Enrichment] (エンリッチメント) を選択します。

  2. [Details] (詳細) の [Service] (サービス) で、エンリッチメントに使用したいサービスと関連設定を選択します。

データを送信する前にデータを変換して拡張することもできます。

(オプション) Input Transformer を定義するには
  1. [Enrichment Input Transformer - optional] (エンリッチメント Input Transformer - オプション) を選択してください。

  2. [Sample events/Event Payload] (サンプルイベント/イベントペイロード)では、サンプルイベントタイプを選択します。

  3. [Transformer] (トランスフォーマー) には、サンプルイベントからフィールドへ参照する"Event happened at <$.detail.field>."場所<$.detail.field> など、トランスフォーマー構文を入力します。また、サンプルイベントのフィールドをダブルクリックしてトランスフォーマーに追加することもできます。

  4. [Output] (出力) で、出力が希望どおりであることを確認します。

これでデータのフィルタリングと拡張が完了したので、次はイベントデータを送信するターゲットを定義する必要があります。

ターゲットの設定

ターゲットを設定するには
  1. [Target] を選択します。

  2. [Details] (詳細) の [Target service] (ターゲットサービス) で、ターゲットを選択します。表示されるフィールドは、選択したターゲットによって異なります。必要に応じて、このターゲットタイプに固有の情報を入力します。

ターゲットに送信する前にデータを変換することもできます。

(オプション) Input Transformer を定義するには
  1. [Target Input Transformer - optional] (ターゲットインプットトランスフォーマー - オプション) を選択します。

  2. [Sample events/Event Payload] (サンプルイベント/イベントペイロード)では、サンプルイベントタイプを選択します。

  3. [Transformer] (トランスフォーマー) には、サンプルイベントからフィールドへ参照する"Event happened at <$.detail.field>."場所<$.detail.field> など、トランスフォーマー構文を入力します。また、サンプルイベントのフィールドをダブルクリックしてトランスフォーマーに追加することもできます。

  4. [Output] (出力) で、出力が希望どおりであることを確認します。

これでパイプが設定されたので、その設定が正しく設定されていることを確認します。

パイプ設定の指定

パイプはデフォルトで有効ですが、無効にすることもできます。パイプのアクセス許可の指定、パイプのロギングの設定、タグの追加を行うこともできます。

パイプ設定を行うには
  1. [Pipe settings] (パイプ設定) タブを選択します。

  2. デフォルトでは、新しく作成されたパイプは作成されるとすぐに有効になります。無効なパイプを作成する場合は、[Activation] (アクティベーション) の [Activate pipe]] (パイプを有効化) で [Active] (有効) をオフにします。

  3. [Permissions] (アクセス許可) の [Execution role] (実行ロール) で、次のいずれかを実行します。

    1. このパイプの新しい実行ロール EventBridge を作成するには、この特定のリソースの新しいロールを作成するを選択します。[Role name] (ロール名) では、ロール名をオプションで編集できます。

    2. 既存の実行ロールを使用するには、[Use an existing role] (既存のロールを使用する) を選択します。[Logging role] (ログ記録ロール) でロールを選択します。

  4. (オプション) パイプソースとして Kinesis または DynamoDB ストリームを指定している場合は、再試行ポリシーとデッドレターキュー () を設定できますDLQ。

    [再試行ポリシーとデッドレターキュー – オプション] で、次の操作を行います。

    [Retry policy] (再試行ポリシー) で、以下の作業を行います。

    1. 再試行ポリシーを有効にする場合は、[Retry] (再試行) をオンにします。デフォルトでは、新しく作成されたパイプは、再試行ポリシーが有効になっていません。

    2. 最大イベント有効期間 に、1 分 (00:01) から 24 時間 (24:00) の間の値を入力します。

    3. 再試行 で、0~185 の数値を入力します。

    4. デッドレターキュー (DLQ) を使用する場合は、デッドレターキュー をオンにし、任意の方法を選択し、使用するキューまたはトピックを選択します。デフォルトでは、新しく作成されたパイプは を使用しませんDLQ。

  5. パイプデータの暗号化 EventBridge に使用する KMS key を選択します。

    EventBridge の使用方法の詳細については KMS keys、「」を参照してください保管中の暗号化

    • Use AWS 所有のキー for を選択して EventBridge 、 を使用してデータを暗号化します AWS 所有のキー。

      これは、複数の AWS アカウントで使用できるように EventBridge を所有および管理 KMS key している AWS 所有のキー です。通常、リソースを保護する暗号化キーを監査または制御する必要がない限り、 AWS 所有のキー は適切な選択です。

      これがデフォルトです。

    • Use カスタマー管理キー for を選択して EventBridge 、 カスタマー管理キー 指定または作成した を使用してデータを暗号化します。

      カスタマーマネージドキー は、作成、所有、管理する AWS アカウント KMS keys にあります。これらの を完全に制御できます KMS keys。

      1. 既存の を指定するか カスタマー管理キー、新しい を作成する KMS keyを選択します。

        EventBridge は、キーステータスと、指定された に関連付けられているキーエイリアスを表示します カスタマー管理キー。

  6. (オプション) [ログ - オプション] で、 EventBridge Pipe から、サポートされているサービスにログ情報を送信する方法 (ログの設定方法を含む) を設定できます。

    パイプレコードのログ記録の詳細については、「Amazon EventBridge Pipes のパフォーマンスのログ記録」を参照してください。

    CloudWatch ログは、ログレベルと同様に、デフォルトでERRORログ送信先として選択されます。そのため、デフォルトで、 EventBridge Pipes は、詳細ERRORレベルを含む CloudWatch ログレコードを送信する新しいロググループを作成します。

    EventBridge Pipes がサポートされているログ送信先にログレコードを送信できるようにするには、次の手順を実行します。

    1. [ログ - オプション] で、ログレコードの配信先を選択します。

    2. ログレベル では、ログレコード EventBridge に含める の情報レベルを選択します。ERROR ログレベルはデフォルトで選択されています。

      詳細については、「 EventBridge Pipes ログレベルの指定」を参照してください。

    3. ログレコードにイベントペイロード情報、サービスリクエスト、レスポンス情報を含める場合は EventBridge 、実行データを含めるを選択します。

      詳細については、「 EventBridge Pipes ログに実行データを含める」を参照してください。

    4. 選択したログの送信先をそれぞれ設定します。

      CloudWatch Logs ログの場合、 CloudWatch ログで以下を実行します。

      • CloudWatch ロググループ では、新しいロググループ EventBridge を作成するか、既存のロググループを選択するかARN、既存のロググループの を指定することができます。

      • 新しいロググループを作成する場合は、必要に応じてロググループ名を編集します。

      CloudWatch ログはデフォルトで選択されています。

      Firehose ストリームログの場合は、Firehose ストリームログ で Firehose ストリームを選択します。

      Amazon S3 ログの場合、S3 ログで以下を実行します。

      • ログの送信先として使用するバケットの名前を入力します。

      • バケット所有者の AWS アカウント ID を入力します。

      • EventBridge で S3 オブジェクトの作成時に使用するプレフィックステキストを入力します。

        詳細については、「Amazon Simple Storage Service ユーザーガイド」の「プレフィックスを使用してオブジェクトを整理する」を参照してください。

      • S3 ログレコード EventBridge のフォーマット方法を選択します。

  7. (オプション) [Tags - optional] (タグ-オプション) で、[Add new tag] (新しいタグを追加する) を選択し、ルールに対する 1 つまたは複数のタグを入力します。詳細については、「Amazon でのリソースのタグ付け EventBridge」を参照してください。

  8. [Create pipe] (パイプの作成) を選択します。

構成パラメータの検証

パイプが作成されると、 は次の設定パラメータ EventBridge を検証します。

  • IAM ロール – パイプの作成後にパイプのソースを変更できないため、 は指定されたIAMロールがソースにアクセスできる EventBridge ことを確認します。

    注記

    EventBridge は、パイプの作成後に更新できるため、エンリッチメントまたはターゲットに対して同じ検証を実行しません。

  • バッチ処理 – EventBridge ソースのバッチサイズがターゲットの最大バッチサイズを超えないことを確認します。その場合、 にはより低いバッチサイズ EventBridge が必要です。さらに、ターゲットがバッチ処理をサポートしていない場合、ソース EventBridge の でバッチ処理を設定することはできません。

  • エンリッチメント – 1 のバッチサイズのみがサポートされているため、APIゲートウェイおよびAPI送信先エンリッチメントのバッチサイズが 1 であることを EventBridge 検証します。