Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Migration des Spark Kinesis-Connectors auf SDK 2.x für Amazon 7.0 EMR
Das AWS SDK bietet eine Vielzahl von Bibliotheken für die Interaktion mit AWS Cloud-Computing-Diensten, z. B. die Verwaltung von Anmeldeinformationen APIs und die Verbindung zu S3- und Kinesis-Diensten. Der Spark-Kinesis-Konnektor wird verwendet, um Daten aus Kinesis Data Streams zu verarbeiten, und die empfangenen Daten werden in der Ausführungs-Engine von Spark transformiert und verarbeitet. Derzeit baut dieser Konnektor auf 1.x von AWS SDK und K inesis-client-library () KCL auf.
Im Rahmen der AWS SDK 2.x-Migration wird auch der Spark Kinesis-Connector entsprechend aktualisiert, sodass er mit 2.x ausgeführt werden kann. SDK In der Amazon EMR 7.0-Version enthält Spark das SDK 2.x-Upgrade, das in der Community-Version von Apache Spark noch nicht verfügbar ist. Wenn Sie den Spark Kinesis-Connector aus einer Version unter 7.0 verwenden, müssen Sie Ihre Anwendungscodes für die Ausführung auf SDK 2.x migrieren, bevor Sie zu Amazon EMR 7.0 migrieren können.
Migrationshandbücher
In diesem Abschnitt werden die Schritte zur Migration einer Anwendung zum aktualisierten Spark-Kinesis-Konnektor beschrieben. Es enthält Anleitungen für die Migration zur Kinesis Client Library (KCL) 2.x, AWS Anmeldeinformationsanbieter und AWS Service-Clients in 2.x. AWS SDK Als Referenz enthält es auch ein WordCount
Themen
Migration KCL von 1.x zu 2.x
-
Ebene und Dimensionen der Metriken in
KinesisInputDStream
Wenn Sie einen
KinesisInputDStream
instanziieren, können Sie die Metrikebene und die Dimensionen für den Stream steuern. Das folgende Beispiel zeigt, wie Sie diese Parameter mit 1.x anpassen können: KCLimport com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel val kinesisStream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(kinesisCheckpointInterval) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet) .build()
In KCL 2.x haben diese Konfigurationseinstellungen unterschiedliche Paketnamen. Für die Migration zu 2.x:
-
Ändern Sie die Importanweisungen für
com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
undcom.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
zusoftware.amazon.kinesis.metrics.MetricsLevel
bzw.software.amazon.kinesis.metrics.MetricsUtil
.// import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel import software.amazon.kinesis.metrics.MetricsLevel // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration import software.amazon.kinesis.metrics.MetricsUtil
-
Ersetzen Sie die Zeile
metricsEnabledDimensionsKinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet
durchmetricsEnabledDimensionsSet(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)
Im Folgenden finden Sie eine aktualisierte Version von
KinesisInputDStream
mit benutzerdefinierten Metrikebene und Metrikdimensionen:import software.amazon.kinesis.metrics.MetricsLevel import software.amazon.kinesis.metrics.MetricsUtil val kinesisStream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(kinesisCheckpointInterval) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)) .build()
-
-
Meldungshandler-Funktion in
KinesisInputDStream
Bei der Instanziierung eines
KinesisInputDStream
können Sie auch eine „Meldungshandler-Funktion“ angeben, die einen Kinesis-Datensatz verwendet und ein generisches Objekt T zurückgibt, falls Sie andere in einem Datensatz enthaltene Daten wie den Partitionsschlüssel verwenden möchten.In KCL 1.x lautet die Signatur der Nachrichtenhandler-Funktion:
Record => T
, wobei Record steht.com.amazonaws.services.kinesis.model.Record
In KCL 2.x wurde die Signatur des Handlers in:KinesisClientRecord => T
, wo KinesisClientRecord ist, geändert.software.amazon.kinesis.retrieval.KinesisClientRecord
Es folgt ein Beispiel für die Bereitstellung eines Message-Handlers in KCL 1.x:
import com.amazonaws.services.kinesis.model.Record def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5 val stream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(Seconds(10)) .storageLevel(StorageLevel.MEMORY_ONLY) .buildWithMessageHandler(addFive)
Für die Migration des Meldungshandlers:
-
Ändern Sie die Importanweisung für
com.amazonaws.services.kinesis.model.Record
zusoftware.amazon.kinesis.retrieval.KinesisClientRecord
.// import com.amazonaws.services.kinesis.model.Record import software.amazon.kinesis.retrieval.KinesisClientRecord
-
Aktualisieren Sie die Methodensignatur des Meldungshandlers.
//def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5 def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
Es folgt ein aktualisiertes Beispiel für die Bereitstellung des Message-Handlers in KCL 2.x:
import software.amazon.kinesis.retrieval.KinesisClientRecord def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5 val stream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(Seconds(10)) .storageLevel(StorageLevel.MEMORY_ONLY) .buildWithMessageHandler(addFive)
Weitere Informationen zur Migration von KCL 1.x auf 2.x finden Sie unter Migration von Verbrauchern von 1.x auf 2.x. KCL KCL
-
Migrieren von Anbietern von Anmeldeinformationen von 1.x auf 2.x AWSAWS SDK
Anbieter von Anmeldeinformationen werden verwendet, um AWS Anmeldeinformationen für Interaktionen mit zu erhalten. AWS In SDK 2.x gibt es mehrere Schnittstellen- und Klassenänderungen im Zusammenhang mit den Anbietern von Anmeldeinformationen, die Sie hier finden.org.apache.spark.streaming.kinesis.SparkAWSCredentials
) und Implementierungsklassen definiert, die die Version 1.x von AWS Credential Providern zurückgeben. Diese Anbieter von Anmeldeinformationen werden bei der Initialisierung von Kinesis-Clients benötigt. Wenn Sie die Methode beispielsweise SparkAWSCredentials.provider
in den Anwendungen verwenden, müssten Sie die Codes aktualisieren, um die 2.x-Version der Credential Provider nutzen zu können. AWS
Im Folgenden finden Sie ein Beispiel für die Verwendung der Anmeldeinformationsanbieter in 1.x: AWS SDK
import org.apache.spark.streaming.kinesis.SparkAWSCredentials import com.amazonaws.auth.AWSCredentialsProvider val basicSparkCredentials = SparkAWSCredentials.builder .basicCredentials("accessKey", "secretKey") .build() val credentialProvider = basicSparkCredentials.provider assert(credentialProvider.isInstanceOf[AWSCredentialsProvider], "Type should be AWSCredentialsProvider")
Um zu 2.x zu migrierenSDK:
-
Ändern Sie die Importanweisung für
com.amazonaws.auth.AWSCredentialsProvider
zusoftware.amazon.awssdk.auth.credentials.AwsCredentialsProvider
//import com.amazonaws.auth.AWSCredentialsProvider import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
-
Aktualisieren Sie die verbleibenden Codes, die diese Klasse verwenden.
import org.apache.spark.streaming.kinesis.SparkAWSCredentials import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider val basicSparkCredentials = SparkAWSCredentials.builder .basicCredentials("accessKey", "secretKey") .build() val credentialProvider = basicSparkCredentials.provider assert (credentialProvider.isInstanceOf[AwsCredentialsProvider], "Type should be AwsCredentialsProvider")
Migration von AWS Service-Clients von AWS SDK 1.x auf 2.x
AWS Service-Clients haben in 2.x unterschiedliche Paketnamen (d. h.software.amazon.awssdk
), wohingegen in Version 1.x andere Paketnamen verwendet werden. SDK com.amazonaws
Weitere Informationen über die Änderungen in dieser Version finden Sie hier. Wenn Sie diese Service-Clients in den Codes verwenden, müssten Sie die Clients entsprechend migrieren.
Es folgt ein Beispiel für die Erstellung eines Clients in SDK 1.x:
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient import com.amazonaws.services.dynamodbv2.document.DynamoDB AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient(); AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
Für die Migration zu 2.x:
-
Ändern Sie die Importanweisungen für Service-Clients. Nehmen wir als Beispiel DynamoDB-Clients. Sie müssten
com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
odercom.amazonaws.services.dynamodbv2.document.DynamoDB
zusoftware.amazon.awssdk.services.dynamodb.DynamoDbClient
ändern.// import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient // import com.amazonaws.services.dynamodbv2.document.DynamoDB import software.amazon.awssdk.services.dynamodb.DynamoDbClient
-
Die Codes aktualisieren, die die Clients initialisieren
// AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient(); // AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient(); DynamoDbClient ddbClient = DynamoDbClient.create(); DynamoDbClient ddbClient = DynamoDbClient.builder().build();
Weitere Informationen zur Migration AWS SDK von 1.x zu 2.x finden Sie unter Was ist der Unterschied zwischen dem AWS SDK für Java 1.x und 2.x
Codebeispiele für Streaming-Anwendungen
import java.net.URI import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider import software.amazon.awssdk.http.apache.ApacheHttpClient import software.amazon.awssdk.services.kinesis.KinesisClient import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest import software.amazon.awssdk.regions.Region import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil} import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest import org.apache.spark.streaming.kinesis.KinesisInputDStream object KinesisWordCountASLSDKV2 { def main(args: Array[String]): Unit = { val appName = "demo-app" val streamName = "demo-kinesis-test" val endpointUrl = "https://kinesis.us-west-2.amazonaws.com" val regionName = "us-west-2" // Determine the number of shards from the stream using the low-level Kinesis Client // from the AWS Java SDK. val credentialsProvider = DefaultCredentialsProvider.create require(credentialsProvider.resolveCredentials() != null, "No AWS credentials found. Please specify credentials using one of the methods specified " + "in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html") val kinesisClient = KinesisClient.builder() .credentialsProvider(credentialsProvider) .region(Region.US_WEST_2) .endpointOverride(URI.create(endpointUrl)) .httpClientBuilder(ApacheHttpClient.builder()) .build() val describeStreamRequest = DescribeStreamRequest.builder() .streamName(streamName) .build() val numShards = kinesisClient.describeStream(describeStreamRequest) .streamDescription .shards .size // In this example, we are going to create 1 Kinesis Receiver/input DStream for each shard. // This is not a necessity; if there are less receivers/DStreams than the number of shards, // then the shards will be automatically distributed among the receivers and each receiver // will receive data from multiple shards. val numStreams = numShards // Spark Streaming batch interval val batchInterval = Milliseconds(2000) // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information // on sequence number of records that have been received. Same as batchInterval for this // example. val kinesisCheckpointInterval = batchInterval // Setup the SparkConfig and StreamingContext val sparkConfig = new SparkConf().setAppName("KinesisWordCountASLSDKV2") val ssc = new StreamingContext(sparkConfig, batchInterval) // Create the Kinesis DStreams val kinesisStreams = (0 until numStreams).map { i => KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(kinesisCheckpointInterval) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)) .build() } // Union all the streams val unionStreams = ssc.union(kinesisStreams) // Convert each line of Array[Byte] to String, and split into words val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" ")) // Map each word to a (word, 1) tuple so we can reduce by key to count the words val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) // Print the first 10 wordCounts wordCounts.print() // Start the streaming context and await termination ssc.start() ssc.awaitTermination() } }
Überlegungen zur Verwendung des aktualisierten Spark-Kinesis-Konnektors
-
Wenn Ihre Anwendungen die JDK Version
Kinesis-producer-library
mit einer niedrigeren Version als 11 verwenden, können Sie auf Ausnahmen stoßen wie.java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter
Dies liegt daran, dass EMR 7.0 standardmäßig JDK 17 enthält und J2EE-Module seit Java 11+ aus den Standardbibliotheken entfernt wurden. Dies könnte behoben werden, indem die folgende Abhängigkeit zur POM-Datei hinzugefügt wird. Ersetzen Sie die Bibliotheksversion nach Bedarf durch eine passende.<dependency> <groupId>javax.xml.bind</groupId> <artifactId>jaxb-api</artifactId> <version>${jaxb-api.version}</version> </dependency>
-
Das Spark Kinesis-Connector-JAR befindet sich nach der Erstellung eines EMR Clusters unter diesem Pfad:
/usr/lib/spark/connector/lib/