Managed Service for Apache Flink の演算子を使用したデータの変換 DataStream API - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Managed Service for Apache Flink の演算子を使用したデータの変換 DataStream API

Apache Flink 用 Managed Service の受信データを変換するには、Apache Flink「オペレータ」を使用します。Apache Flink オペレータは 1 つ以上のデータストリームを新しいデータストリームに変換します。新しいデータストリームには、元のデータストリームから変更されたデータが含まれます。Apache Flink には 25 種類以上のストリーム処理オペレータがあらかじめ組み込まれています。詳細については、「Apache Flink ドキュメント」の「オペレータ」を参照してください。

このトピックには、次のセクションが含まれています。

変換演算子を使用する

以下は、JSONデータストリームのフィールドの 1 つに対する単純なテキスト変換の例です。

このコードは変換されたデータストリームを作成します。新しいデータストリームは、元のストリームと同じデータを持ち、 TICKER フィールドの内容に文字列 Company が追加されます。

DataStream<ObjectNode> output = input.map( new MapFunction<ObjectNode, ObjectNode>() { @Override public ObjectNode map(ObjectNode value) throws Exception { return value.put("TICKER", value.get("TICKER").asText() + " Company"); } } );

集計演算子を使用する

次は集約オペレータの例です。このコードは集約されたデータストリームを作成します。このオペレータは 5 秒間のタンブリングウィンドウを作成し、ウィンドウ内の同じ TICKER 値を持つレコードの PRICE 値の合計を返します。

DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText()) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .reduce((node1, node2) -> { double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble(); node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal)); return node1; });

その他のコード例については、「」を参照してくださいManaged Service for Apache Flink アプリケーションの作成と使用の例