Cookie の設定を選択する

当社は、当社のサイトおよびサービスを提供するために必要な必須 Cookie および類似のツールを使用しています。当社は、パフォーマンス Cookie を使用して匿名の統計情報を収集することで、お客様が当社のサイトをどのように利用しているかを把握し、改善に役立てています。必須 Cookie は無効化できませんが、[カスタマイズ] または [拒否] をクリックしてパフォーマンス Cookie を拒否することはできます。

お客様が同意した場合、AWS および承認された第三者は、Cookie を使用して便利なサイト機能を提供したり、お客様の選択を記憶したり、関連する広告を含む関連コンテンツを表示したりします。すべての必須ではない Cookie を受け入れるか拒否するには、[受け入れる] または [拒否] をクリックしてください。より詳細な選択を行うには、[カスタマイズ] をクリックしてください。

DynamoDB と Amazon Managed Streaming for Apache Kafka の統合

フォーカスモード
DynamoDB と Amazon Managed Streaming for Apache Kafka の統合 - Amazon DynamoDB

Amazon Managed Streaming for Apache Kafka (Amazon MSK) では、フルマネージドで可用性の高い Apache Kafka サービスを使用して、ストリーミングデータをリアルタイムで簡単に取り込み、処理できます。

Apache Kafka は、ストリーミングデータをリアルタイムで取り込み、処理するために最適化された分散型データストアです。Kafka はレコードのストリームを処理し、レコードが生成された順序でレコードのストリームを効果的に保存し、レコードのストリームを公開およびサブスクライブできます。

これらの機能により、Apache Kafka はリアルタイムストリーミングデータパイプラインの構築によく使用されます。データパイプラインは、あるシステムから別のシステムにデータを確実に処理して移動し、それぞれが異なるユースケースをサポートする複数のデータベースの使用を容易にすることで、目的別データベース戦略を採用する上で重要な部分となる可能性があります。

Amazon DynamoDB は、キーバリューまたはドキュメントデータモデルを使用し、一貫した 1 桁ミリ秒のパフォーマンスで無制限のスケーラビリティを求めるアプリケーションをサポートするために、これらのデータパイプラインの一般的なターゲットです。

仕組み

Amazon MSK と DynamoDB の統合では、Lambda 関数を使用して Amazon MSK からのレコードを消費し、DynamoDB に書き込みます。

Amazon MSK と DynamoDB の統合と、Amazon MSK が Lambda 関数を使用してレコードを消費し、DynamoDB に書き込む方法を示す図。

Lambda は、Amazon MSK からの新しいメッセージを内部的にポーリングした後、ターゲットの Lambda 関数を同期的に呼び出します。Lambda 関数のイベントペイロードには、Amazon MSK からのメッセージのバッチが含まれています。Amazon MSK と DynamoDB の統合のために、Lambda 関数はこれらのメッセージを DynamoDB に書き込みます。

Amazon MSK と DynamoDB の統合を設定する

注記

この例で使用されているリソースは、次の GitHub リポジトリでダウンロードできます。

以下の手順は、Amazon MSK と Amazon DynamoDB 間のサンプル統合を設定する方法を示しています。この例では、モノのインターネット (IoT) デバイスによって生成され、Amazon MSK に取り込まれたデータを表します。データが Amazon MSK に取り込まれると、Apache Kafka と互換性のある分析サービスまたはサードパーティーツールと統合できるため、さまざまな分析ユースケースが可能になります。DynamoDB を統合することで、個々のデバイスレコードのキー値も検索できます。

この例では、Python スクリプトが IoT センサーデータを Amazon MSK に書き込む方法を示します。次に、Lambda 関数はパーティションキー「deviceid」を持つ項目を DynamoDB に書き込みます。

提供された CloudFormation テンプレートは、Amazon S3 バケット、Amazon VPC、Amazon MSK クラスター、データオペレーションをテストするための AWS CloudShell のリソースを作成します。

テストデータを生成するには、Amazon MSK トピックを作成し、DynamoDB テーブルを作成します。マネジメントコンソールからセッションマネージャーを使用して CloudShell のオペレーティングシステムにログインし、Python スクリプトを実行できます。

CloudFormation テンプレートを実行したら、次のオペレーションを実行して、このアーキテクチャの構築を完了できます。

  1. CloudFormation テンプレートを実行して S3bucket.yaml S3 バケットを作成します。以降のスクリプトまたはオペレーションについては、同じリージョンで実行してください。CloudFormation スタックの名前として ForMSKTestS3 を入力します。

    CloudFormation コンソールのスタック作成画面を示す画像。

    これが完了したら、出力の下にある S3 バケット名出力を書き留めます。ステップ 3 でこの S3 バケット名出力が必要になります。

  2. 作成した S3 バケットにダウンロードした ZIP ファイル fromMSK.zip をアップロードします。

    S3 コンソールでファイルをアップロードできる場所を示す画像。
  3. CloudFormation のテンプレート VPC.yaml を実行して、VPC、Amazon MSK クラスター、Lambda 関数を作成します。パラメータ入力画面で S3 バケットを求められたら、ステップ 1 で作成した S3 バケット名を入力します。CloudFormation スタック名を ForMSKTestVPC に設定します。

    CloudFormation スタックの詳細を指定するときに入力する必要があるフィールドを示す画像。
  4. CloudShell で Python スクリプトを実行するための環境を準備します。AWS Management Console で CloudShell を使用できます。CloudShell の使用の詳細については、「AWS CloudShell の開始方法」を参照してください。CloudShell を起動したら、Amazon MSK クラスターに接続するために作成した VPC に属する CloudShell を作成します。プライベートサブネットに CloudShell を作成します。以下のフィールドに入力します。

    1. 名前 - 任意の名前に設定できます。例: MSK-VPC

    2. VPC - MSKTest を選択します。

    3. サブネット - MSKTest プライベートサブネット (AZ1) を選択します。

    4. SecurityGroup - ForMSKSecurityGroup を選択します。

    指定する必要があるフィールドを含む CloudShell 環境を示す画像。

    プライベートサブネットに属する CloudShell が開始されたら、次のコマンドを実行します。

    pip install boto3 kafka-python aws-msk-iam-sasl-signer-python
  5. S3 バケットから Python スクリプトをダウンロードします。

    aws s3 cp s3://[YOUR-BUCKET-NAME]/pythonScripts.zip ./ unzip pythonScripts.zip
  6. マネジメントコンソールを確認し、Python スクリプトでブローカー URL とリージョン値の環境変数を設定します。マネジメントコンソールで Amazon MSK クラスターブローカーエンドポイントを確認します。

    TODO
  7. CloudShell で環境変数を設定します。米国西部 (オレゴン) を使用している場合:

    export AWS_REGION="us-west-2" export MSK_BROKER="boot-YOURMSKCLUSTER.c3.kafka-serverless.ap-southeast-1.amazonaws.com:9098"
  8. 以下の Python スクリプトを実行します。

    Amazon MSK トピックを作成します。

    python ./createTopic.py

    DynamoDB テーブルを作成します。

    python ./createTable.py

    Amazon MSK トピックにテストデータを書き込みます。

    python ./kafkaDataGen.py
  9. 作成した Amazon MSK、Lambda、DynamoDB リソースの CloudWatch メトリクスを確認し、DynamoDB Data Explorer を使用して device_status テーブルに保存されているデータを検証して、すべてのプロセスが正しく実行されたことを確認します。各プロセスがエラーなしで実行される場合は、CloudShell から Amazon MSK に書き込まれたテストデータも DynamoDB に書き込まれていることを確認できます。

    DynamoDB コンソールと、スキャンの実行時に返される項目がどのように表示されるかを示す画像。
  10. この例が完了したら、このチュートリアルで作成したリソースを削除します。ForMSKTestS3ForMSKTestVPC の 2 つの CloudFormation スタックを削除します。スタックの削除が正常に完了すると、すべてのリソースが削除されます。

次のステップ

注記

この例に従ってリソースを作成した場合は、予期しない料金が発生しないように、リソースを削除してください。

統合では、Amazon MSK と DynamoDB をリンクして、ストリームデータを OLTP ワークロードをサポートできるようにするアーキテクチャを特定しました。ここから、DynamoDB と Amazon OpenSearch を連携することで、より複雑な検索を実現できます。より複雑なイベント駆動型のニーズには EventBridge との統合を検討し、スループットを高め、レイテンシー要件を低くするには Amazon Managed Service for Apache Flink などの拡張機能を検討してください。

プライバシーサイト規約Cookie の設定
© 2025, Amazon Web Services, Inc. or its affiliates.All rights reserved.