サポートされているストリーミングコネクタ - Amazon EMR

サポートされているストリーミングコネクタ

ストリーミングコネクタは、ストリーミングソースからのデータの読み取りを容易にし、ストリーミングシンクにデータを書き込むこともできます。

サポートされているストリーミングコネクタは次のとおりです。

Amazon Kinesis Data Streams コネクタ

Apache Spark 用の Amazon Kinesis Data Streams コネクタを使用すると、Amazon Kinesis Data Streams との間でデータを消費し、データを書き込むストリーミングアプリケーションとパイプラインを構築できます。コネクタは、シャードあたり最大 2 MB/秒の専用の読み取りスループットレートで、拡張ファンアウト消費をサポートします。デフォルトでは、Amazon EMR Serverless 7.1.0 以降にはコネクタが組み込まれているため、追加のパッケージを構築またはダウンロードする必要はありません。コネクタの詳細については、GitHub の spark-sql-kinesis-connector ページを参照してください。

以下は、Kinesis Data Streams コネクタの依存関係を使用してジョブ実行を開始する方法の例です。

aws emr-serverless start-job-run \ --application-id <APPLICATION_ID> \ --execution-role-arn <JOB_EXECUTION_ROLE> \ --mode 'STREAMING' \ --job-driver '{ "sparkSubmit": { "entryPoint": "s3://<Kinesis-streaming-script>", "entryPointArguments": ["s3://<DOC-EXAMPLE-BUCKET-OUTPUT>/output"], "sparkSubmitParameters": "--conf spark.executor.cores=4 --conf spark.executor.memory=16g --conf spark.driver.cores=4 --conf spark.driver.memory=16g --conf spark.executor.instances=3 --jars /usr/share/aws/kinesis/spark-sql-kinesis/lib/spark-streaming-sql-kinesis-connector.jar" } }'

Kinesis Data Streams に接続するには、VPC アクセスを使用して EMR Serverless アプリケーションを設定し、VPC エンドポイントを使用してプライベートアクセスを許可するか、NAT Gateway を使用してパブリックアクセスを取得する必要があります。詳細については、Configuring VPC access を参照してください。また、ジョブのランタイムロールに、必要とするデータストリームにアクセスするために必要な読み取り権限と書き込み権限があることを確認する必要があります。ジョブランタイムロールの設定方法の詳細については、「Job runtime roles for Amazon EMR Serverless」を参照してください。必要なすべてのアクセス許可の完全なリストについては、GitHub の spark-sql-kinesis-connector ページを参照してください。

Apache Kafka コネクタ

Spark 構造化ストリーミング用の Apache Kafka コネクタは Spark コミュニティのオープンソースコネクタであり、Maven リポジトリで利用できます。このコネクタにより、Spark 構造化ストリーミングアプリケーションや、セルフマネージド Apache Kafka と Amazon Managed Streaming for Apache Kafka との間でデータを読み書きするのがが容易になります。コネクタの詳細については、Apache Spark ドキュメントの「Structured Streaming + Kafka Integration Guide」を参照してください。

次の例は、ジョブ実行リクエストに Kafka コネクタを含める方法を示しています。

aws emr-serverless start-job-run \ --application-id <APPLICATION_ID> \ --execution-role-arn <JOB_EXECUTION_ROLE> \ --mode 'STREAMING' \ --job-driver '{ "sparkSubmit": { "entryPoint": "s3://<Kafka-streaming-script>", "entryPointArguments": ["s3://<DOC-EXAMPLE-BUCKET-OUTPUT>/output"], "sparkSubmitParameters": "--conf spark.executor.cores=4 --conf spark.executor.memory=16g --conf spark.driver.cores=4 --conf spark.driver.memory=16g --conf spark.executor.instances=3 --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<KAFKA_CONNECTOR_VERSION>" } }'

Apache Kafka コネクタのバージョンは、EMR Serverless リリースバージョンと対応する Spark バージョンによって異なります。正しい Kafka バージョンを確認するには、「Structured Streaming + Kafka Integration Guide」を参照してください。

IAM 認証で Amazon Managed Streaming for Apache Kafka を使用するには、Kafka コネクタが IAM で Amazon MSK に接続できるように別の依存関係を含める必要があります。詳細については、GitHub の aws-msk-iam-auth リポジトリを参照してください。また、ジョブのランタイムロールに必要な IAM アクセス許可があることを確認する必要があります。次の例は、IAM 認証でコネクタを使用する方法を示しています。

aws emr-serverless start-job-run \ --application-id <APPLICATION_ID> \ --execution-role-arn <JOB_EXECUTION_ROLE> \ --mode 'STREAMING' \ --job-driver '{ "sparkSubmit": { "entryPoint": "s3://<Kafka-streaming-script>", "entryPointArguments": ["s3://<DOC-EXAMPLE-BUCKET-OUTPUT>/output"], "sparkSubmitParameters": "--conf spark.executor.cores=4 --conf spark.executor.memory=16g --conf spark.driver.cores=4 --conf spark.driver.memory=16g --conf spark.executor.instances=3 --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<KAFKA_CONNECTOR_VERSION>,software.amazon.msk:aws-msk-iam-auth:<MSK_IAM_LIB_VERSION>" } }'

Amazon MSK から Kafka コネクタと IAM 認証ライブラリを使用するには、VPC アクセスを使用して EMR Serverless アプリケーションを設定する必要があります。サブネットにはインターネットアクセスが必要で、Maven の依存関係にアクセスするには NAT Gateway を使用する必要があります。詳細については、Configuring VPC access を参照してください。Kafka クラスターにアクセスするには、サブネットにネットワーク接続が必要です。これは、Kafka クラスターがセルフマネージド型かどうか、Amazon Managed Streaming for Apache Kafka を使用しているかどうか、に関係なく当てはまります。