

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 API 在 Apache Flink 托管服务中使用运算符转换数据 DataStream
<a name="how-operators"></a>

要在中转换传入数据，您可以使用 Apache Flink *运算符*。Apache Flink 运算符将一个或多个数据流转换为新的数据流。新数据流包含来自原始数据流的修改的数据。Apache Flink 提供超过 25 个预构建的流处理运算符。有关更多信息，请参阅 pache Flink 文档中的[运算符](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/)。

**Topics**
+ [使用转换运算符](#how-operators-transform)
+ [使用聚合操作符](#how-operators-agg)

## 使用转换运算符
<a name="how-operators-transform"></a>

以下是对 JSON 数据流的某个字段进行简单文本转换的示例。

该代码创建转换的数据流。新数据流具有与原始流相同的数据，并在 `TICKER` 字段内容后面附加“` Company`”字符串。

```
DataStream<ObjectNode> output = input.map(
    new MapFunction<ObjectNode, ObjectNode>() {
        @Override
        public ObjectNode map(ObjectNode value) throws Exception {
            return value.put("TICKER", value.get("TICKER").asText() + " Company");
        }
    }
);
```

## 使用聚合操作符
<a name="how-operators-agg"></a>

以下是一个聚合运算符示例。该代码创建聚合的数据流。该运算符创建一个 5 秒的滚动窗口，并返回窗口中具有相同 `TICKER` 值的记录的 `PRICE` 值之和。

```
DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((node1, node2) -> {
        double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble();
        node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal));
    return node1;
});
```

有关更多代码示例，请参阅 [创建和使用 Managed Service for Apache Flink 应用程序的示例](examples-collapsibles.md)。