Amazon Managed Service for Apache Flink 之前稱為 Amazon Kinesis Data Analytics for Apache Flink。
本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用 Apache Flink 受管理服務中的運算子來轉換資料 DataStream API
若要在 Managed Service for Apache Flink 中轉換傳入的資料,請使用 Apache Flink 運算子。Apache Flink 運算子可將一或多個資料串流轉換為新的資料串流。新的資料串流包含來自原始資料串流的修改資料。Apache Flink 提供了超過 25 個預先建置的串流處理運算子。如需詳細資訊,請參閱 Apache Flink 文件中的運算子
使用轉換運算子
以下是在JSON資料串流的其中一個欄位上進行簡單文字轉換的範例。
此程式碼會建立轉換後的資料串流。新資料串流具有與原始串流相同的資料,並在 TICKER
欄位內容後面附加 Company
字串。
DataStream<ObjectNode> output = input.map( new MapFunction<ObjectNode, ObjectNode>() { @Override public ObjectNode map(ObjectNode value) throws Exception { return value.put("TICKER", value.get("TICKER").asText() + " Company"); } } );
使用彙總運算子
以下是彙總運算子的範例。程式碼會建立彙總的資料串流。運算子會建立 5 秒的翻轉視窗,並傳回視窗中具有相同 TICKER
值之記錄的 PRICE
值總和。
DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText()) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .reduce((node1, node2) -> { double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble(); node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal)); return node1; });
如需更多程式碼範例,請參閱為 Apache Flink 應用程式建立及使用受管理服務的範例。