Managed Service for Apache Flink でシンクを使用してデータを書き込む - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Managed Service for Apache Flink でシンクを使用してデータを書き込む

アプリケーションコードでは、任意の Apache Flink シンクコネクタを使用して、Kinesis Data Streams や DynamoDB などの AWS サービスを含む外部システムに書き込むことができます。

Apache Flink はファイルとソケット用のシンクも提供しており、カスタムシンクを実装できます。サポートされている複数のシンクの中で、以下が頻繁に使用されます。

Kinesis データストリームを使用する

Apache Flink では、「Kinesis Data Streams Connector」に関する情報が Apache Flink ドキュメントに記載されています。

Kinesis データストリームを入力と出力に使用するアプリケーションの例については、 チュートリアル: Managed Service for Apache Flink で の使用 DataStream APIを開始する を参照してください。

Apache Kafka と Amazon Managed Streaming for Apache Kafka を使用する (MSK)

Apache Flink Kafka コネクタは、1 回限りの保証を含めMSK、Apache Kafka と Amazon へのデータの公開を広範囲にサポートします。Kafka に書き込む方法については、Apache Flink ドキュメントの Kafka Connectors の例を参照してください。

Amazon S3 を使用する

Amazon S3 バケットにオブジェクトを書き込むには、Apache Flink StreamingFileSink を使用できます。

S3 にオブジェクトを書き込む方法の例については、 例: Amazon S3 バケットへの書き込み を参照してください。

Firehose を使用する

FlinkKinesisFirehoseProducer は、Firehose サービスを使用してアプリケーション出力を保存するための、信頼性が高くスケーラブルな Apache Flink シンクです。このセクションでは、Maven プロジェクトをセットアップして FlinkKinesisFirehoseProducer を作成・使用するための設定方法について説明します。

FlinkKinesisFirehoseProducer の作成

次のコード例は、FlinkKinesisFirehoseProducer の作成を示しています。

Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);

FlinkKinesisFirehoseProducer コード例

次のコード例は、 を作成して設定FlinkKinesisFirehoseProducerし、Apache Flink データストリームから Firehose サービスにデータを送信する方法を示しています。

package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants; import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import java.io.IOException; import java.util.Map; import java.util.Properties; public class StreamingJob { private static final String region = "us-east-1"; private static final String inputStreamName = "ExampleInputStream"; private static final String outputStreamName = "ExampleOutputStream"; private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), applicationProperties.get("ConsumerConfigProperties"))); } private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() { /* * com.amazonaws.services.kinesisanalytics.flink.connectors.config. * ProducerConfigConstants * lists of all of the properties that firehose sink can be configured with. */ Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties); ProducerConfigConstants config = new ProducerConfigConstants(); return sink; } private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException { /* * com.amazonaws.services.kinesisanalytics.flink.connectors.config. * ProducerConfigConstants * lists of all of the properties that firehose sink can be configured with. */ Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); return sink; } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* * if you would like to use runtime configuration properties, uncomment the * lines below * DataStream<String> input = createSourceFromApplicationProperties(env); */ DataStream<String> input = createSourceFromStaticConfig(env); // Kinesis Firehose sink input.addSink(createFirehoseSinkFromStaticConfig()); // If you would like to use runtime configuration properties, uncomment the // lines below // input.addSink(createFirehoseSinkFromApplicationProperties()); env.execute("Flink Streaming Java API Skeleton"); } }

Firehose シンクの使用方法に関する完全なチュートリアルについては、「」を参照してください例: Firehose への書き込み