Migration des Spark Kinesis-Connectors auf SDK 2.x für Amazon 7.0 EMR - Amazon EMR

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Migration des Spark Kinesis-Connectors auf SDK 2.x für Amazon 7.0 EMR

Das AWS SDK bietet eine Vielzahl von Bibliotheken für die Interaktion mit AWS Cloud-Computing-Diensten, z. B. die Verwaltung von Anmeldeinformationen APIs und die Verbindung zu S3- und Kinesis-Diensten. Der Spark-Kinesis-Konnektor wird verwendet, um Daten aus Kinesis Data Streams zu verarbeiten, und die empfangenen Daten werden in der Ausführungs-Engine von Spark transformiert und verarbeitet. Derzeit baut dieser Konnektor auf 1.x von AWS SDK und K inesis-client-library () KCL auf.

Im Rahmen der AWS SDK 2.x-Migration wird auch der Spark Kinesis-Connector entsprechend aktualisiert, sodass er mit 2.x ausgeführt werden kann. SDK In der Amazon EMR 7.0-Version enthält Spark das SDK 2.x-Upgrade, das in der Community-Version von Apache Spark noch nicht verfügbar ist. Wenn Sie den Spark Kinesis-Connector aus einer Version unter 7.0 verwenden, müssen Sie Ihre Anwendungscodes für die Ausführung auf SDK 2.x migrieren, bevor Sie zu Amazon EMR 7.0 migrieren können.

Migrationshandbücher

In diesem Abschnitt werden die Schritte zur Migration einer Anwendung zum aktualisierten Spark-Kinesis-Konnektor beschrieben. Es enthält Anleitungen für die Migration zur Kinesis Client Library (KCL) 2.x, AWS Anmeldeinformationsanbieter und AWS Service-Clients in 2.x. AWS SDK Als Referenz enthält es auch ein WordCountBeispielprogramm, das den Kinesis-Konnektor verwendet.

Migration KCL von 1.x zu 2.x

  • Ebene und Dimensionen der Metriken in KinesisInputDStream

    Wenn Sie einen KinesisInputDStream instanziieren, können Sie die Metrikebene und die Dimensionen für den Stream steuern. Das folgende Beispiel zeigt, wie Sie diese Parameter mit 1.x anpassen können: KCL

    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel val kinesisStream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(kinesisCheckpointInterval) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet) .build()

    In KCL 2.x haben diese Konfigurationseinstellungen unterschiedliche Paketnamen. Für die Migration zu 2.x:

    1. Ändern Sie die Importanweisungen für com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration und com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel zu software.amazon.kinesis.metrics.MetricsLevel bzw. software.amazon.kinesis.metrics.MetricsUtil.

      // import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel import software.amazon.kinesis.metrics.MetricsLevel // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration import software.amazon.kinesis.metrics.MetricsUtil
    2. Ersetzen Sie die Zeile metricsEnabledDimensionsKinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet durch metricsEnabledDimensionsSet(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)

    Im Folgenden finden Sie eine aktualisierte Version von KinesisInputDStream mit benutzerdefinierten Metrikebene und Metrikdimensionen:

    import software.amazon.kinesis.metrics.MetricsLevel import software.amazon.kinesis.metrics.MetricsUtil val kinesisStream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(kinesisCheckpointInterval) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)) .build()
  • Meldungshandler-Funktion in KinesisInputDStream

    Bei der Instanziierung eines KinesisInputDStream können Sie auch eine „Meldungshandler-Funktion“ angeben, die einen Kinesis-Datensatz verwendet und ein generisches Objekt T zurückgibt, falls Sie andere in einem Datensatz enthaltene Daten wie den Partitionsschlüssel verwenden möchten.

    In KCL 1.x lautet die Signatur der Nachrichtenhandler-Funktion:Record => T, wobei Record steht. com.amazonaws.services.kinesis.model.Record In KCL 2.x wurde die Signatur des Handlers in:KinesisClientRecord => T, wo KinesisClientRecord ist, geändert. software.amazon.kinesis.retrieval.KinesisClientRecord

    Es folgt ein Beispiel für die Bereitstellung eines Message-Handlers in KCL 1.x:

    import com.amazonaws.services.kinesis.model.Record def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5 val stream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(Seconds(10)) .storageLevel(StorageLevel.MEMORY_ONLY) .buildWithMessageHandler(addFive)

    Für die Migration des Meldungshandlers:

    1. Ändern Sie die Importanweisung für com.amazonaws.services.kinesis.model.Record zu software.amazon.kinesis.retrieval.KinesisClientRecord.

      // import com.amazonaws.services.kinesis.model.Record import software.amazon.kinesis.retrieval.KinesisClientRecord
    2. Aktualisieren Sie die Methodensignatur des Meldungshandlers.

      //def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5 def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5

    Es folgt ein aktualisiertes Beispiel für die Bereitstellung des Message-Handlers in KCL 2.x:

    import software.amazon.kinesis.retrieval.KinesisClientRecord def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5 val stream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(Seconds(10)) .storageLevel(StorageLevel.MEMORY_ONLY) .buildWithMessageHandler(addFive)

    Weitere Informationen zur Migration von KCL 1.x auf 2.x finden Sie unter Migration von Verbrauchern von 1.x auf 2.x. KCL KCL

Migrieren von Anbietern von Anmeldeinformationen von 1.x auf 2.x AWSAWS SDK

Anbieter von Anmeldeinformationen werden verwendet, um AWS Anmeldeinformationen für Interaktionen mit zu erhalten. AWS In SDK 2.x gibt es mehrere Schnittstellen- und Klassenänderungen im Zusammenhang mit den Anbietern von Anmeldeinformationen, die Sie hier finden. Der Spark Kinesis-Konnektor hat eine Schnittstelle (org.apache.spark.streaming.kinesis.SparkAWSCredentials) und Implementierungsklassen definiert, die die Version 1.x von AWS Credential Providern zurückgeben. Diese Anbieter von Anmeldeinformationen werden bei der Initialisierung von Kinesis-Clients benötigt. Wenn Sie die Methode beispielsweise SparkAWSCredentials.provider in den Anwendungen verwenden, müssten Sie die Codes aktualisieren, um die 2.x-Version der Credential Provider nutzen zu können. AWS

Im Folgenden finden Sie ein Beispiel für die Verwendung der Anmeldeinformationsanbieter in 1.x: AWS SDK

import org.apache.spark.streaming.kinesis.SparkAWSCredentials import com.amazonaws.auth.AWSCredentialsProvider val basicSparkCredentials = SparkAWSCredentials.builder .basicCredentials("accessKey", "secretKey") .build() val credentialProvider = basicSparkCredentials.provider assert(credentialProvider.isInstanceOf[AWSCredentialsProvider], "Type should be AWSCredentialsProvider")
Um zu 2.x zu migrierenSDK:
  1. Ändern Sie die Importanweisung für com.amazonaws.auth.AWSCredentialsProvider zu software.amazon.awssdk.auth.credentials.AwsCredentialsProvider

    //import com.amazonaws.auth.AWSCredentialsProvider import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
  2. Aktualisieren Sie die verbleibenden Codes, die diese Klasse verwenden.

    import org.apache.spark.streaming.kinesis.SparkAWSCredentials import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider val basicSparkCredentials = SparkAWSCredentials.builder .basicCredentials("accessKey", "secretKey") .build() val credentialProvider = basicSparkCredentials.provider assert (credentialProvider.isInstanceOf[AwsCredentialsProvider], "Type should be AwsCredentialsProvider")

Migration von AWS Service-Clients von AWS SDK 1.x auf 2.x

AWS Service-Clients haben in 2.x unterschiedliche Paketnamen (d. h.software.amazon.awssdk), wohingegen in Version 1.x andere Paketnamen verwendet werden. SDK com.amazonaws Weitere Informationen über die Änderungen in dieser Version finden Sie hier. Wenn Sie diese Service-Clients in den Codes verwenden, müssten Sie die Clients entsprechend migrieren.

Es folgt ein Beispiel für die Erstellung eines Clients in SDK 1.x:

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient import com.amazonaws.services.dynamodbv2.document.DynamoDB AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient(); AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
Für die Migration zu 2.x:
  1. Ändern Sie die Importanweisungen für Service-Clients. Nehmen wir als Beispiel DynamoDB-Clients. Sie müssten com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient oder com.amazonaws.services.dynamodbv2.document.DynamoDB zu software.amazon.awssdk.services.dynamodb.DynamoDbClient ändern.

    // import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient // import com.amazonaws.services.dynamodbv2.document.DynamoDB import software.amazon.awssdk.services.dynamodb.DynamoDbClient
  2. Die Codes aktualisieren, die die Clients initialisieren

    // AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient(); // AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient(); DynamoDbClient ddbClient = DynamoDbClient.create(); DynamoDbClient ddbClient = DynamoDbClient.builder().build();

    Weitere Informationen zur Migration AWS SDK von 1.x zu 2.x finden Sie unter Was ist der Unterschied zwischen dem AWS SDK für Java 1.x und 2.x

Codebeispiele für Streaming-Anwendungen

import java.net.URI import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider import software.amazon.awssdk.http.apache.ApacheHttpClient import software.amazon.awssdk.services.kinesis.KinesisClient import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest import software.amazon.awssdk.regions.Region import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil} import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest import org.apache.spark.streaming.kinesis.KinesisInputDStream object KinesisWordCountASLSDKV2 { def main(args: Array[String]): Unit = { val appName = "demo-app" val streamName = "demo-kinesis-test" val endpointUrl = "https://kinesis.us-west-2.amazonaws.com" val regionName = "us-west-2" // Determine the number of shards from the stream using the low-level Kinesis Client // from the AWS Java SDK. val credentialsProvider = DefaultCredentialsProvider.create require(credentialsProvider.resolveCredentials() != null, "No AWS credentials found. Please specify credentials using one of the methods specified " + "in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html") val kinesisClient = KinesisClient.builder() .credentialsProvider(credentialsProvider) .region(Region.US_WEST_2) .endpointOverride(URI.create(endpointUrl)) .httpClientBuilder(ApacheHttpClient.builder()) .build() val describeStreamRequest = DescribeStreamRequest.builder() .streamName(streamName) .build() val numShards = kinesisClient.describeStream(describeStreamRequest) .streamDescription .shards .size // In this example, we are going to create 1 Kinesis Receiver/input DStream for each shard. // This is not a necessity; if there are less receivers/DStreams than the number of shards, // then the shards will be automatically distributed among the receivers and each receiver // will receive data from multiple shards. val numStreams = numShards // Spark Streaming batch interval val batchInterval = Milliseconds(2000) // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information // on sequence number of records that have been received. Same as batchInterval for this // example. val kinesisCheckpointInterval = batchInterval // Setup the SparkConfig and StreamingContext val sparkConfig = new SparkConf().setAppName("KinesisWordCountASLSDKV2") val ssc = new StreamingContext(sparkConfig, batchInterval) // Create the Kinesis DStreams val kinesisStreams = (0 until numStreams).map { i => KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(kinesisCheckpointInterval) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)) .build() } // Union all the streams val unionStreams = ssc.union(kinesisStreams) // Convert each line of Array[Byte] to String, and split into words val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" ")) // Map each word to a (word, 1) tuple so we can reduce by key to count the words val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) // Print the first 10 wordCounts wordCounts.print() // Start the streaming context and await termination ssc.start() ssc.awaitTermination() } }

Überlegungen zur Verwendung des aktualisierten Spark-Kinesis-Konnektors

  • Wenn Ihre Anwendungen die JDK Version Kinesis-producer-library mit einer niedrigeren Version als 11 verwenden, können Sie auf Ausnahmen stoßen wie. java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter Dies liegt daran, dass EMR 7.0 standardmäßig JDK 17 enthält und J2EE-Module seit Java 11+ aus den Standardbibliotheken entfernt wurden. Dies könnte behoben werden, indem die folgende Abhängigkeit zur POM-Datei hinzugefügt wird. Ersetzen Sie die Bibliotheksversion nach Bedarf durch eine passende.

    <dependency> <groupId>javax.xml.bind</groupId> <artifactId>jaxb-api</artifactId> <version>${jaxb-api.version}</version> </dependency>
  • Das Spark Kinesis-Connector-JAR befindet sich nach der Erstellung eines EMR Clusters unter diesem Pfad: /usr/lib/spark/connector/lib/