Trasforma i dati utilizzando gli operatori in Managed Service for Apache Flink con DataStream API - Servizio gestito per Apache Flink

Il servizio gestito da Amazon per Apache Flink era precedentemente noto come Analisi dei dati Amazon Kinesis per Apache Flink.

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Trasforma i dati utilizzando gli operatori in Managed Service for Apache Flink con DataStream API

Per trasformare i dati in entrata in un servizio gestito per Apache Flink viene utilizzato un operatore Apache Flink. Un operatore Apache Flink trasforma uno o più flussi di dati in un nuovo flusso di dati. Il nuovo flusso di dati contiene dati modificati dal flusso di dati originale. Apache Flink offre più di 25 operatori di elaborazione di flussi predefiniti. Per ulteriori informazioni, consulta Operatori nella documentazione di Apache Flink.

Questo argomento contiene le sezioni seguenti:

Usa gli operatori di trasformazione

Di seguito è riportato un esempio di una semplice trasformazione del testo su uno dei campi di un flusso di JSON dati.

Questo codice crea un flusso di dati trasformato. Il nuovo flusso di dati contiene gli stessi dati del flusso originale, con la stringa " Company" aggiunta al contenuto del campo TICKER.

DataStream<ObjectNode> output = input.map( new MapFunction<ObjectNode, ObjectNode>() { @Override public ObjectNode map(ObjectNode value) throws Exception { return value.put("TICKER", value.get("TICKER").asText() + " Company"); } } );

Utilizza operatori di aggregazione

Di seguito è riportato un esempio di operatore di aggregazione. Il codice crea un flusso di dati aggregato. L'operatore crea una finestra a cascata di 5 secondi e restituisce la somma dei valori PRICE per i record nella finestra con lo stesso valore TICKER.

DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText()) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .reduce((node1, node2) -> { double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble(); node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal)); return node1; });

Per altri esempi di codice, consultaEsempi di creazione e utilizzo di Managed Service per applicazioni Apache Flink.