Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Migración del conector Spark Kinesis a 2.x SDK para Amazon 7.0 EMR
AWS SDKProporciona un amplio conjunto de bibliotecas para interactuar con APIs los servicios de computación AWS en la nube, como la administración de credenciales y la conexión a los servicios de S3 y Kinesis. El conector de Spark Kinesis se utiliza para consumir datos de Kinesis Data Streams, y los datos recibidos se transforman y procesan en el motor de ejecución de Spark. Actualmente, este conector se basa en 1.x de AWS SDK y Kinesis-client-library ()KCL.
Como parte de la migración a la versión AWS SDK 2.x, el conector de Spark Kinesis también se actualizó en consecuencia para que funcione con SDK la versión 2.x. En la versión EMR 7.0 de Amazon, Spark contiene la actualización SDK 2.x que aún no está disponible en la versión comunitaria de Apache Spark. Si utilizas el conector Spark Kinesis de una versión anterior a la 7.0, debes migrar los códigos de tus aplicaciones para que se ejecuten en la versión SDK 2.x antes de poder migrar a Amazon 7.0. EMR
Guías de migración
En esta sección, se describen los pasos para migrar una aplicación al conector de Spark Kinesis actualizado. Incluye guías para migrar a la biblioteca de clientes de Kinesis (KCL) 2.x, proveedores de AWS credenciales y clientes de AWS servicios en la versión 2.x. AWS SDK Como referencia, también incluye un WordCount
Temas
Migración de 1.x KCL a 2.x
-
Nivel de métricas y dimensiones en
KinesisInputDStream
Al crear una instancia de
KinesisInputDStream
, puede controlar el nivel de métricas y las dimensiones para la secuencia. El siguiente ejemplo muestra cómo se pueden personalizar estos parámetros con 1.x: KCLimport com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel val kinesisStream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(kinesisCheckpointInterval) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet) .build()
En la KCL versión 2.x, estas opciones de configuración tienen nombres de paquete diferentes. Para migrar a 2.x:
-
Cambie las instrucciones de importación de
com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
ycom.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
asoftware.amazon.kinesis.metrics.MetricsLevel
ysoftware.amazon.kinesis.metrics.MetricsUtil
respectivamente.// import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel import software.amazon.kinesis.metrics.MetricsLevel // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration import software.amazon.kinesis.metrics.MetricsUtil
-
Sustituya la línea
metricsEnabledDimensionsKinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet
pormetricsEnabledDimensionsSet(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)
A continuación, se presenta una versión actualizada de
KinesisInputDStream
con dimensiones y niveles de métricas personalizados:import software.amazon.kinesis.metrics.MetricsLevel import software.amazon.kinesis.metrics.MetricsUtil val kinesisStream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(kinesisCheckpointInterval) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)) .build()
-
-
Función del controlador de mensajes en
KinesisInputDStream
Al crear una instancia de
KinesisInputDStream
, también puede proporcionar una “función del controlador de mensajes” que tome un registro de Kinesis y devuelva un objeto genérico T, en caso de que desee utilizar otros datos incluidos en un registro, como la clave de partición.En la KCL versión 1.x, la firma de la función del controlador de mensajes es:
Record => T
, donde Record es.com.amazonaws.services.kinesis.model.Record
En KCL 2.x, la firma del controlador se cambia a:KinesisClientRecord => T
, where is. KinesisClientRecordsoftware.amazon.kinesis.retrieval.KinesisClientRecord
A continuación se muestra un ejemplo de cómo proporcionar un controlador de mensajes en la versión 1.x: KCL
import com.amazonaws.services.kinesis.model.Record def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5 val stream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(Seconds(10)) .storageLevel(StorageLevel.MEMORY_ONLY) .buildWithMessageHandler(addFive)
Para migrar el controlador de mensajes:
-
Cambie la instrucción de importación de
com.amazonaws.services.kinesis.model.Record
asoftware.amazon.kinesis.retrieval.KinesisClientRecord
.// import com.amazonaws.services.kinesis.model.Record import software.amazon.kinesis.retrieval.KinesisClientRecord
-
Actualice la firma del método del controlador de mensajes.
//def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5 def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
A continuación se presenta un ejemplo actualizado de cómo proporcionar el controlador de mensajes en la versión 2.x: KCL
import software.amazon.kinesis.retrieval.KinesisClientRecord def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5 val stream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(Seconds(10)) .storageLevel(StorageLevel.MEMORY_ONLY) .buildWithMessageHandler(addFive)
Para obtener más información sobre la migración de la versión KCL 1.x a la 2.x, consulte Migración de consumidores de la versión 1.x a la 2.x. KCL KCL
-
Migración AWS de proveedores de credenciales de la versión 1.x a la 2.x AWS SDK
Los proveedores de credenciales se utilizan para obtener AWS credenciales con las que interactuar. AWS Hay varios cambios de interfaz y clase relacionados con los proveedores de credenciales de la versión SDK 2.x, que se pueden consultar aquíorg.apache.spark.streaming.kinesis.SparkAWSCredentials
) y clases de implementación que devuelven la versión 1.x de los proveedores de AWS credenciales. Estos proveedores de credenciales son necesarios para inicializar los clientes de Kinesis. Por ejemplo, si utiliza este método SparkAWSCredentials.provider
en las aplicaciones, necesitará actualizar los códigos para utilizar la versión 2.x de los proveedores de credenciales. AWS
A continuación se muestra un ejemplo del uso de los proveedores de credenciales en la versión 1.x: AWS SDK
import org.apache.spark.streaming.kinesis.SparkAWSCredentials import com.amazonaws.auth.AWSCredentialsProvider val basicSparkCredentials = SparkAWSCredentials.builder .basicCredentials("accessKey", "secretKey") .build() val credentialProvider = basicSparkCredentials.provider assert(credentialProvider.isInstanceOf[AWSCredentialsProvider], "Type should be AWSCredentialsProvider")
Para migrar a la versión 2.xSDK:
-
Cambie la instrucción de importación de
com.amazonaws.auth.AWSCredentialsProvider
asoftware.amazon.awssdk.auth.credentials.AwsCredentialsProvider
//import com.amazonaws.auth.AWSCredentialsProvider import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
-
Actualice los códigos restantes que utilizan esta clase.
import org.apache.spark.streaming.kinesis.SparkAWSCredentials import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider val basicSparkCredentials = SparkAWSCredentials.builder .basicCredentials("accessKey", "secretKey") .build() val credentialProvider = basicSparkCredentials.provider assert (credentialProvider.isInstanceOf[AwsCredentialsProvider], "Type should be AwsCredentialsProvider")
Migración de clientes de AWS servicios de AWS SDK 1.x a 2.x
AWS los clientes de servicio tienen diferentes nombres de paquete en la versión 2.x (es decir,software.amazon.awssdk
), mientras que en la versión 1.x los usa. SDK com.amazonaws
Para obtener más información sobre los cambios de los clientes, consulte aquí. Si utiliza estos clientes de servicio en los códigos, deberá realizar la migración correspondiente de los clientes.
A continuación se muestra un ejemplo de cómo crear un cliente en SDK la versión 1.x:
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient import com.amazonaws.services.dynamodbv2.document.DynamoDB AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient(); AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
Para migrar a 2.x:
-
Cambie las instrucciones de importación para los clientes de servicio. Tomemos como ejemplo los clientes de DynamoDB. Tendría que cambiar
com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
ocom.amazonaws.services.dynamodbv2.document.DynamoDB
asoftware.amazon.awssdk.services.dynamodb.DynamoDbClient
.// import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient // import com.amazonaws.services.dynamodbv2.document.DynamoDB import software.amazon.awssdk.services.dynamodb.DynamoDbClient
-
Actualización de los códigos que inicializan los clientes
// AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient(); // AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient(); DynamoDbClient ddbClient = DynamoDbClient.create(); DynamoDbClient ddbClient = DynamoDbClient.builder().build();
Para obtener más información sobre la migración AWS SDK de la versión 1.x a la 2.x, consulte ¿Qué diferencias hay entre las versiones 1.x y 2.x de Java AWS SDK?
Ejemplos de código para aplicaciones de transmisión
import java.net.URI import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider import software.amazon.awssdk.http.apache.ApacheHttpClient import software.amazon.awssdk.services.kinesis.KinesisClient import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest import software.amazon.awssdk.regions.Region import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil} import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest import org.apache.spark.streaming.kinesis.KinesisInputDStream object KinesisWordCountASLSDKV2 { def main(args: Array[String]): Unit = { val appName = "demo-app" val streamName = "demo-kinesis-test" val endpointUrl = "https://kinesis.us-west-2.amazonaws.com" val regionName = "us-west-2" // Determine the number of shards from the stream using the low-level Kinesis Client // from the AWS Java SDK. val credentialsProvider = DefaultCredentialsProvider.create require(credentialsProvider.resolveCredentials() != null, "No AWS credentials found. Please specify credentials using one of the methods specified " + "in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html") val kinesisClient = KinesisClient.builder() .credentialsProvider(credentialsProvider) .region(Region.US_WEST_2) .endpointOverride(URI.create(endpointUrl)) .httpClientBuilder(ApacheHttpClient.builder()) .build() val describeStreamRequest = DescribeStreamRequest.builder() .streamName(streamName) .build() val numShards = kinesisClient.describeStream(describeStreamRequest) .streamDescription .shards .size // In this example, we are going to create 1 Kinesis Receiver/input DStream for each shard. // This is not a necessity; if there are less receivers/DStreams than the number of shards, // then the shards will be automatically distributed among the receivers and each receiver // will receive data from multiple shards. val numStreams = numShards // Spark Streaming batch interval val batchInterval = Milliseconds(2000) // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information // on sequence number of records that have been received. Same as batchInterval for this // example. val kinesisCheckpointInterval = batchInterval // Setup the SparkConfig and StreamingContext val sparkConfig = new SparkConf().setAppName("KinesisWordCountASLSDKV2") val ssc = new StreamingContext(sparkConfig, batchInterval) // Create the Kinesis DStreams val kinesisStreams = (0 until numStreams).map { i => KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(kinesisCheckpointInterval) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)) .build() } // Union all the streams val unionStreams = ssc.union(kinesisStreams) // Convert each line of Array[Byte] to String, and split into words val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" ")) // Map each word to a (word, 1) tuple so we can reduce by key to count the words val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) // Print the first 10 wordCounts wordCounts.print() // Start the streaming context and await termination ssc.start() ssc.awaitTermination() } }
Consideraciones a la hora de utilizar el conector de Spark Kinesis actualizado
-
Si sus aplicaciones utilizan una JDK versión inferior a la 11, es posible que se encuentre
Kinesis-producer-library
con excepciones como.java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter
Esto sucede porque la EMR versión 7.0 viene con JDK 17 de forma predeterminada y los módulos J2EE se han eliminado de las bibliotecas estándar desde Java 11 y versiones posteriores. Esto se puede solucionar si agrega la siguiente dependencia en el archivo pom. Reemplace la versión de biblioteca por la que crea conveniente.<dependency> <groupId>javax.xml.bind</groupId> <artifactId>jaxb-api</artifactId> <version>${jaxb-api.version}</version> </dependency>
-
El tarro del conector de Spark Kinesis se encuentra en esta ruta después de crear un EMR clúster:
/usr/lib/spark/connector/lib/