Transformez les données à l'aide d'opérateurs dans Managed Service pour Apache Flink avec DataStream API - Service géré pour Apache Flink

Le service géré Amazon pour Apache Flink était auparavant connu sous le nom d’Amazon Kinesis Data Analytics pour Apache Flink.

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Transformez les données à l'aide d'opérateurs dans Managed Service pour Apache Flink avec DataStream API

Pour transformer les données entrantes dans un service géré pour Apache Flink, vous devez utiliser un opérateur Apache Flink. Un opérateur Apache Flink transforme un ou plusieurs flux de données en un nouveau flux de données. Le nouveau flux de données contient des données modifiées par rapport au flux de données d’origine. Apache Flink fournit plus de 25 opérateurs de traitement de flux prédéfinis. Pour plus d'informations, consultez la section Opérateurs dans la documentation d'Apache Flink.

Utiliser des opérateurs de transformation

Voici un exemple de transformation de texte simple sur l'un des champs d'un flux de JSON données.

Ce code crée un flux de données transformé. Le nouveau flux de données contient les mêmes données que le flux d’origine, la chaîne «  Company » étant ajoutée au contenu du champ TICKER.

DataStream<ObjectNode> output = input.map( new MapFunction<ObjectNode, ObjectNode>() { @Override public ObjectNode map(ObjectNode value) throws Exception { return value.put("TICKER", value.get("TICKER").asText() + " Company"); } } );

Utiliser des opérateurs d'agrégation

Voici un exemple d’opérateur d’agrégation. Le code crée un flux de données agrégé. L’opérateur crée une fenêtre variable de 5 secondes et renvoie la somme des valeurs PRICE des enregistrements de la fenêtre avec la même valeur TICKER.

DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText()) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .reduce((node1, node2) -> { double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble(); node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal)); return node1; });

Pour plus d'exemples de code, consultezExemples de création et d'utilisation d'un service géré pour les applications Apache Flink.