Transforme los datos mediante operadores en Managed Service for Apache Flink con DataStream API - Managed Service para Apache Flink

Amazon Managed Service para Apache Flink Amazon se denominaba anteriormente Amazon Kinesis Data Analytics para Apache Flink.

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Transforme los datos mediante operadores en Managed Service for Apache Flink con DataStream API

Para transformar los datos entrantes en un Managed Service para Apache Flink, utiliza un operador de Apache Flink. Un operador de Apache Flink transforma uno o más flujos de datos en un nuevo flujo de datos. El nuevo flujo de datos contiene datos modificados del flujo de datos original. Apache Flink proporciona más de 25 operadores de procesamiento de flujos prediseñados. Para obtener más información, consulte Operadores en la documentación de Apache Flink.

Utilice operadores de transformación

A continuación se muestra un ejemplo de una transformación de texto sencilla en uno de los campos de un flujo de JSON datos.

Este código crea un flujo de datos transformado. El nuevo flujo de datos tiene los mismos datos que el flujo original, con la cadena “ Company” anexada al contenido del campo TICKER.

DataStream<ObjectNode> output = input.map( new MapFunction<ObjectNode, ObjectNode>() { @Override public ObjectNode map(ObjectNode value) throws Exception { return value.put("TICKER", value.get("TICKER").asText() + " Company"); } } );

Utilice operadores de agregación

A continuación se muestra un ejemplo de operador de agregación. El código crea un flujo de datos agregado. El operador crea una ventana de caída de 5 segundos y devuelve la suma de los valores PRICE de los registros de la ventana con el mismo valor TICKER.

DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText()) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .reduce((node1, node2) -> { double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble(); node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal)); return node1; });

Para ver más ejemplos de código, consulteEjemplos para crear aplicaciones de Managed Service for Apache Flink y trabajar con ellas.