Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Migración del conector de Spark Kinesis al SDK 2.x para Amazon EMR 7.0
El AWS SDK proporciona un amplio conjunto de APIs bibliotecas para interactuar con los servicios de computación AWS en la nube, como la administración de credenciales y la conexión a los servicios de S3 y Kinesis. El conector de Spark Kinesis se utiliza para consumir datos de Kinesis Data Streams, y los datos recibidos se transforman y procesan en el motor de ejecución de Spark. Actualmente, este conector se basa en la versión 1.x del AWS SDK y Kinesis-client-library (KCL).
Como parte de la migración del AWS SDK 2.x, el conector de Spark Kinesis también se actualizó en consecuencia para que funcione con el SDK 2.x. En la versión 7.0 de Amazon EMR, Spark contiene la actualización 2.x del SDK que aún no está disponible en la versión comunitaria de Apache Spark. Si utiliza el conector de Spark Kinesis de una versión anterior a la 7.0, debe migrar los códigos de las aplicaciones para que se ejecuten en el SDK 2.x antes de poder migrar a Amazon EMR 7.0.
Guías de migración
En esta sección, se describen los pasos para migrar una aplicación al conector de Spark Kinesis actualizado. Incluye guías para migrar a la biblioteca de clientes de Kinesis (KCL) 2.x, proveedores de AWS credenciales y clientes de AWS servicios en el SDK 2.x. AWS Como referencia, también incluye un WordCount
Temas
Migración de KCL de la versión 1.x a la 2.x
-
Nivel de métricas y dimensiones en
KinesisInputDStream
Al crear una instancia de
KinesisInputDStream
, puede controlar el nivel de métricas y las dimensiones para la secuencia. En el siguiente ejemplo, se muestra cómo se pueden personalizar estos parámetros con KCL 1.x:import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel val kinesisStream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(kinesisCheckpointInterval) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet) .build()
En KCL 2.x, estas opciones de configuración tienen nombres de paquete diferentes. Para migrar a 2.x:
-
Cambie las instrucciones de importación de
com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
ycom.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
asoftware.amazon.kinesis.metrics.MetricsLevel
ysoftware.amazon.kinesis.metrics.MetricsUtil
respectivamente.// import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel import software.amazon.kinesis.metrics.MetricsLevel // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration import software.amazon.kinesis.metrics.MetricsUtil
-
Sustituya la línea
metricsEnabledDimensionsKinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet
pormetricsEnabledDimensionsSet(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)
A continuación, se presenta una versión actualizada de
KinesisInputDStream
con dimensiones y niveles de métricas personalizados:import software.amazon.kinesis.metrics.MetricsLevel import software.amazon.kinesis.metrics.MetricsUtil val kinesisStream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(kinesisCheckpointInterval) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)) .build()
-
-
Función del controlador de mensajes en
KinesisInputDStream
Al crear una instancia de
KinesisInputDStream
, también puede proporcionar una “función del controlador de mensajes” que tome un registro de Kinesis y devuelva un objeto genérico T, en caso de que desee utilizar otros datos incluidos en un registro, como la clave de partición.En KCL 1.x, la firma de la función del controlador de mensajes es:
Record => T
, donde Record escom.amazonaws.services.kinesis.model.Record
. En KCL 2.x, la firma del controlador se cambia a:KinesisClientRecord => T
, where is. KinesisClientRecordsoftware.amazon.kinesis.retrieval.KinesisClientRecord
A continuación, se muestra un ejemplo de cómo proporcionar un controlador de mensajes en KCL 1.x:
import com.amazonaws.services.kinesis.model.Record def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5 val stream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(Seconds(10)) .storageLevel(StorageLevel.MEMORY_ONLY) .buildWithMessageHandler(addFive)
Para migrar el controlador de mensajes:
-
Cambie la instrucción de importación de
com.amazonaws.services.kinesis.model.Record
asoftware.amazon.kinesis.retrieval.KinesisClientRecord
.// import com.amazonaws.services.kinesis.model.Record import software.amazon.kinesis.retrieval.KinesisClientRecord
-
Actualice la firma del método del controlador de mensajes.
//def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5 def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5
A continuación, se presenta un ejemplo actualizado de cómo proporcionar el controlador de mensajes en KCL 2.x:
import software.amazon.kinesis.retrieval.KinesisClientRecord def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5 val stream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(Seconds(10)) .storageLevel(StorageLevel.MEMORY_ONLY) .buildWithMessageHandler(addFive)
Para obtener información sobre cómo migrar de KCL 1.x a KCL 2.x, consulte Migración de consumidores de KCL 1.x a KCL 2.x.
-
Migración de proveedores de AWS credenciales del SDK 1.x al AWS 2.x
Los proveedores de credenciales se utilizan para obtener AWS credenciales con las que interactuar. AWS Se han realizado varios cambios en las interfaces y clases relacionadas con los proveedores de credenciales en el SDK 2.x. Puede encontrar más información aquíorg.apache.spark.streaming.kinesis.SparkAWSCredentials
) y clases de implementación que devuelven la versión 1.x de los proveedores de AWS credenciales. Estos proveedores de credenciales son necesarios para inicializar los clientes de Kinesis. Por ejemplo, si utiliza este método SparkAWSCredentials.provider
en las aplicaciones, necesitará actualizar los códigos para utilizar la versión 2.x de los proveedores de credenciales. AWS
A continuación se muestra un ejemplo del uso de los proveedores de credenciales en AWS el SDK 1.x:
import org.apache.spark.streaming.kinesis.SparkAWSCredentials
import com.amazonaws.auth.AWSCredentialsProvider
val basicSparkCredentials = SparkAWSCredentials.builder
.basicCredentials("accessKey", "secretKey")
.build()
val credentialProvider = basicSparkCredentials.provider
assert(credentialProvider.isInstanceOf[AWSCredentialsProvider], "Type should be AWSCredentialsProvider")
Para migrar a SDK 2.x:
-
Cambie la instrucción de importación de
com.amazonaws.auth.AWSCredentialsProvider
asoftware.amazon.awssdk.auth.credentials.AwsCredentialsProvider
//import com.amazonaws.auth.AWSCredentialsProvider import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
-
Actualice los códigos restantes que utilizan esta clase.
import org.apache.spark.streaming.kinesis.SparkAWSCredentials import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider val basicSparkCredentials = SparkAWSCredentials.builder .basicCredentials("accessKey", "secretKey") .build() val credentialProvider = basicSparkCredentials.provider assert (credentialProvider.isInstanceOf[AwsCredentialsProvider], "Type should be AwsCredentialsProvider")
Migración de clientes de AWS servicios del AWS SDK 1.x al 2.x
AWS los clientes de servicio tienen diferentes nombres de paquete en la versión 2.x (es decir,software.amazon.awssdk
), mientras que en el SDK 1.x los usa. com.amazonaws
Para obtener más información sobre los cambios de los clientes, consulte aquí. Si utiliza estos clientes de servicio en los códigos, deberá realizar la migración correspondiente de los clientes.
A continuación, se muestra un ejemplo de cómo crear un cliente en SDK 1.x:
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.DynamoDB
AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
Para migrar a 2.x:
-
Cambie las instrucciones de importación para los clientes de servicio. Tomemos como ejemplo los clientes de DynamoDB. Tendría que cambiar
com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
ocom.amazonaws.services.dynamodbv2.document.DynamoDB
asoftware.amazon.awssdk.services.dynamodb.DynamoDbClient
.// import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient // import com.amazonaws.services.dynamodbv2.document.DynamoDB import software.amazon.awssdk.services.dynamodb.DynamoDbClient
-
Actualización de los códigos que inicializan los clientes
// AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient(); // AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient(); DynamoDbClient ddbClient = DynamoDbClient.create(); DynamoDbClient ddbClient = DynamoDbClient.builder().build();
Para obtener más información sobre la migración del AWS SDK de la versión 1.x a la 2.x, consulte ¿Qué diferencias hay entre el AWS SDK para Java 1.x y 2.x?
Ejemplos de código para aplicaciones de transmisión
import java.net.URI
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest
import software.amazon.awssdk.regions.Region
import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.kinesis.KinesisInputDStream
object KinesisWordCountASLSDKV2 {
def main(args: Array[String]): Unit = {
val appName = "demo-app"
val streamName = "demo-kinesis-test"
val endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
val regionName = "us-west-2"
// Determine the number of shards from the stream using the low-level Kinesis Client
// from the AWS Java SDK.
val credentialsProvider = DefaultCredentialsProvider.create
require(credentialsProvider.resolveCredentials() != null,
"No AWS credentials found. Please specify credentials using one of the methods specified " +
"in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html")
val kinesisClient = KinesisClient.builder()
.credentialsProvider(credentialsProvider)
.region(Region.US_WEST_2)
.endpointOverride(URI.create(endpointUrl))
.httpClientBuilder(ApacheHttpClient.builder())
.build()
val describeStreamRequest = DescribeStreamRequest.builder()
.streamName(streamName)
.build()
val numShards = kinesisClient.describeStream(describeStreamRequest)
.streamDescription
.shards
.size
// In this example, we are going to create 1 Kinesis Receiver/input DStream for each shard.
// This is not a necessity; if there are less receivers/DStreams than the number of shards,
// then the shards will be automatically distributed among the receivers and each receiver
// will receive data from multiple shards.
val numStreams = numShards
// Spark Streaming batch interval
val batchInterval = Milliseconds(2000)
// Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
// on sequence number of records that have been received. Same as batchInterval for this
// example.
val kinesisCheckpointInterval = batchInterval
// Setup the SparkConfig and StreamingContext
val sparkConfig = new SparkConf().setAppName("KinesisWordCountASLSDKV2")
val ssc = new StreamingContext(sparkConfig, batchInterval)
// Create the Kinesis DStreams
val kinesisStreams = (0 until numStreams).map { i =>
KinesisInputDStream.builder
.streamingContext(ssc)
.streamName(streamName)
.endpointUrl(endpointUrl)
.regionName(regionName)
.initialPosition(new Latest())
.checkpointAppName(appName)
.checkpointInterval(kinesisCheckpointInterval)
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.metricsLevel(MetricsLevel.DETAILED)
.metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
.build()
}
// Union all the streams
val unionStreams = ssc.union(kinesisStreams)
// Convert each line of Array[Byte] to String, and split into words
val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
// Map each word to a (word, 1) tuple so we can reduce by key to count the words
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
// Print the first 10 wordCounts
wordCounts.print()
// Start the streaming context and await termination
ssc.start()
ssc.awaitTermination()
}
}
Consideraciones a la hora de utilizar el conector de Spark Kinesis actualizado
-
Si sus aplicaciones utilizan la
Kinesis-producer-library
con una versión del JDK inferior a la 11, es posible que encuentre excepciones comojava.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter
. Esto sucede porque EMR 7.0 incluye JDK 17 de forma predeterminada y los módulos J2EE se han eliminado de las bibliotecas estándar desde Java 11 en adelante. Esto se puede solucionar si agrega la siguiente dependencia en el archivo pom. Reemplace la versión de biblioteca por la que crea conveniente.<dependency> <groupId>javax.xml.bind</groupId> <artifactId>jaxb-api</artifactId> <version>${jaxb-api.version}</version> </dependency>
-
El jar del conector de Spark Kinesis se encuentra en esta ruta después de crear un clúster de EMR:
/usr/lib/spark/connector/lib/