

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# DataStream API コンポーネントを確認する
<a name="how-datastream"></a>

Apache Flink アプリケーションは「[Apache Flink データストリーム API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/overview/)」を使用してデータストリーム内のデータを変換します。

このセクションでは、データを移動、変換、追跡するさまざまなコンポーネントについて説明します。
+ [コネクタと DataStream API を使用して、Managed Service for Apache Flink でデータを移動する](how-connectors.md): これらのコンポーネントは、アプリケーションと外部データソースおよび宛先との間でデータを移動します。
+ [DataStream API と Managed Service for Apache Flink のオペレーターを使用してデータを変換する](how-operators.md): これらのコンポーネントは、アプリケーション内のデータ要素を変換またはグループ化します。
+ [DataStream API を使用して Managed Service for Apache Flink のイベントをトラッキングする](how-time.md): このトピックでは、Apache Flink 用 Managed Service が DataStream API を使用する際にイベントをトラッキングする方法について説明します。

# コネクタと DataStream API を使用して、Managed Service for Apache Flink でデータを移動する
<a name="how-connectors"></a>

Amazon Managed Service for Apache Flink DataStream API では、「コネクタ」とはApache Flink 用 Managed Serviceアプリケーションとの間でデータをやり取りするソフトウェアコンポーネントです。コネクタは、ファイルやディレクトリからの読み取りが可能になる柔軟な統合です。コネクタは、Amazon のサービスやサードパーティのシステムとやり取りするための完全なモジュールで構成されています。

コネクターの種類には、次のものがあります。
+ [ストリーミングデータソースを追加する](how-sources.md): Kinesis データストリーム、ファイル、またはその他のデータソースからアプリケーションにデータを提供します。
+ [シンクを使用してデータを書き込む](how-sinks.md): アプリケーションから、Kinesis データストリーム、Firehose ストリーム、またはその他のデータ送信先にデータを送信します。
+ [非同期 I/O を使用する](how-async.md): データソース (データベースなど) への非同期アクセスを提供し、ストリームイベントを充実させます。

## 使用可能なコネクタ
<a name="how-connectors-list"></a>

Apache Flink フレームワークには、さまざまなソースのデータにアクセスするためのコネクターが含まれています。Apache Flink フレームワークで使用できるコネクタについては、「[Apache Flink ドキュメント](https://nightlies.apache.org/flink/flink-docs-release-1.15/)」の「[コネクタ](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/connectors/)」を参照してください。

**警告**  
Flink 1.6、1.8、1.11、または 1.13 で実行中のアプリケーションがあり、中東 (UAE)、アジアパシフィック (ハイデラバード)、イスラエル (テルアビブ)、欧州 (チューリッヒ)、アジアパシフィック (メルボルン)、またはアジアパシフィック (ジャカルタ) リージョンで実行したい場合は、更新されたコネクタを使用してアプリケーションアーカイブを再構築するか、Flink 1.18 にアップグレードすることが必要になる可能性があります。  
Apache Flink コネクタは、独自のオープンソースリポジトリに保存されます。バージョン 1.18 以降にアップグレードする場合は、依存関係を更新する必要があります。Apache Flink AWS コネクタのリポジトリにアクセスするには、[flink-connector-aws](https://github.com/apache/flink-connector-aws)」を参照してください。  
以前の Kinesis ソース `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer` は廃止されており、Flink の今後のリリースでは削除される可能性があります。代わりに [Kinesis ソース](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-source)を使用してください。  
`FlinkKinesisConsumer` と `KinesisStreamsSource` の間に状態の互換性はありません。詳細については、Apache Flink ドキュメントの「[Migrating existing jobs to new Kinesis Streams Source](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#migrating-existing-jobs-to-new-kinesis-streams-source-from-kinesis-consumer)」を参照してください。  
 以下は推奨されるガイドラインです。  


**コネクタのアップグレード**  

| Flink バージョン | 使用されるコネクタ | 解決策 | 
| --- | --- | --- | 
| 1.19、1.20 | Kinesis ソース |  Managed Service for Apache Flink バージョン 1.19 および 1.20 にアップグレードする場合は、最新の Kinesis Data Streams ソースコネクタを使用していることを確認してください。5.0.0 以降のバージョンである必要があります。詳細については、「[Amazon Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/)」を参照してください。  | 
| 1.19、1.20 | Kinesis シンク |  Managed Service for Apache Flink バージョン 1.19 および 1.20 にアップグレードする場合は、最新の Kinesis Data Streams シンクコネクタを使用していることを確認してください。5.0.0 以降のバージョンである必要があります。詳細については、「[Kinesis Streams Sink](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-sink)」を参照してください。  | 
| 1.19、1.20 | DynamoDB Streams ソース |  Managed Service for Apache Flink バージョン 1.19 および 1.20 にアップグレードする場合は、最新の DynamoDB Streams ソースコネクタを使用していることを確認してください。5.0.0 以降のバージョンである必要があります。詳細については、「[Amazon DynamoDB Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/)」を参照してください。  | 
| 1.19、1.20 | DynamoDB シンク | Managed Service for Apache Flink バージョン 1.19 および 1.20 にアップグレードする場合は、最新の DynamoDB シンクコネクタを使用していることを確認してください。5.0.0 以降のバージョンである必要があります。詳細については、「[Amazon DynamoDB Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/)」を参照してください。 | 
| 1.19、1.20 | Amazon SQS シンク |  Managed Service for Apache Flink バージョン 1.19 および 1.20 にアップグレードする場合は、最新の Amazon SQS シンクコネクタを使用していることを確認してください。5.0.0 以降のバージョンである必要があります。詳細については、「[Amazon SQS Sink](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/sqs/)」を参照してください。  | 
| 1.19、1.20 | Amazon Managed Service for Prometheus シンク |  Managed Service for Apache Flink バージョン 1.19 および 1.20 にアップグレードする場合は、最新の Amazon Managed Service for Prometheus シンクコネクタを使用していることを確認してください。1.0.0 以降のバージョンである必要があります。詳細については、「[Prometheus Sink](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/prometheus/)」を参照してください。  | 

# Managed Service for Apache Flink にストリーミングデータソースを追加する
<a name="how-sources"></a>

Apache Flink には、ファイル、ソケット、コレクション、カスタムソースから読み取るためのコネクタが用意されています。アプリケーションコードでは、「[Apache Flink ソース](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html#data-sources)」を使用してストリームからデータを受信します。このセクションでは、Amazon サービスで利用できるソースについて説明します。

## Kinesis Data Streams を使用する
<a name="input-streams"></a>

`KinesisStreamsSource` は Amazon Kinesis データストリームからアプリケーションにストリーミングデータを提供します。

### `KinesisStreamsSource` を作成する
<a name="input-streams-create"></a>

次のコード例は、`KinesisStreamsSource` の作成を示しています。

```
// Configure the KinesisStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST

// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
KinesisStreamsSource<String> kdsSource =
        KinesisStreamsSource.<String>builder()
                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
                .setSourceConfig(sourceConfig)
                .setDeserializationSchema(new SimpleStringSchema())
                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
                .build();
```

`KinesisStreamsSource` の使用の詳細については、Apache Flink ドキュメントの「[Amazon Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/)」および [Github で が公開している KinesisConnectors の例](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors)を参照してください。

### EFO コンシューマーを使用する `KinesisStreamsSource` を作成する
<a name="input-streams-efo"></a>

`KinesisStreamsSource` で[拡張ファンアウト (EFO)](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/) がサポートされるようになりました。

Kinesis コンシューマーが EFO を使用する場合、Kinesis Data Streams サービスは、コンシューマーがストリームの固定帯域幅を、ストリームから読み取る他のコンシューマーと共有するのではなく、独自の専用帯域幅を提供します。

Kinesis コンシューマーで EFO を使用する方法の詳細については、[FLIP-128: Kinesis AWS コンシューマーの拡張ファンアウト](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers)」を参照してください。

EFO コンシューマーを有効にするには、Kinesis コンシューマーで次のパラメータを設定します。
+ **READER\$1TYPE:** アプリケーションが EFO コンシューマーを使用して Kinesis Data Streams データにアクセスできるようにするには、このパラメータを **EFO** に設定します。
+ **EFO\$1CONSUMER\$1NAME: **このパラメータを、このストリームのコンシューマー間で一意の文字列値に設定します。同じ Kinesis Data Stream でコンシューマー名を再利用すると、その名前を使用していた以前のコンシューマーは終了します。

EFO を使用するように `KinesisStreamsSource` を設定するには、コンシューマーに以下のパラメータを追加します。

```
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
```

EFO コンシューマーを使用する Managed Service for Apache Flink アプリケーションの例については、[Github で が公開している Kinesis コネクタの例](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors)を参照してください。

## Amazon MSK を使用する
<a name="input-msk"></a>

`KafkaSource` ソースは Amazon MSK トピックからアプリケーションにストリーミングデータを提供します。

### `KafkaSource` を作成する
<a name="input-msk-create"></a>

次のコード例は、`KafkaSource` の作成を示しています。

```
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```

`KafkaSource` の使用方法の詳細については、「[MSK レプリケーション](earlier.md#example-msk)」を参照してください。

# Managed Service for Apache Flink でシンクを使用してデータを書き込む
<a name="how-sinks"></a>

アプリケーションコードでは、任意の [Apache Flink シンク](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/overview/) コネクタを使用して、Kinesis Data Streams や DynamoDB などの AWS サービスを含む外部システムに書き込むことができます。

Apache Flink には、ファイル用やソケット用のシンクも用意されており、カスタムシンクを実行することもできます。さまざまなシンクがサポートされていますが、その中でも、次のものがよく使用されます。

## Kinesis Data Streams を使用する
<a name="sinks-streams"></a>

Apache Flink では、「[Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/)」に関する情報が Apache Flink ドキュメントに記載されています。

Kinesis データストリームを入力と出力に使用するアプリケーションの例については、 [チュートリアル: Managed Service for Apache Flink で DataStream API 使用の概要](getting-started.md) を参照してください。

## Apache Kafka と Amazon Managed Streaming for Apache Kafka (MSK) を使用する
<a name="sinks-MSK"></a>

[Apache Flink Kafka コネクタ](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#kafka-sink)は、Apache Kafka と Amazon MSK にデータを公開するための広範なサポートを提供します。これには、1 回限りの保証が含まれます。Kafka に書き込む方法については、Apache Flink ドキュメントの「[Kafka Connectors examples](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConnectors)」を参照してください。

## Amazon S3 を使用する
<a name="sinks-s3"></a>

Amazon S3 バケットにオブジェクトを書き込むには、Apache Flink `StreamingFileSink` を使用できます。

S3 にオブジェクトを書き込む方法の例については、 [「例: Amazon S3 バケットに書き込む」](earlier.md#examples-s3) を参照してください。

## Firehose を使用する
<a name="sinks-firehose"></a>

`FlinkKinesisFirehoseProducer` は、[Firehose](https://docs.aws.amazon.com/firehose/latest/dev/) サービスを使用してアプリケーション出力を保存するための、信頼性が高くスケーラブルな Apache Flink シンクです。このセクションでは、Maven プロジェクトをセットアップして `FlinkKinesisFirehoseProducer` を作成・使用するための設定方法について説明します。

**Topics**
+ [`FlinkKinesisFirehoseProducer` を作成する](#sinks-firehose-create)
+ [`FlinkKinesisFirehoseProducer` コード例](#sinks-firehose-sample)

### `FlinkKinesisFirehoseProducer` を作成する
<a name="sinks-firehose-create"></a>

次のコード例は、`FlinkKinesisFirehoseProducer` の作成を示しています。

```
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
```

### `FlinkKinesisFirehoseProducer` コード例
<a name="sinks-firehose-sample"></a>

次のコード例は、`FlinkKinesisFirehoseProducer` を作成して設定し、Apache Flink データストリームから Firehose サービスにデータを送信する方法を示しています。

```
 
package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

	private static final String region = "us-east-1";
	private static final String inputStreamName = "ExampleInputStream";
	private static final String outputStreamName = "ExampleOutputStream";

	private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
		Properties inputProperties = new Properties();
		inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
		inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
	}

	private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
			throws IOException {
		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
				applicationProperties.get("ConsumerConfigProperties")));
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Properties outputProperties = new Properties();
		outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(), outputProperties);
		ProducerConfigConstants config = new ProducerConfigConstants();
		return sink;
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(),
				applicationProperties.get("ProducerConfigProperties"));
		return sink;
	}

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		/*
		 * if you would like to use runtime configuration properties, uncomment the
		 * lines below
		 * DataStream<String> input = createSourceFromApplicationProperties(env);
		 */

		DataStream<String> input = createSourceFromStaticConfig(env);

		// Kinesis Firehose sink
		input.addSink(createFirehoseSinkFromStaticConfig());

		// If you would like to use runtime configuration properties, uncomment the
		// lines below
		// input.addSink(createFirehoseSinkFromApplicationProperties());

		env.execute("Flink Streaming Java API Skeleton");
	}
}
```

Firehose シンクの使用方法に関する詳細なチュートリアルについては、「[「例: Firehose に書き込む」](earlier.md#get-started-exercise-fh)」を参照してください。

# Managed Service for Apache Flink で非同期 I/O を使用する
<a name="how-async"></a>

非同期 I/O オペレータは、データベースなどの外部データソースを使用してストリームデータを強化します。Apache Flink 用 Managed Serviceはストリームイベントを非同期的に強化するため、リクエストを一括処理して効率を高めることができます。

詳細については、Apache Flink ドキュメントの「[Asynchronous I/O](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/asyncio/)」を参照してください。

# DataStream API と Managed Service for Apache Flink のオペレーターを使用してデータを変換する
<a name="how-operators"></a>

Apache Flink 用 Managed Service の受信データを変換するには、Apache Flink「オペレータ」を使用します。Apache Flink オペレータは 1 つ以上のデータストリームを新しいデータストリームに変換します。新しいデータストリームには、元のデータストリームから変更されたデータが含まれます。Apache Flink には 25 種類以上のストリーム処理オペレータがあらかじめ組み込まれています。詳細については、Apache Flink ドキュメントの「[オペレーター](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/)」を参照してください。

**Topics**
+ [変換オペレーターを使用する](#how-operators-transform)
+ [集約オペレーターを使用する](#how-operators-agg)

## 変換オペレーターを使用する
<a name="how-operators-transform"></a>

JSON データストリームのいずれか１つのフィールドの簡単なテキスト変換の例を次に示します。

このコードは変換されたデータストリームを作成します。新しいデータストリームは、元のストリームと同じデータを持ち、 `TICKER` フィールドの内容に文字列 ` Company` が追加されます。

```
DataStream<ObjectNode> output = input.map(
    new MapFunction<ObjectNode, ObjectNode>() {
        @Override
        public ObjectNode map(ObjectNode value) throws Exception {
            return value.put("TICKER", value.get("TICKER").asText() + " Company");
        }
    }
);
```

## 集約オペレーターを使用する
<a name="how-operators-agg"></a>

次は集約オペレータの例です。このコードは集約されたデータストリームを作成します。このオペレータは 5 秒間のタンブリングウィンドウを作成し、ウィンドウ内の同じ `TICKER` 値を持つレコードの `PRICE` 値の合計を返します。

```
DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((node1, node2) -> {
        double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble();
        node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal));
    return node1;
});
```

コード例については、「[Managed Service for Apache Flink アプリケーションの作成と操作の例](examples-collapsibles.md)」を参照してください。

# DataStream API を使用して Managed Service for Apache Flink のイベントをトラッキングする
<a name="how-time"></a>

Apache Flink 用 Managed Service は、次のタイムスタンプを使用してイベントを追跡します。
+ 「**Processing Time:**」それぞれの操作を実行しているマシンのシステム時間を指します。
+ 「**イベント時間:**」各イベントが発生デバイスで発生した時刻を指します。
+ 「**取り込み時間:**」Apache Flink サービス用 Managed Service にイベントが入るまでの時間を指します。

`setStreamTimeCharacteristic` を使用してストリーミング環境が使用する時間を設定します。

```
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
```

詳細については、「Apache Flink ドキュメント」の「[Generating Watermarks](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/event-time/generating_watermarks/)」を参照してください。