

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# DataStream API と Managed Service for Apache Flink のオペレーターを使用してデータを変換する
<a name="how-operators"></a>

Apache Flink 用 Managed Service の受信データを変換するには、Apache Flink「オペレータ」を使用します。Apache Flink オペレータは 1 つ以上のデータストリームを新しいデータストリームに変換します。新しいデータストリームには、元のデータストリームから変更されたデータが含まれます。Apache Flink には 25 種類以上のストリーム処理オペレータがあらかじめ組み込まれています。詳細については、Apache Flink ドキュメントの「[オペレーター](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/)」を参照してください。

**Topics**
+ [変換オペレーターを使用する](#how-operators-transform)
+ [集約オペレーターを使用する](#how-operators-agg)

## 変換オペレーターを使用する
<a name="how-operators-transform"></a>

JSON データストリームのいずれか１つのフィールドの簡単なテキスト変換の例を次に示します。

このコードは変換されたデータストリームを作成します。新しいデータストリームは、元のストリームと同じデータを持ち、 `TICKER` フィールドの内容に文字列 ` Company` が追加されます。

```
DataStream<ObjectNode> output = input.map(
    new MapFunction<ObjectNode, ObjectNode>() {
        @Override
        public ObjectNode map(ObjectNode value) throws Exception {
            return value.put("TICKER", value.get("TICKER").asText() + " Company");
        }
    }
);
```

## 集約オペレーターを使用する
<a name="how-operators-agg"></a>

次は集約オペレータの例です。このコードは集約されたデータストリームを作成します。このオペレータは 5 秒間のタンブリングウィンドウを作成し、ウィンドウ内の同じ `TICKER` 値を持つレコードの `PRICE` 値の合計を返します。

```
DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((node1, node2) -> {
        double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble();
        node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal));
    return node1;
});
```

コード例については、「[Managed Service for Apache Flink アプリケーションの作成と操作の例](examples-collapsibles.md)」を参照してください。