Write data using sinks in Managed Service for Apache Flink

Amazon Managed Service for Apache Flink was previously known as Amazon Kinesis Data Analytics for Apache Flink.



In your application code, you can use any Apache Flink sink connector to write to external systems, including AWS services such as Kinesis Data Streams and DynamoDB.

Apache Flink also provides sinks for files and sockets, and you can implement custom sinks. Among the supported sinks, the following are frequently used:

Use Kinesis data streams

Apache Flink provides information about the Kinesis Data Streams connector in the Apache Flink documentation.

For an example of an application that uses a Kinesis data stream for input and output, see Tutorial: Get started with the DataStream API in Managed Service for Apache Flink.
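As a minimal sketch, a Kinesis Data Streams sink can be configured with the connector's FlinkKinesisProducer, similar to the consumer setup shown later in this topic. The Region, stream name, and the surrounding `input` stream are placeholder assumptions, not values from this page:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

// ... inside your job setup, where `input` is a DataStream<String> built elsewhere:

Properties producerConfig = new Properties();
producerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); // placeholder Region

FlinkKinesisProducer<String> sink =
        new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
sink.setDefaultStream("ExampleOutputStream"); // placeholder stream name
sink.setDefaultPartition("0");                // default partition key for records

input.addSink(sink);
```

The stream must already exist; the producer does not create it.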

Use Apache Kafka and Amazon Managed Streaming for Apache Kafka (MSK)

The Apache Flink Kafka connectors provide extensive support for publishing data to Apache Kafka and Amazon MSK, including exactly-once guarantees. To learn how to write to Kafka, see the Kafka Connector examples in the Apache Flink documentation.
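As a sketch of the unified KafkaSink API (Flink 1.14+), the following builds a sink with exactly-once delivery. The bootstrap servers, topic, and transactional ID prefix are placeholder assumptions; for Amazon MSK you would substitute your cluster's bootstrap broker string:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

// ... inside your job setup, where `input` is a DataStream<String> built elsewhere:

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker-1:9092")          // placeholder broker list
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("example-topic")             // placeholder topic name
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        // EXACTLY_ONCE uses Kafka transactions and requires a transactional ID prefix;
        // AT_LEAST_ONCE is a simpler alternative if duplicates are acceptable.
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("example-tx-")
        .build();

input.sinkTo(sink);
```

Exactly-once delivery also requires checkpointing to be enabled, since records are committed on checkpoint completion.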

Use Amazon S3

You can use the Apache Flink StreamingFileSink to write objects to an Amazon S3 bucket.

For an example of how to write objects to S3, see Example: Write to an Amazon S3 bucket.
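A minimal row-format StreamingFileSink might look like the following; the bucket name and prefix are placeholder assumptions:

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// ... inside your job setup, where `input` is a DataStream<String> built elsewhere:

StreamingFileSink<String> sink = StreamingFileSink
        // Writes each record as a UTF-8 line under the given S3 prefix.
        .forRowFormat(new Path("s3a://amzn-s3-demo-bucket/output"),   // placeholder bucket/prefix
                      new SimpleStringEncoder<String>("UTF-8"))
        .build();

input.addSink(sink);
```

Note that StreamingFileSink finalizes in-progress part files only on checkpoints, so checkpointing must be enabled for output to become visible in S3.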

Use Firehose

FlinkKinesisFirehoseProducer is a reliable, scalable Apache Flink sink for storing application output using the Firehose service. This section describes how to set up a Maven project to create and use a FlinkKinesisFirehoseProducer.
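The Firehose producer is not part of the core Apache Flink distribution, so the connector must be added as a Maven dependency. A dependency along these lines is typically used; the version shown is an assumption, so check your project's repository for the current coordinates:

```xml
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-kinesisanalytics-flink</artifactId>
    <version>2.0.0</version>
</dependency>
```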

Create a FlinkKinesisFirehoseProducer

The following code example demonstrates how to create a FlinkKinesisFirehoseProducer:

Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

FlinkKinesisFirehoseProducer<String> sink =
        new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);

FlinkKinesisFirehoseProducer code example

The following code example demonstrates how to create and configure a FlinkKinesisFirehoseProducer and send data from an Apache Flink data stream to the Firehose service.

package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

    private static final String region = "us-east-1";
    private static final String inputStreamName = "ExampleInputStream";
    private static final String outputStreamName = "ExampleOutputStream";

    private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
        Properties inputProperties = new Properties();
        inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

        return env.addSource(new FlinkKinesisConsumer<>(inputStreamName,
                new SimpleStringSchema(), inputProperties));
    }

    private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
            throws IOException {
        Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
        return env.addSource(new FlinkKinesisConsumer<>(inputStreamName,
                new SimpleStringSchema(), applicationProperties.get("ConsumerConfigProperties")));
    }

    private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
        /*
         * com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants
         * lists all of the properties that the Firehose sink can be configured with.
         */
        Properties outputProperties = new Properties();
        outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

        return new FlinkKinesisFirehoseProducer<>(outputStreamName,
                new SimpleStringSchema(), outputProperties);
    }

    private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties()
            throws IOException {
        /*
         * com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants
         * lists all of the properties that the Firehose sink can be configured with.
         */
        Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
        return new FlinkKinesisFirehoseProducer<>(outputStreamName,
                new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties"));
    }

    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /*
         * If you would like to use runtime configuration properties, uncomment the
         * line below instead:
         * DataStream<String> input = createSourceFromApplicationProperties(env);
         */
        DataStream<String> input = createSourceFromStaticConfig(env);

        // Kinesis Firehose sink
        input.addSink(createFirehoseSinkFromStaticConfig());

        // If you would like to use runtime configuration properties, uncomment the
        // line below instead:
        // input.addSink(createFirehoseSinkFromApplicationProperties());

        env.execute("Flink Streaming Java API Skeleton");
    }
}

For a complete tutorial on how to use the Firehose sink, see Example: Write to Firehose.