Amazon Managed Service para Apache Flink Amazon se denominaba anteriormente Amazon Kinesis Data Analytics para Apache Flink.
Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Cree una aplicación con Apache Beam
En este ejercicio, creará una aplicación de Managed Service para Apache Flink que transforma datos usando Apache Beam
nota
Para configurar los requisitos previos necesarios para este ejercicio, primero complete el ejercicio Tutorial: Comience a utilizar la DataStream API en Managed Service for Apache Flink.
Este tema contiene las siguientes secciones:
- Cree recursos dependientes
- Escriba registros de muestra en el flujo de entrada
- Descargue y examine el código de la aplicación
- Compila el código de la aplicación
- Cargue el código Java de streaming de Apache Flink
- Cree y ejecute la aplicación Managed Service for Apache Flink
- Limpie los recursos AWS
- Siguientes pasos
Cree recursos dependientes
Antes de crear una aplicación de Managed Service para Apache Flink para este ejercicio, debe crear los siguientes recursos dependientes:
Dos flujos de datos de Kinesis (
ExampleInputStream
yExampleOutputStream
)Un bucket de Amazon S3 para almacenar el código de la aplicación (
ka-app-code-
)<username>
Puede crear los flujos de Kinesis y el bucket de Amazon S3 usando la consola. Si desea obtener instrucciones para crear estos recursos, consulte los siguientes temas:
Creación y actualización de flujos de datos en la Guía para desarrolladores de Amazon Kinesis Data Streams. Asigne un nombre a sus flujos de datos
ExampleInputStream
yExampleOutputStream
.¿Cómo se puede crear un bucket de S3? en la guía de usuario de Amazon Simple Storage Service. Asigne al bucket de Amazon S3 un nombre único globalmente añadiendo su nombre de inicio de sesión, por ejemplo,
ka-app-code-
.<username>
Escriba registros de muestra en el flujo de entrada
En esta sección, se utiliza un script de Python para escribir cadenas asignadas al azar al flujo para que la aplicación realice el procesamiento.
nota
Esta sección requiere AWS SDK for Python (Boto)
-
Cree un archivo denominado
ping.py
con el siguiente contenido:import json import boto3 import random kinesis = boto3.client('kinesis') while True: data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat']) print(data) kinesis.put_record( StreamName="ExampleInputStream", Data=data, PartitionKey="partitionkey")
-
Ejecute el script
ping.py
:$ python ping.py
Mantenga el script en ejecución mientras completa el resto del tutorial.
Descargue y examine el código de la aplicación
El código de la aplicación Java de este ejemplo está disponible en GitHub. Para descargar el código de la aplicación, haga lo siguiente:
Si aún no lo ha hecho, instale el cliente Git. Para obtener más información, consulte Installing Git
. Clone el repositorio remoto con el siguiente comando:
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
Vaya al directorio
amazon-kinesis-data-analytics-java-examples/Beam
.
El código de la aplicación se encuentra en el archivo BasicBeamStreamingJob.java
. Tenga en cuenta lo siguiente en relación con el código de la aplicación:
La aplicación utiliza Apache Beam ParDo
para procesar los registros entrantes mediante la invocación de una función de transformación personalizada llamada PingPongFn
.El código para invocar la función
PingPongFn
es el siguiente:.apply("Pong transform", ParDo.of(new PingPongFn())
Las aplicaciones de Managed Service para Apache Flink que usan Apache Beam requieren los siguientes componentes. Si no incluye estos componentes y versiones en su
pom.xml
, la aplicación carga las versiones incorrectas desde las dependencias del entorno y, dado que las versiones no coinciden, la aplicación se bloquea durante el tiempo de ejecución.<jackson.version>2.10.2</jackson.version> ... <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-jaxb-annotations</artifactId> <version>2.10.2</version> </dependency>
La función de
PingPongFn
transformación pasa los datos de entrada al flujo de salida, a menos que los datos de entrada sean ping, en cuyo caso emite la cadena pong\n al flujo de salida.El código de la función de transformación es el siguiente:
private static class PingPongFn extends DoFn<KinesisRecord, byte[]> { private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class); @ProcessElement public void processElement(ProcessContext c) { String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8); if (content.trim().equalsIgnoreCase("ping")) { LOG.info("Ponged!"); c.output("pong\n".getBytes(StandardCharsets.UTF_8)); } else { LOG.info("No action for: " + content); c.output(c.element().getDataAsBytes()); } } }
Compila el código de la aplicación
Para compilar la aplicación, haga lo siguiente:
Si aún no lo ha hecho, instale Java y Maven. Para obtener más información, consulte Complete los requisitos previos requeridos en el tutorial de Tutorial: Comience a utilizar la DataStream API en Managed Service for Apache Flink.
Compile la aplicación con el siguiente comando:
mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8
nota
El código fuente proporcionado se basa en bibliotecas de Java 11.
Al compilar la aplicación se crea el JAR archivo de la aplicación (target/basic-beam-app-1.0.jar
).
Cargue el código Java de streaming de Apache Flink
En esta sección, cargará su código de aplicación en el bucket de Amazon S3 que creó en la sección Cree recursos dependientes.
-
En la consola Amazon S3, elija ka-app-code -
<username>
bucket y selecciona Cargar. -
En el paso Seleccionar archivos, elija Añadir archivos. Vaya al archivo
basic-beam-app-1.0.jar
que creó en el paso anterior. No es necesario cambiar ninguno de los ajustes del objeto, por lo tanto, elija Cargar.
El código de la aplicación ya está almacenado en un bucket de Amazon S3 al que la aplicación puede acceder.
Cree y ejecute la aplicación Managed Service for Apache Flink
Siga estos pasos para crear, configurar, actualizar y ejecutar la aplicación mediante la consola.
Creación de la aplicación
Abra la consola del servicio gestionado para Apache Flink en /flink https://console.aws.amazon.com
-
En el panel de Managed Service para Apache Flink, seleccione Crear aplicación de análisis.
-
En la página Managed Service para Apache Flink: crear aplicación, proporcione los detalles de la aplicación de la siguiente manera:
-
En Nombre de la aplicación, escriba
MyApplication
. -
En Tiempo de ejecución, escriba Apache Flink.
nota
Apache Beam no es compatible actualmente con la versión 1.19 o posterior de Apache Flink.
Seleccione Apache Flink versión 1.15 en el menú desplegable de versiones.
-
-
Para los permisos de acceso, seleccione Crear o actualizar rol. IAM
kinesis-analytics-MyApplication-us-west-2
-
Elija Crear aplicación.
nota
Al crear una aplicación de servicio gestionado para Apache Flink mediante la consola, tiene la opción de crear un IAM rol y una política para la aplicación. La aplicación utiliza este rol y la política para acceder a los recursos dependientes. Estos IAM recursos se nombran con el nombre de la aplicación y la región de la siguiente manera:
-
Política:
kinesis-analytics-service-
MyApplication
-us-west-2
-
Rol:
kinesis-analytics-MyApplication-
us-west-2
Edite la IAM política
Edite la IAM política para añadir permisos de acceso a las transmisiones de datos de Kinesis.
Abra la IAM consola en. https://console.aws.amazon.com/iam/
-
Elija Políticas. Elija la política
kinesis-analytics-service-MyApplication-us-west-2
que la consola creó en su nombre en la sección anterior. -
En la página Resumen, elija Editar política. Seleccione la JSONpestaña.
-
Añada la sección subrayada de la siguiente política de ejemplo a la política. Reemplace la cuenta de muestra IDs (
012345678901
) por tu ID de cuenta.{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "logs:DescribeLogGroups", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:logs:us-west-2:
012345678901
:log-group:*", "arn:aws:s3:::ka-app-code-<username>
/basic-beam-app-1.0.jar" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": "logs:DescribeLogStreams", "Resource": "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": "logs:PutLogEvents", "Resource": "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:
] }012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleOutputStream" }
Configura la aplicación
-
En la MyApplicationpágina, elija Configurar.
-
En la página Configurar aplicación, proporcione la Ubicación del código:
-
Para el bucket de Amazon S3, introduzca
ka-app-code-
.<username>
-
En Ruta al objeto de Amazon S3, introduzca
basic-beam-app-1.0.jar
.
-
-
En Acceso a los recursos de la aplicación, en Permisos de acceso, elija Crear o actualizar IAM rol
kinesis-analytics-MyApplication-us-west-2
. -
Introduzca lo siguiente:
ID de grupo Clave Valor BeamApplicationProperties
InputStreamName
ExampleInputStream
BeamApplicationProperties
OutputStreamName
ExampleOutputStream
BeamApplicationProperties
AwsRegion
us-west-2
-
En Monitorización, asegúrese de que el Nivel de métricas de monitorización se ha establecido en Aplicación.
-
Para el CloudWatch registro, active la casilla Activar.
-
Elija Actualizar.
nota
Si decide habilitar el CloudWatch registro, Managed Service for Apache Flink crea un grupo de registros y un flujo de registros automáticamente. Los nombres de estos recursos son los siguientes:
-
Grupo de registro:
/aws/kinesis-analytics/MyApplication
-
Flujo de registro:
kinesis-analytics-log-stream
Este flujo de registro se utiliza para supervisar la aplicación. No es el mismo flujo de registro que utiliza la aplicación para enviar los resultados.
Ejecución de la aplicación
Para ver el gráfico de trabajos de Flink, ejecute la aplicación, abra el panel de Apache Flink y elija el trabajo de Flink que desee.
Puede comprobar las métricas del servicio gestionado para Apache Flink en la CloudWatch consola para comprobar que la aplicación funciona.
Limpie los recursos AWS
Esta sección incluye procedimientos para limpiar los AWS recursos creados en el tutorial Tumbling Window.
Este tema contiene las siguientes secciones:
Elimine su aplicación Managed Service for Apache Flink
Abra la consola de Managed Service for Apache Flink en /flink https://console.aws.amazon.com
en el panel Servicio gestionado para Apache Flink, elija. MyApplication
En la página de la aplicación, seleccione Eliminar y, a continuación, confirme la eliminación.
Elimine sus transmisiones de datos de Kinesis
Abra la consola de Kinesis en https://console.aws.amazon.com /kinesis.
En el panel Kinesis Data Streams, ExampleInputStreamelija.
En la ExampleInputStreampágina, elija Eliminar Kinesis Stream y, a continuación, confirme la eliminación.
En la página de transmisiones de Kinesis, elija, elija Acciones ExampleOutputStream, elija Eliminar y, a continuación, confirme la eliminación.
Elimine el objeto y el bucket de Amazon S3
Abra la consola Amazon S3 en https://console.aws.amazon.com/s3/
. Elija el ka-app-code -
<username>
balde.Elija Eliminar y luego ingrese el nombre del bucket para confirmar la eliminación.
Elimine sus IAM recursos
Abre la IAM consola en https://console.aws.amazon.com/iam/
. En la barra de navegación, seleccione Políticas.
En el control de filtros, introduzca kinesis.
Elija la política kinesis-analytics-service- MyApplication -us-west-2.
Seleccione Acciones de política y, a continuación, Eliminar.
En la barra de navegación, seleccione Roles.
Elija el rol kinesis-analytics- MyApplication -us-west-2.
Elija Eliminar rol y, a continuación, confirme la eliminación.
CloudWatch Elimine sus recursos
Abre la CloudWatch consola en https://console.aws.amazon.com/cloudwatch/
. En la barra de navegación, elija Registros.
Elija el grupo de registros MyApplication/aws/kinesis-analytics/.
Elija Eliminar grupo de registro y, a continuación, confirme la eliminación.
Siguientes pasos
Ya que ha creado y ejecutado una aplicación básica de Managed Service para Apache Flink que transforma los datos usando Apache Beam, consulte la siguiente aplicación para encontrar un ejemplo de una solución más avanzada de Managed Service para Apache Flink.
Taller sobre streaming de Managed Service para Apache Flink
: en este taller, analizamos un ejemplo integral que combina aspectos de transmisión por lotes y streaming en una canalización uniforme de Apache Beam.