Spark Kinesis コネクタを Amazon SDK 7.0 用の 2.x EMR に移行する - Amazon EMR

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Spark Kinesis コネクタを Amazon SDK 7.0 用の 2.x EMR に移行する

は、認証情報の管理、S3 APIsおよび AWS SDKKinesis サービスへの接続など、クラウドコンピューティングサービスとやり取り AWS するための豊富な および ライブラリのセットを提供します。Spark Kinesis コネクターは Kinesis データストリームからデータを消費するために使用され、受信したデータは Spark の実行エンジンで変換および処理されます。現在、このコネクタは および K inesis-client-library () の AWS SDK 1.x 上に構築されていますKCL。

2.x 移行の一環として AWS SDK、Spark Kinesis コネクタも 2.x SDK で実行されるように更新されます。Amazon EMR7.0 リリースでは、Spark SDK にはコミュニティバージョンの Apache Spark ではまだ利用できない 2.x アップグレードが含まれています。7.0 より前のリリースの Spark Kinesis コネクタを使用する場合は、Amazon 7.0 SDK に移行する前に、アプリケーションコードを 2.x EMR で実行するように移行する必要があります。

移行ガイド

このセクションでは、アップグレードされた Spark Kinesis コネクタにアプリケーションを移行する手順について説明します。これには、Kinesis Client Library (KCL) 2.x、 AWS 認証情報プロバイダー、および 2.x AWS SDK AWS のサービスクライアントへの移行に関するガイドが含まれています。参考までに、Kinesis コネクタを使用するサンプルWordCountプログラムも含まれています。

1.x KCLから 2.x への移行

  • KinesisInputDStream のメトリクスレベルとディメンション

    KinesisInputDStream をインスタンス化すると、ストリームのメトリクスレベルとディメンションを制御できます。次の例は、これらのパラメータを 1.x KCL でカスタマイズする方法を示しています。

    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel val kinesisStream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(kinesisCheckpointInterval) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet) .build()

    KCL 2.x では、これらの設定のパッケージ名は異なります。2.x に移行するには:

    1. com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration および com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel のインポートステートメントをそれぞれ software.amazon.kinesis.metrics.MetricsLevel および software.amazon.kinesis.metrics.MetricsUtil に変更します。

      // import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel import software.amazon.kinesis.metrics.MetricsLevel // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration import software.amazon.kinesis.metrics.MetricsUtil
    2. metricsEnabledDimensionsKinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSetmetricsEnabledDimensionsSet(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME) に置き換えます。

    以下は、カスタマイズされたメトリクスレベルとメトリクスディメンションを備えた KinesisInputDStream の更新バージョンです。

    import software.amazon.kinesis.metrics.MetricsLevel import software.amazon.kinesis.metrics.MetricsUtil val kinesisStream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(kinesisCheckpointInterval) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)) .build()
  • KinesisInputDStream のメッセージハンドラー関数

    KinesisInputDStream をインスタンス化するときに、パーティションキーなどの Record に含まれる他のデータを使用したい場合に備えて、Kinesis Record を取得して汎用オブジェクト T を返す「メッセージハンドラー関数」を提供することもできます。

    KCL 1.x では、メッセージハンドラー関数の署名は でRecord => T、レコードは ですcom.amazonaws.services.kinesis.model.Record。KCL 2.x では、ハンドラーの署名は に変更されます。ここでKinesisClientRecord => T KinesisClientRecord、 は ですsoftware.amazon.kinesis.retrieval.KinesisClientRecord

    1.x KCL でメッセージハンドラーを提供する例を次に示します。

    import com.amazonaws.services.kinesis.model.Record def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5 val stream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(Seconds(10)) .storageLevel(StorageLevel.MEMORY_ONLY) .buildWithMessageHandler(addFive)

    メッセージハンドラーを移行するには:

    1. com.amazonaws.services.kinesis.model.Record のインポートステートメントを software.amazon.kinesis.retrieval.KinesisClientRecord に変更します。

      // import com.amazonaws.services.kinesis.model.Record import software.amazon.kinesis.retrieval.KinesisClientRecord
    2. メッセージハンドラーのメソッドシグネチャを更新します。

      //def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5 def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5

    KCL 2.x でメッセージハンドラーを提供する最新の例を次に示します。

    import software.amazon.kinesis.retrieval.KinesisClientRecord def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5 val stream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(Seconds(10)) .storageLevel(StorageLevel.MEMORY_ONLY) .buildWithMessageHandler(addFive)

    1.x から 2.x KCL への移行の詳細については、「1.x から KCL 2.x KCL へのコンシューマーの移行」を参照してください。

AWS 認証情報プロバイダーを 1.x から AWS SDK 2.x に移行する

認証情報プロバイダーは、 とのやり取りの AWS 認証情報を取得するために使用されます AWS。2SDK.x の認証情報プロバイダーに関連するインターフェイスとクラスの変更がいくつかあります。こちら を参照してください。Spark Kinesis コネクタは、 AWS 認証情報プロバイダーの 1.x バージョンを返すインターフェイス (org.apache.spark.streaming.kinesis.SparkAWSCredentials) と実装クラスを定義しています。これらの認証情報プロバイダーは、Kinesis クライアントを初期化する際に必要です。例えば、SparkAWSCredentials.providerアプリケーションで メソッドを使用している場合は、 AWS 認証情報プロバイダーの 2.x バージョンを使用するようにコードを更新する必要があります。

1.x で AWS SDK認証情報プロバイダーを使用する例を次に示します。

import org.apache.spark.streaming.kinesis.SparkAWSCredentials import com.amazonaws.auth.AWSCredentialsProvider val basicSparkCredentials = SparkAWSCredentials.builder .basicCredentials("accessKey", "secretKey") .build() val credentialProvider = basicSparkCredentials.provider assert(credentialProvider.isInstanceOf[AWSCredentialsProvider], "Type should be AWSCredentialsProvider")
2.x SDK に移行するには:
  1. com.amazonaws.auth.AWSCredentialsProvider のインポートステートメントを software.amazon.awssdk.auth.credentials.AwsCredentialsProvider に変更します。

    //import com.amazonaws.auth.AWSCredentialsProvider import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
  2. このクラスを使用する残りのコードを更新します。

    import org.apache.spark.streaming.kinesis.SparkAWSCredentials import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider val basicSparkCredentials = SparkAWSCredentials.builder .basicCredentials("accessKey", "secretKey") .build() val credentialProvider = basicSparkCredentials.provider assert (credentialProvider.isInstanceOf[AwsCredentialsProvider], "Type should be AwsCredentialsProvider")

AWS サービスクライアントの 1.x から AWS SDK 2.x への移行

AWS サービスクライアントは 2.x ( などsoftware.amazon.awssdk) で異なるパッケージ名を持ちますが、1.x SDK は を使用しますcom.amazonaws。クライアント変更に関する詳細については、こちらを参照してください。コードでこれらのサービスクライアントを使用している場合は、それに応じてクライアントを移行する必要があります。

1.x SDK でクライアントを作成する例を次に示します。

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient import com.amazonaws.services.dynamodbv2.document.DynamoDB AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient(); AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
2.x に移行するには:
  1. サービスクライアントのインポートステートメントを変更します。DynamoDB クライアントを例にとってみましょう。com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient または com.amazonaws.services.dynamodbv2.document.DynamoDBsoftware.amazon.awssdk.services.dynamodb.DynamoDbClient に変更する必要があります。

    // import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient // import com.amazonaws.services.dynamodbv2.document.DynamoDB import software.amazon.awssdk.services.dynamodb.DynamoDbClient
  2. クライアントを初期化するコードを更新してください

    // AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient(); // AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient(); DynamoDbClient ddbClient = DynamoDbClient.create(); DynamoDbClient ddbClient = DynamoDbClient.builder().build();

    1.x から 2.x への移行の詳細については、 AWS SDK「 for Java 1.x と 2.x の違い AWS SDK」を参照してください。

ストリーミングアプリケーションのコード例

import java.net.URI import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider import software.amazon.awssdk.http.apache.ApacheHttpClient import software.amazon.awssdk.services.kinesis.KinesisClient import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest import software.amazon.awssdk.regions.Region import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil} import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest import org.apache.spark.streaming.kinesis.KinesisInputDStream object KinesisWordCountASLSDKV2 { def main(args: Array[String]): Unit = { val appName = "demo-app" val streamName = "demo-kinesis-test" val endpointUrl = "https://kinesis.us-west-2.amazonaws.com" val regionName = "us-west-2" // Determine the number of shards from the stream using the low-level Kinesis Client // from the AWS Java SDK. val credentialsProvider = DefaultCredentialsProvider.create require(credentialsProvider.resolveCredentials() != null, "No AWS credentials found. Please specify credentials using one of the methods specified " + "in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html") val kinesisClient = KinesisClient.builder() .credentialsProvider(credentialsProvider) .region(Region.US_WEST_2) .endpointOverride(URI.create(endpointUrl)) .httpClientBuilder(ApacheHttpClient.builder()) .build() val describeStreamRequest = DescribeStreamRequest.builder() .streamName(streamName) .build() val numShards = kinesisClient.describeStream(describeStreamRequest) .streamDescription .shards .size // In this example, we are going to create 1 Kinesis Receiver/input DStream for each shard. // This is not a necessity; if there are less receivers/DStreams than the number of shards, // then the shards will be automatically distributed among the receivers and each receiver // will receive data from multiple shards. val numStreams = numShards // Spark Streaming batch interval val batchInterval = Milliseconds(2000) // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information // on sequence number of records that have been received. Same as batchInterval for this // example. val kinesisCheckpointInterval = batchInterval // Setup the SparkConfig and StreamingContext val sparkConfig = new SparkConf().setAppName("KinesisWordCountASLSDKV2") val ssc = new StreamingContext(sparkConfig, batchInterval) // Create the Kinesis DStreams val kinesisStreams = (0 until numStreams).map { i => KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(kinesisCheckpointInterval) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)) .build() } // Union all the streams val unionStreams = ssc.union(kinesisStreams) // Convert each line of Array[Byte] to String, and split into words val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" ")) // Map each word to a (word, 1) tuple so we can reduce by key to count the words val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) // Print the first 10 wordCounts wordCounts.print() // Start the streaming context and await termination ssc.start() ssc.awaitTermination() } }

アップグレードされた Spark Kinesis コネクターを使用する際の考慮事項

  • アプリケーションが 11 より前のJDKバージョンKinesis-producer-libraryで を使用している場合、 のような例外が発生する可能性がありますjava.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter。これは、7.0 EMR にはデフォルトで 17 JDK 個が付属しており、Java 11 以降 J2EE モジュールが標準ライブラリから削除されているために発生します。これは pom ファイルに次の依存関係を追加することで修正できます。必要に応じてライブラリバージョンを適切なもので交換します。

    <dependency> <groupId>javax.xml.bind</groupId> <artifactId>jaxb-api</artifactId> <version>${jaxb-api.version}</version> </dependency>
  • Spark Kinesis コネクタ jar は、EMRクラスターの作成後にこのパスの下にあります。 /usr/lib/spark/connector/lib/