

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 Managed Service for Apache Flink 中的運算子搭配 DataStream API 轉換資料
<a name="how-operators"></a>

若要在 Managed Service for Apache Flink 中轉換傳入的資料，請使用 Apache Flink *運算子*。Apache Flink 運算子可將一或多個資料串流轉換為新的資料串流。新的資料串流包含來自原始資料串流的修改資料。Apache Flink 提供了超過 25 個預先建置的串流處理運算子。如需詳細資訊，請參閱 Apache Flink 文件中的[運算子](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/)。

**Topics**
+ [使用轉換運算子](#how-operators-transform)
+ [使用彙總運算子](#how-operators-agg)

## 使用轉換運算子
<a name="how-operators-transform"></a>

以下是在 JSON 資料串流的其中一個欄位上進行簡單文字轉換的範例。

此程式碼會建立轉換後的資料串流。新資料串流具有與原始串流相同的資料，並在 `TICKER` 欄位內容後面附加 ` Company` 字串。

```
DataStream<ObjectNode> output = input.map(
    new MapFunction<ObjectNode, ObjectNode>() {
        @Override
        public ObjectNode map(ObjectNode value) throws Exception {
            return value.put("TICKER", value.get("TICKER").asText() + " Company");
        }
    }
);
```

## 使用彙總運算子
<a name="how-operators-agg"></a>

以下是彙總運算子的範例。程式碼會建立彙總的資料串流。運算子會建立 5 秒的翻轉視窗，並傳回視窗中具有相同 `TICKER` 值之記錄的 `PRICE` 值總和。

```
DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((node1, node2) -> {
        double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble();
        node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal));
    return node1;
});
```

如需更多程式碼範例，請參閱 [建立和使用 Managed Service for Apache Flink 應用程式的範例](examples-collapsibles.md)。