

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Trasforma i dati utilizzando gli operatori in Managed Service for Apache Flink con l'API DataStream
<a name="how-operators"></a>

Per trasformare i dati in entrata in un servizio gestito per Apache Flink viene utilizzato un operatore *Apache Flink*. Un operatore Apache Flink trasforma uno o più flussi di dati in un nuovo flusso di dati. Il nuovo flusso di dati contiene dati modificati dal flusso di dati originale. Apache Flink offre più di 25 operatori di elaborazione di flussi predefiniti. Per ulteriori informazioni, consulta [Operatori](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/) nella documentazione di Apache Flink.

**Topics**
+ [

## Usa gli operatori di trasformazione
](#how-operators-transform)
+ [

## Usa gli operatori di aggregazione
](#how-operators-agg)

## Usa gli operatori di trasformazione
<a name="how-operators-transform"></a>

Di seguito è riportato un esempio di semplice trasformazione del testo su uno dei campi di un flusso di dati JSON. 

Questo codice crea un flusso di dati trasformato. Il nuovo flusso di dati contiene gli stessi dati del flusso originale, con la stringa "` Company`" aggiunta al contenuto del campo `TICKER`.

```
DataStream<ObjectNode> output = input.map(
    new MapFunction<ObjectNode, ObjectNode>() {
        @Override
        public ObjectNode map(ObjectNode value) throws Exception {
            return value.put("TICKER", value.get("TICKER").asText() + " Company");
        }
    }
);
```

## Usa gli operatori di aggregazione
<a name="how-operators-agg"></a>

Di seguito è riportato un esempio di operatore di aggregazione. Il codice crea un flusso di dati aggregato. L'operatore crea una finestra a cascata di 5 secondi e restituisce la somma dei valori `PRICE` per i record nella finestra con lo stesso valore `TICKER`.

```
DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((node1, node2) -> {
        double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble();
        node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal));
    return node1;
});
```

Per ulteriori esempi di codice, consulta [Esempi di creazione e utilizzo di Managed Service per applicazioni Apache Flink](examples-collapsibles.md). 