本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用 Managed Service for Apache Flink 中的運算子搭配 DataStream API 轉換資料
若要在 Managed Service for Apache Flink 中轉換傳入的資料,請使用 Apache Flink 運算子。Apache Flink 運算子可將一或多個資料串流轉換為新的資料串流。新的資料串流包含來自原始資料串流的修改資料。Apache Flink 提供了超過 25 個預先建置的串流處理運算子。如需詳細資訊,請參閱 Apache Flink 文件中的運算子
使用轉換運算子
以下是在 JSON 資料串流的其中一個欄位上進行簡單文字轉換的範例。
此程式碼會建立轉換後的資料串流。新資料串流具有與原始串流相同的資料,並在 TICKER 欄位內容後面附加 Company 字串。
DataStream<ObjectNode> output = input.map( new MapFunction<ObjectNode, ObjectNode>() { @Override public ObjectNode map(ObjectNode value) throws Exception { return value.put("TICKER", value.get("TICKER").asText() + " Company"); } } );
使用彙總運算子
以下是彙總運算子的範例。程式碼會建立彙總的資料串流。運算子會建立 5 秒的翻轉視窗,並傳回視窗中具有相同 TICKER 值之記錄的 PRICE 值總和。
DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText()) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .reduce((node1, node2) -> { double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble(); node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal)); return node1; });
如需更多程式碼範例,請參閱 建立和使用 Managed Service for Apache Flink 應用程式的範例。