Managed Service for Apache Flink にストリーミングデータソースを追加する - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Managed Service for Apache Flink にストリーミングデータソースを追加する

Apache Flink には、ファイル、ソケット、コレクション、カスタムソースから読み取るためのコネクタが用意されています。アプリケーションコードでは、「Apache Flink ソース」を使用してストリームからデータを受信します。このセクションでは、Amazon サービスで使用できるソースについて説明します。

Kinesis データストリームを使用する

FlinkKinesisConsumer ソースは Amazon Kinesis データストリームからアプリケーションにストリーミングデータを提供します。

FlinkKinesisConsumer の作成

次のコード例は、FlinkKinesisConsumer の作成を示しています。

Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); DataStream<string> input = env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));

FlinkKinesisConsumer の使用方法の詳細については、「Apache Flink ストリーミング Java コードをダウンロードして調べる」を参照してください。

EFO コンシューマーFlinkKinesisConsumerを使用する を作成する

拡張ファンアウト (EFO) をサポートする FlinkKinesisConsumer ようになりました。

Kinesis コンシューマーが を使用する場合EFO、Kinesis Data Streams サービスは、コンシューマーにストリームの固定帯域幅をストリームから読み取る他のコンシューマーと共有させるのではなく、独自の専用帯域幅を提供します。

を Kinesis コンシューマーEFOで使用する方法の詳細については、FLIP「-128: Enhanced Fan Out for AWS Kinesis Consumers」を参照してください。

EFO コンシューマーを有効にするには、Kinesis コンシューマーで次のパラメータを設定します。

  • RECORD_PUBLISHER_TYPE: アプリケーションEFOがEFOコンシューマーを使用して Kinesis Data Stream データにアクセスするように、このパラメータを に設定します。

  • EFO_CONSUMER_NAME: このパラメータを、このストリームのコンシューマー間で一意の文字列値に設定します。同じ Kinesis Data Stream でコンシューマー名を再利用すると、その名前を使用していた以前のコンシューマーは終了します。

を使用するFlinkKinesisConsumerように を設定するにはEFO、コンシューマーに次のパラメータを追加します。

consumerConfig.putIfAbsent(RECORD_PUBLISHER_TYPE, "EFO"); consumerConfig.putIfAbsent(EFO_CONSUMER_NAME, "basic-efo-flink-app");

EFO コンシューマーを使用する Managed Service for Apache Flink アプリケーションの例については、「」を参照してくださいEFO コンシューマー

Amazon を使用する MSK

KafkaSource ソースは、Amazon MSKトピックからアプリケーションにストリーミングデータを提供します。

KafkaSource の作成

次のコード例は、KafkaSource の作成を示しています。

KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

KafkaSource の使用方法の詳細については、「MSK レプリケーション」を参照してください。