

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Managed Service for Apache Flink でシンクを使用してデータを書き込む
<a name="how-sinks"></a>

アプリケーションコードでは、任意の [Apache Flink シンク](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/overview/) コネクタを使用して、Kinesis Data Streams や DynamoDB などの AWS サービスを含む外部システムに書き込むことができます。

Apache Flink には、ファイル用やソケット用のシンクも用意されており、カスタムシンクを実行することもできます。さまざまなシンクがサポートされていますが、その中でも、次のものがよく使用されます。

## Kinesis Data Streams を使用する
<a name="sinks-streams"></a>

Apache Flink では、「[Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/)」に関する情報が Apache Flink ドキュメントに記載されています。

Kinesis データストリームを入力と出力に使用するアプリケーションの例については、 [チュートリアル: Managed Service for Apache Flink で DataStream API 使用の概要](getting-started.md) を参照してください。

## Apache Kafka と Amazon Managed Streaming for Apache Kafka (MSK) を使用する
<a name="sinks-MSK"></a>

[Apache Flink Kafka コネクタ](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#kafka-sink)は、Apache Kafka と Amazon MSK にデータを公開するための広範なサポートを提供します。これには、1 回限りの保証が含まれます。Kafka に書き込む方法については、Apache Flink ドキュメントの「[Kafka Connectors examples](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConnectors)」を参照してください。

## Amazon S3 を使用する
<a name="sinks-s3"></a>

Amazon S3 バケットにオブジェクトを書き込むには、Apache Flink `StreamingFileSink` を使用できます。

S3 にオブジェクトを書き込む方法の例については、 [「例: Amazon S3 バケットに書き込む」](earlier.md#examples-s3) を参照してください。

## Firehose を使用する
<a name="sinks-firehose"></a>

`FlinkKinesisFirehoseProducer` は、[Firehose](https://docs.aws.amazon.com/firehose/latest/dev/) サービスを使用してアプリケーション出力を保存するための、信頼性が高くスケーラブルな Apache Flink シンクです。このセクションでは、Maven プロジェクトをセットアップして `FlinkKinesisFirehoseProducer` を作成・使用するための設定方法について説明します。

**Topics**
+ [`FlinkKinesisFirehoseProducer` を作成する](#sinks-firehose-create)
+ [`FlinkKinesisFirehoseProducer` コード例](#sinks-firehose-sample)

### `FlinkKinesisFirehoseProducer` を作成する
<a name="sinks-firehose-create"></a>

次のコード例は、`FlinkKinesisFirehoseProducer` の作成を示しています。

```
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
```

### `FlinkKinesisFirehoseProducer` コード例
<a name="sinks-firehose-sample"></a>

次のコード例は、`FlinkKinesisFirehoseProducer` を作成して設定し、Apache Flink データストリームから Firehose サービスにデータを送信する方法を示しています。

```
 
package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

	private static final String region = "us-east-1";
	private static final String inputStreamName = "ExampleInputStream";
	private static final String outputStreamName = "ExampleOutputStream";

	private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
		Properties inputProperties = new Properties();
		inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
		inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
	}

	private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
			throws IOException {
		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
				applicationProperties.get("ConsumerConfigProperties")));
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Properties outputProperties = new Properties();
		outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(), outputProperties);
		ProducerConfigConstants config = new ProducerConfigConstants();
		return sink;
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(),
				applicationProperties.get("ProducerConfigProperties"));
		return sink;
	}

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		/*
		 * if you would like to use runtime configuration properties, uncomment the
		 * lines below
		 * DataStream<String> input = createSourceFromApplicationProperties(env);
		 */

		DataStream<String> input = createSourceFromStaticConfig(env);

		// Kinesis Firehose sink
		input.addSink(createFirehoseSinkFromStaticConfig());

		// If you would like to use runtime configuration properties, uncomment the
		// lines below
		// input.addSink(createFirehoseSinkFromApplicationProperties());

		env.execute("Flink Streaming Java API Skeleton");
	}
}
```

Firehose シンクの使用方法に関する詳細なチュートリアルについては、「[「例: Firehose に書き込む」](earlier.md#get-started-exercise-fh)」を参照してください。