Migración del conector Spark Kinesis a 2.x SDK para Amazon 7.0 EMR - Amazon EMR

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Migración del conector Spark Kinesis a 2.x SDK para Amazon 7.0 EMR

AWS SDKProporciona un amplio conjunto de bibliotecas para interactuar con APIs los servicios de computación AWS en la nube, como la administración de credenciales y la conexión a los servicios de S3 y Kinesis. El conector de Spark Kinesis se utiliza para consumir datos de Kinesis Data Streams, y los datos recibidos se transforman y procesan en el motor de ejecución de Spark. Actualmente, este conector se basa en 1.x de AWS SDK y Kinesis-client-library ()KCL.

Como parte de la migración a la versión AWS SDK 2.x, el conector de Spark Kinesis también se actualizó en consecuencia para que funcione con SDK la versión 2.x. En la versión EMR 7.0 de Amazon, Spark contiene la actualización SDK 2.x que aún no está disponible en la versión comunitaria de Apache Spark. Si utilizas el conector Spark Kinesis de una versión anterior a la 7.0, debes migrar los códigos de tus aplicaciones para que se ejecuten en la versión SDK 2.x antes de poder migrar a Amazon 7.0. EMR

Guías de migración

En esta sección, se describen los pasos para migrar una aplicación al conector de Spark Kinesis actualizado. Incluye guías para migrar a la biblioteca de clientes de Kinesis (KCL) 2.x, proveedores de AWS credenciales y clientes de AWS servicios en la versión 2.x. AWS SDK Como referencia, también incluye un WordCountprograma de ejemplo que usa el conector Kinesis.

Migración de 1.x KCL a 2.x

  • Nivel de métricas y dimensiones en KinesisInputDStream

    Al crear una instancia de KinesisInputDStream, puede controlar el nivel de métricas y las dimensiones para la secuencia. El siguiente ejemplo muestra cómo se pueden personalizar estos parámetros con 1.x: KCL

    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel val kinesisStream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(kinesisCheckpointInterval) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet) .build()

    En la KCL versión 2.x, estas opciones de configuración tienen nombres de paquete diferentes. Para migrar a 2.x:

    1. Cambie las instrucciones de importación de com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration y com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel a software.amazon.kinesis.metrics.MetricsLevel y software.amazon.kinesis.metrics.MetricsUtil respectivamente.

      // import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel import software.amazon.kinesis.metrics.MetricsLevel // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration import software.amazon.kinesis.metrics.MetricsUtil
    2. Sustituya la línea metricsEnabledDimensionsKinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet por metricsEnabledDimensionsSet(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)

    A continuación, se presenta una versión actualizada de KinesisInputDStream con dimensiones y niveles de métricas personalizados:

    import software.amazon.kinesis.metrics.MetricsLevel import software.amazon.kinesis.metrics.MetricsUtil val kinesisStream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(kinesisCheckpointInterval) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)) .build()
  • Función del controlador de mensajes en KinesisInputDStream

    Al crear una instancia de KinesisInputDStream, también puede proporcionar una “función del controlador de mensajes” que tome un registro de Kinesis y devuelva un objeto genérico T, en caso de que desee utilizar otros datos incluidos en un registro, como la clave de partición.

    En la KCL versión 1.x, la firma de la función del controlador de mensajes es:Record => T, donde Record es. com.amazonaws.services.kinesis.model.Record En KCL 2.x, la firma del controlador se cambia a:KinesisClientRecord => T, where is. KinesisClientRecord software.amazon.kinesis.retrieval.KinesisClientRecord

    A continuación se muestra un ejemplo de cómo proporcionar un controlador de mensajes en la versión 1.x: KCL

    import com.amazonaws.services.kinesis.model.Record def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5 val stream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(Seconds(10)) .storageLevel(StorageLevel.MEMORY_ONLY) .buildWithMessageHandler(addFive)

    Para migrar el controlador de mensajes:

    1. Cambie la instrucción de importación de com.amazonaws.services.kinesis.model.Record a software.amazon.kinesis.retrieval.KinesisClientRecord.

      // import com.amazonaws.services.kinesis.model.Record import software.amazon.kinesis.retrieval.KinesisClientRecord
    2. Actualice la firma del método del controlador de mensajes.

      //def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5 def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5

    A continuación se presenta un ejemplo actualizado de cómo proporcionar el controlador de mensajes en la versión 2.x: KCL

    import software.amazon.kinesis.retrieval.KinesisClientRecord def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5 val stream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(Seconds(10)) .storageLevel(StorageLevel.MEMORY_ONLY) .buildWithMessageHandler(addFive)

    Para obtener más información sobre la migración de la versión KCL 1.x a la 2.x, consulte Migración de consumidores de la versión 1.x a la 2.x. KCL KCL

Migración AWS de proveedores de credenciales de la versión 1.x a la 2.x AWS SDK

Los proveedores de credenciales se utilizan para obtener AWS credenciales con las que interactuar. AWS Hay varios cambios de interfaz y clase relacionados con los proveedores de credenciales de la versión SDK 2.x, que se pueden consultar aquí. El conector Spark Kinesis ha definido una interfaz (org.apache.spark.streaming.kinesis.SparkAWSCredentials) y clases de implementación que devuelven la versión 1.x de los proveedores de AWS credenciales. Estos proveedores de credenciales son necesarios para inicializar los clientes de Kinesis. Por ejemplo, si utiliza este método SparkAWSCredentials.provider en las aplicaciones, necesitará actualizar los códigos para utilizar la versión 2.x de los proveedores de credenciales. AWS

A continuación se muestra un ejemplo del uso de los proveedores de credenciales en la versión 1.x: AWS SDK

import org.apache.spark.streaming.kinesis.SparkAWSCredentials import com.amazonaws.auth.AWSCredentialsProvider val basicSparkCredentials = SparkAWSCredentials.builder .basicCredentials("accessKey", "secretKey") .build() val credentialProvider = basicSparkCredentials.provider assert(credentialProvider.isInstanceOf[AWSCredentialsProvider], "Type should be AWSCredentialsProvider")
Para migrar a la versión 2.xSDK:
  1. Cambie la instrucción de importación de com.amazonaws.auth.AWSCredentialsProvider a software.amazon.awssdk.auth.credentials.AwsCredentialsProvider

    //import com.amazonaws.auth.AWSCredentialsProvider import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
  2. Actualice los códigos restantes que utilizan esta clase.

    import org.apache.spark.streaming.kinesis.SparkAWSCredentials import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider val basicSparkCredentials = SparkAWSCredentials.builder .basicCredentials("accessKey", "secretKey") .build() val credentialProvider = basicSparkCredentials.provider assert (credentialProvider.isInstanceOf[AwsCredentialsProvider], "Type should be AwsCredentialsProvider")

Migración de clientes de AWS servicios de AWS SDK 1.x a 2.x

AWS los clientes de servicio tienen diferentes nombres de paquete en la versión 2.x (es decir,software.amazon.awssdk), mientras que en la versión 1.x los usa. SDK com.amazonaws Para obtener más información sobre los cambios de los clientes, consulte aquí. Si utiliza estos clientes de servicio en los códigos, deberá realizar la migración correspondiente de los clientes.

A continuación se muestra un ejemplo de cómo crear un cliente en SDK la versión 1.x:

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient import com.amazonaws.services.dynamodbv2.document.DynamoDB AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient(); AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
Para migrar a 2.x:
  1. Cambie las instrucciones de importación para los clientes de servicio. Tomemos como ejemplo los clientes de DynamoDB. Tendría que cambiar com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient o com.amazonaws.services.dynamodbv2.document.DynamoDB a software.amazon.awssdk.services.dynamodb.DynamoDbClient.

    // import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient // import com.amazonaws.services.dynamodbv2.document.DynamoDB import software.amazon.awssdk.services.dynamodb.DynamoDbClient
  2. Actualización de los códigos que inicializan los clientes

    // AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient(); // AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient(); DynamoDbClient ddbClient = DynamoDbClient.create(); DynamoDbClient ddbClient = DynamoDbClient.builder().build();

    Para obtener más información sobre la migración AWS SDK de la versión 1.x a la 2.x, consulte ¿Qué diferencias hay entre las versiones 1.x y 2.x de Java AWS SDK?

Ejemplos de código para aplicaciones de transmisión

import java.net.URI import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider import software.amazon.awssdk.http.apache.ApacheHttpClient import software.amazon.awssdk.services.kinesis.KinesisClient import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest import software.amazon.awssdk.regions.Region import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil} import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest import org.apache.spark.streaming.kinesis.KinesisInputDStream object KinesisWordCountASLSDKV2 { def main(args: Array[String]): Unit = { val appName = "demo-app" val streamName = "demo-kinesis-test" val endpointUrl = "https://kinesis.us-west-2.amazonaws.com" val regionName = "us-west-2" // Determine the number of shards from the stream using the low-level Kinesis Client // from the AWS Java SDK. val credentialsProvider = DefaultCredentialsProvider.create require(credentialsProvider.resolveCredentials() != null, "No AWS credentials found. Please specify credentials using one of the methods specified " + "in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html") val kinesisClient = KinesisClient.builder() .credentialsProvider(credentialsProvider) .region(Region.US_WEST_2) .endpointOverride(URI.create(endpointUrl)) .httpClientBuilder(ApacheHttpClient.builder()) .build() val describeStreamRequest = DescribeStreamRequest.builder() .streamName(streamName) .build() val numShards = kinesisClient.describeStream(describeStreamRequest) .streamDescription .shards .size // In this example, we are going to create 1 Kinesis Receiver/input DStream for each shard. // This is not a necessity; if there are less receivers/DStreams than the number of shards, // then the shards will be automatically distributed among the receivers and each receiver // will receive data from multiple shards. val numStreams = numShards // Spark Streaming batch interval val batchInterval = Milliseconds(2000) // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information // on sequence number of records that have been received. Same as batchInterval for this // example. val kinesisCheckpointInterval = batchInterval // Setup the SparkConfig and StreamingContext val sparkConfig = new SparkConf().setAppName("KinesisWordCountASLSDKV2") val ssc = new StreamingContext(sparkConfig, batchInterval) // Create the Kinesis DStreams val kinesisStreams = (0 until numStreams).map { i => KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(kinesisCheckpointInterval) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)) .build() } // Union all the streams val unionStreams = ssc.union(kinesisStreams) // Convert each line of Array[Byte] to String, and split into words val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" ")) // Map each word to a (word, 1) tuple so we can reduce by key to count the words val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) // Print the first 10 wordCounts wordCounts.print() // Start the streaming context and await termination ssc.start() ssc.awaitTermination() } }

Consideraciones a la hora de utilizar el conector de Spark Kinesis actualizado

  • Si sus aplicaciones utilizan una JDK versión inferior a la 11, es posible que se encuentre Kinesis-producer-library con excepciones como. java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter Esto sucede porque la EMR versión 7.0 viene con JDK 17 de forma predeterminada y los módulos J2EE se han eliminado de las bibliotecas estándar desde Java 11 y versiones posteriores. Esto se puede solucionar si agrega la siguiente dependencia en el archivo pom. Reemplace la versión de biblioteca por la que crea conveniente.

    <dependency> <groupId>javax.xml.bind</groupId> <artifactId>jaxb-api</artifactId> <version>${jaxb-api.version}</version> </dependency>
  • El tarro del conector de Spark Kinesis se encuentra en esta ruta después de crear un EMR clúster: /usr/lib/spark/connector/lib/