Amazon Managed Service para Apache Flink Amazon se denominaba anteriormente Amazon Kinesis Data Analytics para Apache Flink.
Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Comenzar (Scala)
nota
A partir de la versión 1.15, Flink es gratuito para Scala. Las aplicaciones ahora pueden usar Java API desde cualquier versión de Scala. Flink sigue utilizando Scala internamente en algunos componentes clave, pero no lo expone al cargador de clases del código de usuario. Por eso, debes añadir las dependencias de Scala a tus -archives. JAR
Para obtener más información sobre los cambios de Scala en Flink 1.15, consulte Scala Free in One Fifteen
En este ejercicio, creará una aplicación de Managed Service for Apache Flink para Scala con una transmisión de Kinesis como fuente y como receptor.
Este tema contiene las siguientes secciones:
Cree recursos dependientes
Antes de crear una aplicación de Managed Service para Apache Flink para este ejercicio, debe crear los siguientes recursos dependientes:
Dos flujos de Kinesis para entrada y salida.
Un bucket de Amazon S3 para almacenar el código de la aplicación (
ka-app-code-
)<username>
Puede crear los flujos de Kinesis y el bucket de Amazon S3 usando la consola. Si desea obtener instrucciones para crear estos recursos, consulte los siguientes temas:
Creación y actualización de flujos de datos en la Guía para desarrolladores de Amazon Kinesis Data Streams. Asigne un nombre a sus flujos de datos
ExampleInputStream
yExampleOutputStream
.Cómo crear flujos de datos (AWS CLI)
Para crear la primera transmisión (
ExampleInputStream
), utilice el siguiente comando AWS CLI create-stream de Amazon Kinesis.aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
Para crear el segundo flujo que la aplicación utilizará para escribir la salida, ejecute el mismo comando, cambiando el nombre a
ExampleOutputStream
.aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
¿Cómo se puede crear un bucket de S3? en la Guía de usuario de Amazon Simple Storage Service. Asigne al bucket de Amazon S3 un nombre único globalmente añadiendo su nombre de inicio de sesión, por ejemplo,
ka-app-code-
.<username>
Otros recursos
Al crear la aplicación, Managed Service for Apache Flink crea los siguientes CloudWatch recursos de Amazon si aún no existen:
Un grupo de registro llamado
/AWS/KinesisAnalytics-java/MyApplication
Un flujo de registro llamado
kinesis-analytics-log-stream
Escriba registros de muestra en el flujo de entrada
En esta sección, se utiliza un script de Python para escribir registros de muestra en el flujo para que la aplicación los procese.
nota
Esta sección requiere AWS SDK for Python (Boto)
nota
El script de Python en esta sección usa AWS CLI. Debe configurarlas AWS CLI para usar las credenciales de su cuenta y su región predeterminada. Para configurar la suya AWS CLI, introduzca lo siguiente:
aws configure
-
Cree un archivo denominado
stock.py
con el siguiente contenido:import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
-
Ejecute el script
stock.py
:$ python stock.py
Mantenga el script en ejecución mientras completa el resto del tutorial.
Descargue y examine el código de la aplicación
El código de la aplicación Python para este ejemplo está disponible en GitHub. Para descargar el código de la aplicación, haga lo siguiente:
Si aún no lo ha hecho, instale el cliente Git. Para obtener más información, consulte Installing Git
. Clone el repositorio remoto con el siguiente comando:
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
Vaya al directorio
amazon-kinesis-data-analytics-java-examples/scala/GettingStarted
.
Tenga en cuenta lo siguiente en relación con el código de la aplicación:
Un archivo
build.sbt
contiene información sobre la configuración y las dependencias de la aplicación, incluidas las bibliotecas de Managed Service para Apache Flink.El archivo
BasicStreamingJob.scala
contiene el método principal que define la funcionalidad de la aplicación.La aplicación utiliza un origen de Kinesis para leer datos del flujo de origen. El siguiente fragmento crea el origen de Kinesis:
private def createSource: FlinkKinesisConsumer[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val inputProperties = applicationProperties.get("ConsumerConfigProperties") new FlinkKinesisConsumer[String](inputProperties.getProperty(streamNameKey, defaultInputStreamName), new SimpleStringSchema, inputProperties) }
La aplicación también utiliza un receptor de Kinesis para escribir en el flujo de resultado. El siguiente fragmento crea el receptor de Kinesis:
private def createSink: KinesisStreamsSink[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val outputProperties = applicationProperties.get("ProducerConfigProperties") KinesisStreamsSink.builder[String] .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema) .setStreamName(outputProperties.getProperty(streamNameKey, defaultOutputStreamName)) .setPartitionKeyGenerator((element: String) => String.valueOf(element.hashCode)) .build }
La aplicación crea conectores de origen y receptor para acceder a recursos externos mediante un StreamExecutionEnvironment objeto.
La aplicación crea conectores de origen y recepción mediante las propiedades dinámicas de la aplicación. Las propiedades de tiempo de ejecución de la aplicación se leen para configurar los conectores. Para obtener más información sobre las propiedades de tiempo de ejecución, consulte Runtime Properties.
Compilación y carga del código de la aplicación
En esta sección, compilará y cargará su código de aplicación en el bucket de Amazon S3 que creó en la sección Cree recursos dependientes.
Compilación del código de la aplicación
En esta sección, utilizará la herramienta de SBT
Para usar el código de la aplicación, debe compilarlo y empaquetarlo en un JAR archivo. Puede compilar y empaquetar el código conSBT:
sbt assembly
-
Si la aplicación se compila correctamente, se crea el siguiente archivo:
target/scala-3.2.0/getting-started-scala-1.0.jar
Carga del código de Scala de streaming de Apache Flink
En esta sección, creará un bucket de Amazon S3 y cargará el código de la aplicación.
Abra la consola Amazon S3 en https://console.aws.amazon.com/s3/
. Elija Crear bucket
Escriba
ka-app-code-<username>
en el campo Nombre del bucket. Añada un sufijo al nombre del bucket, como su nombre de usuario, para que sea único a nivel global. Elija Siguiente.En el paso Configurar opciones, deje los ajustes tal y como están y elija Siguiente.
En el paso Establecer permisos, deje los ajustes tal y como están y elija Siguiente.
Elija Crear bucket.
Abra el bucket
ka-app-code-<username>
y elija Cargar.-
En el paso Seleccionar archivos, elija Añadir archivos. Vaya al archivo
getting-started-scala-1.0.jar
que creó en el paso anterior. No es necesario cambiar ninguno de los ajustes del objeto, por lo tanto, elija Cargar.
El código de la aplicación ya está almacenado en un bucket de Amazon S3 al que la aplicación puede acceder.