Seleccione sus preferencias de cookies

Usamos cookies esenciales y herramientas similares que son necesarias para proporcionar nuestro sitio y nuestros servicios. Usamos cookies de rendimiento para recopilar estadísticas anónimas para que podamos entender cómo los clientes usan nuestro sitio y hacer mejoras. Las cookies esenciales no se pueden desactivar, pero puede hacer clic en “Personalizar” o “Rechazar” para rechazar las cookies de rendimiento.

Si está de acuerdo, AWS y los terceros aprobados también utilizarán cookies para proporcionar características útiles del sitio, recordar sus preferencias y mostrar contenido relevante, incluida publicidad relevante. Para aceptar o rechazar todas las cookies no esenciales, haga clic en “Aceptar” o “Rechazar”. Para elegir opciones más detalladas, haga clic en “Personalizar”.

Migración del conector de Spark Kinesis al SDK 2.x para Amazon EMR 7.0 - Amazon EMR

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Migración del conector de Spark Kinesis al SDK 2.x para Amazon EMR 7.0

El AWS SDK proporciona un amplio conjunto de APIs bibliotecas para interactuar con los servicios de computación AWS en la nube, como la administración de credenciales y la conexión a los servicios de S3 y Kinesis. El conector de Spark Kinesis se utiliza para consumir datos de Kinesis Data Streams, y los datos recibidos se transforman y procesan en el motor de ejecución de Spark. Actualmente, este conector se basa en la versión 1.x del AWS SDK y Kinesis-client-library (KCL).

Como parte de la migración del AWS SDK 2.x, el conector de Spark Kinesis también se actualizó en consecuencia para que funcione con el SDK 2.x. En la versión 7.0 de Amazon EMR, Spark contiene la actualización 2.x del SDK que aún no está disponible en la versión comunitaria de Apache Spark. Si utiliza el conector de Spark Kinesis de una versión anterior a la 7.0, debe migrar los códigos de las aplicaciones para que se ejecuten en el SDK 2.x antes de poder migrar a Amazon EMR 7.0.

Guías de migración

En esta sección, se describen los pasos para migrar una aplicación al conector de Spark Kinesis actualizado. Incluye guías para migrar a la biblioteca de clientes de Kinesis (KCL) 2.x, proveedores de AWS credenciales y clientes de AWS servicios en el SDK 2.x. AWS Como referencia, también incluye un WordCountprograma de ejemplo que usa el conector Kinesis.

Migración de KCL de la versión 1.x a la 2.x

  • Nivel de métricas y dimensiones en KinesisInputDStream

    Al crear una instancia de KinesisInputDStream, puede controlar el nivel de métricas y las dimensiones para la secuencia. En el siguiente ejemplo, se muestra cómo se pueden personalizar estos parámetros con KCL 1.x:

    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel val kinesisStream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(kinesisCheckpointInterval) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet) .build()

    En KCL 2.x, estas opciones de configuración tienen nombres de paquete diferentes. Para migrar a 2.x:

    1. Cambie las instrucciones de importación de com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration y com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel a software.amazon.kinesis.metrics.MetricsLevel y software.amazon.kinesis.metrics.MetricsUtil respectivamente.

      // import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel import software.amazon.kinesis.metrics.MetricsLevel // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration import software.amazon.kinesis.metrics.MetricsUtil
    2. Sustituya la línea metricsEnabledDimensionsKinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet por metricsEnabledDimensionsSet(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)

    A continuación, se presenta una versión actualizada de KinesisInputDStream con dimensiones y niveles de métricas personalizados:

    import software.amazon.kinesis.metrics.MetricsLevel import software.amazon.kinesis.metrics.MetricsUtil val kinesisStream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(kinesisCheckpointInterval) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)) .build()
  • Función del controlador de mensajes en KinesisInputDStream

    Al crear una instancia de KinesisInputDStream, también puede proporcionar una “función del controlador de mensajes” que tome un registro de Kinesis y devuelva un objeto genérico T, en caso de que desee utilizar otros datos incluidos en un registro, como la clave de partición.

    En KCL 1.x, la firma de la función del controlador de mensajes es: Record => T, donde Record es com.amazonaws.services.kinesis.model.Record. En KCL 2.x, la firma del controlador se cambia a:KinesisClientRecord => T, where is. KinesisClientRecord software.amazon.kinesis.retrieval.KinesisClientRecord

    A continuación, se muestra un ejemplo de cómo proporcionar un controlador de mensajes en KCL 1.x:

    import com.amazonaws.services.kinesis.model.Record def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5 val stream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(Seconds(10)) .storageLevel(StorageLevel.MEMORY_ONLY) .buildWithMessageHandler(addFive)

    Para migrar el controlador de mensajes:

    1. Cambie la instrucción de importación de com.amazonaws.services.kinesis.model.Record a software.amazon.kinesis.retrieval.KinesisClientRecord.

      // import com.amazonaws.services.kinesis.model.Record import software.amazon.kinesis.retrieval.KinesisClientRecord
    2. Actualice la firma del método del controlador de mensajes.

      //def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5 def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5

    A continuación, se presenta un ejemplo actualizado de cómo proporcionar el controlador de mensajes en KCL 2.x:

    import software.amazon.kinesis.retrieval.KinesisClientRecord def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5 val stream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(Seconds(10)) .storageLevel(StorageLevel.MEMORY_ONLY) .buildWithMessageHandler(addFive)

    Para obtener información sobre cómo migrar de KCL 1.x a KCL 2.x, consulte Migración de consumidores de KCL 1.x a KCL 2.x.

Migración de proveedores de AWS credenciales del SDK 1.x al AWS 2.x

Los proveedores de credenciales se utilizan para obtener AWS credenciales con las que interactuar. AWS Se han realizado varios cambios en las interfaces y clases relacionadas con los proveedores de credenciales en el SDK 2.x. Puede encontrar más información aquí. El conector Spark Kinesis ha definido una interfaz (org.apache.spark.streaming.kinesis.SparkAWSCredentials) y clases de implementación que devuelven la versión 1.x de los proveedores de AWS credenciales. Estos proveedores de credenciales son necesarios para inicializar los clientes de Kinesis. Por ejemplo, si utiliza este método SparkAWSCredentials.provider en las aplicaciones, necesitará actualizar los códigos para utilizar la versión 2.x de los proveedores de credenciales. AWS

A continuación se muestra un ejemplo del uso de los proveedores de credenciales en AWS el SDK 1.x:

import org.apache.spark.streaming.kinesis.SparkAWSCredentials import com.amazonaws.auth.AWSCredentialsProvider val basicSparkCredentials = SparkAWSCredentials.builder .basicCredentials("accessKey", "secretKey") .build() val credentialProvider = basicSparkCredentials.provider assert(credentialProvider.isInstanceOf[AWSCredentialsProvider], "Type should be AWSCredentialsProvider")
Para migrar a SDK 2.x:
  1. Cambie la instrucción de importación de com.amazonaws.auth.AWSCredentialsProvider a software.amazon.awssdk.auth.credentials.AwsCredentialsProvider

    //import com.amazonaws.auth.AWSCredentialsProvider import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
  2. Actualice los códigos restantes que utilizan esta clase.

    import org.apache.spark.streaming.kinesis.SparkAWSCredentials import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider val basicSparkCredentials = SparkAWSCredentials.builder .basicCredentials("accessKey", "secretKey") .build() val credentialProvider = basicSparkCredentials.provider assert (credentialProvider.isInstanceOf[AwsCredentialsProvider], "Type should be AwsCredentialsProvider")

Migración de clientes de AWS servicios del AWS SDK 1.x al 2.x

AWS los clientes de servicio tienen diferentes nombres de paquete en la versión 2.x (es decir,software.amazon.awssdk), mientras que en el SDK 1.x los usa. com.amazonaws Para obtener más información sobre los cambios de los clientes, consulte aquí. Si utiliza estos clientes de servicio en los códigos, deberá realizar la migración correspondiente de los clientes.

A continuación, se muestra un ejemplo de cómo crear un cliente en SDK 1.x:

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient import com.amazonaws.services.dynamodbv2.document.DynamoDB AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient(); AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
Para migrar a 2.x:
  1. Cambie las instrucciones de importación para los clientes de servicio. Tomemos como ejemplo los clientes de DynamoDB. Tendría que cambiar com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient o com.amazonaws.services.dynamodbv2.document.DynamoDB a software.amazon.awssdk.services.dynamodb.DynamoDbClient.

    // import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient // import com.amazonaws.services.dynamodbv2.document.DynamoDB import software.amazon.awssdk.services.dynamodb.DynamoDbClient
  2. Actualización de los códigos que inicializan los clientes

    // AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient(); // AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient(); DynamoDbClient ddbClient = DynamoDbClient.create(); DynamoDbClient ddbClient = DynamoDbClient.builder().build();

    Para obtener más información sobre la migración del AWS SDK de la versión 1.x a la 2.x, consulte ¿Qué diferencias hay entre el AWS SDK para Java 1.x y 2.x?

Ejemplos de código para aplicaciones de transmisión

import java.net.URI import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider import software.amazon.awssdk.http.apache.ApacheHttpClient import software.amazon.awssdk.services.kinesis.KinesisClient import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest import software.amazon.awssdk.regions.Region import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil} import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest import org.apache.spark.streaming.kinesis.KinesisInputDStream object KinesisWordCountASLSDKV2 { def main(args: Array[String]): Unit = { val appName = "demo-app" val streamName = "demo-kinesis-test" val endpointUrl = "https://kinesis.us-west-2.amazonaws.com" val regionName = "us-west-2" // Determine the number of shards from the stream using the low-level Kinesis Client // from the AWS Java SDK. val credentialsProvider = DefaultCredentialsProvider.create require(credentialsProvider.resolveCredentials() != null, "No AWS credentials found. Please specify credentials using one of the methods specified " + "in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html") val kinesisClient = KinesisClient.builder() .credentialsProvider(credentialsProvider) .region(Region.US_WEST_2) .endpointOverride(URI.create(endpointUrl)) .httpClientBuilder(ApacheHttpClient.builder()) .build() val describeStreamRequest = DescribeStreamRequest.builder() .streamName(streamName) .build() val numShards = kinesisClient.describeStream(describeStreamRequest) .streamDescription .shards .size // In this example, we are going to create 1 Kinesis Receiver/input DStream for each shard. // This is not a necessity; if there are less receivers/DStreams than the number of shards, // then the shards will be automatically distributed among the receivers and each receiver // will receive data from multiple shards. val numStreams = numShards // Spark Streaming batch interval val batchInterval = Milliseconds(2000) // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information // on sequence number of records that have been received. Same as batchInterval for this // example. val kinesisCheckpointInterval = batchInterval // Setup the SparkConfig and StreamingContext val sparkConfig = new SparkConf().setAppName("KinesisWordCountASLSDKV2") val ssc = new StreamingContext(sparkConfig, batchInterval) // Create the Kinesis DStreams val kinesisStreams = (0 until numStreams).map { i => KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(kinesisCheckpointInterval) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)) .build() } // Union all the streams val unionStreams = ssc.union(kinesisStreams) // Convert each line of Array[Byte] to String, and split into words val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" ")) // Map each word to a (word, 1) tuple so we can reduce by key to count the words val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) // Print the first 10 wordCounts wordCounts.print() // Start the streaming context and await termination ssc.start() ssc.awaitTermination() } }

Consideraciones a la hora de utilizar el conector de Spark Kinesis actualizado

  • Si sus aplicaciones utilizan la Kinesis-producer-library con una versión del JDK inferior a la 11, es posible que encuentre excepciones como java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter. Esto sucede porque EMR 7.0 incluye JDK 17 de forma predeterminada y los módulos J2EE se han eliminado de las bibliotecas estándar desde Java 11 en adelante. Esto se puede solucionar si agrega la siguiente dependencia en el archivo pom. Reemplace la versión de biblioteca por la que crea conveniente.

    <dependency> <groupId>javax.xml.bind</groupId> <artifactId>jaxb-api</artifactId> <version>${jaxb-api.version}</version> </dependency>
  • El jar del conector de Spark Kinesis se encuentra en esta ruta después de crear un clúster de EMR: /usr/lib/spark/connector/lib/

PrivacidadTérminos del sitioPreferencias de cookies
© 2025, Amazon Web Services, Inc o sus afiliados. Todos los derechos reservados.