

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Managed Service for Apache Flink にストリーミングデータソースを追加する
<a name="how-sources"></a>

Apache Flink には、ファイル、ソケット、コレクション、カスタムソースから読み取るためのコネクタが用意されています。アプリケーションコードでは、「[Apache Flink ソース](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html#data-sources)」を使用してストリームからデータを受信します。このセクションでは、Amazon サービスで利用できるソースについて説明します。

## Kinesis Data Streams を使用する
<a name="input-streams"></a>

`KinesisStreamsSource` は Amazon Kinesis データストリームからアプリケーションにストリーミングデータを提供します。

### `KinesisStreamsSource` を作成する
<a name="input-streams-create"></a>

次のコード例は、`KinesisStreamsSource` の作成を示しています。

```
// Configure the KinesisStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST

// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
KinesisStreamsSource<String> kdsSource =
        KinesisStreamsSource.<String>builder()
                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
                .setSourceConfig(sourceConfig)
                .setDeserializationSchema(new SimpleStringSchema())
                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
                .build();
```

`KinesisStreamsSource` の使用の詳細については、Apache Flink ドキュメントの「[Amazon Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/)」および [Github で が公開している KinesisConnectors の例](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors)を参照してください。

### EFO コンシューマーを使用する `KinesisStreamsSource` を作成する
<a name="input-streams-efo"></a>

`KinesisStreamsSource` で[拡張ファンアウト (EFO)](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/) がサポートされるようになりました。

Kinesis コンシューマーが EFO を使用する場合、Kinesis Data Streams サービスは、コンシューマーがストリームの固定帯域幅を、ストリームから読み取る他のコンシューマーと共有するのではなく、独自の専用帯域幅を提供します。

Kinesis コンシューマーで EFO を使用する方法の詳細については、[FLIP-128: Kinesis AWS コンシューマーの拡張ファンアウト](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers)」を参照してください。

EFO コンシューマーを有効にするには、Kinesis コンシューマーで次のパラメータを設定します。
+ **READER\$1TYPE:** アプリケーションが EFO コンシューマーを使用して Kinesis Data Streams データにアクセスできるようにするには、このパラメータを **EFO** に設定します。
+ **EFO\$1CONSUMER\$1NAME: **このパラメータを、このストリームのコンシューマー間で一意の文字列値に設定します。同じ Kinesis Data Stream でコンシューマー名を再利用すると、その名前を使用していた以前のコンシューマーは終了します。

EFO を使用するように `KinesisStreamsSource` を設定するには、コンシューマーに以下のパラメータを追加します。

```
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
```

EFO コンシューマーを使用する Managed Service for Apache Flink アプリケーションの例については、[Github で が公開している Kinesis コネクタの例](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors)を参照してください。

## Amazon MSK を使用する
<a name="input-msk"></a>

`KafkaSource` ソースは Amazon MSK トピックからアプリケーションにストリーミングデータを提供します。

### `KafkaSource` を作成する
<a name="input-msk-create"></a>

次のコード例は、`KafkaSource` の作成を示しています。

```
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```

`KafkaSource` の使用方法の詳細については、「[MSK レプリケーション](earlier.md#example-msk)」を参照してください。