

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Transformez les données à l'aide d'opérateurs dans Managed Service pour Apache Flink avec l'API DataStream
<a name="how-operators"></a>

Pour transformer les données entrantes dans un service géré pour Apache Flink, vous devez utiliser un *opérateur* Apache Flink. Un opérateur Apache Flink transforme un ou plusieurs flux de données en un nouveau flux de données. Le nouveau flux de données contient des données modifiées par rapport au flux de données d’origine. Apache Flink fournit plus de 25 opérateurs de traitement de flux prédéfinis. Pour plus d'informations, consultez la section [Opérateurs](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/) dans la documentation d'Apache Flink.

**Topics**
+ [Utiliser des opérateurs de transformation](#how-operators-transform)
+ [Utiliser des opérateurs d'agrégation](#how-operators-agg)

## Utiliser des opérateurs de transformation
<a name="how-operators-transform"></a>

Voici un exemple de transformation de texte simple sur l’un des champs d’un flux de données JSON. 

Ce code crée un flux de données transformé. Le nouveau flux de données contient les mêmes données que le flux d’origine, la chaîne « ` Company` » étant ajoutée au contenu du champ `TICKER`.

```
DataStream<ObjectNode> output = input.map(
    new MapFunction<ObjectNode, ObjectNode>() {
        @Override
        public ObjectNode map(ObjectNode value) throws Exception {
            return value.put("TICKER", value.get("TICKER").asText() + " Company");
        }
    }
);
```

## Utiliser des opérateurs d'agrégation
<a name="how-operators-agg"></a>

Voici un exemple d’opérateur d’agrégation. Le code crée un flux de données agrégé. L’opérateur crée une fenêtre variable de 5 secondes et renvoie la somme des valeurs `PRICE` des enregistrements de la fenêtre avec la même valeur `TICKER`.

```
DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((node1, node2) -> {
        double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble();
        node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal));
    return node1;
});
```

Pour plus d’exemples de code, consultez [Exemples de création et d'utilisation d'un service géré pour les applications Apache Flink](examples-collapsibles.md). 