在 Apache Flink 托管服务中使用接收器写入数据 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Apache Flink 托管服务中使用接收器写入数据

在您的应用程序代码中,您可以使用任何 Apache Flink 接收器连接器写入外部系统,包括 AWS 服务,例如 Kinesis Data Streams 和 DynamoDB。

Apache Flink 还为文件和套接字提供了接收器,你可以实现自定义接收器。在支持的几个接收器中,以下是经常使用的:

使用 Kinesis 数据流

Apache Flink 在 Apache Flink 文档中提供了有关 Kinesis Data Streams 连接器的信息。

有关使用 Kinesis 数据流进行输入和输出的应用程序示例,请参见。教程:开始使用适用于 Apache Flink 的托管服务 DataStream API中的

使用 Apache Kafka 和亚马逊托管流媒体适用于 Apache Kafka () MSK

Apache Flink Kafka 连接器为向 Apache Kafka 和亚马逊发布数据提供了广泛支持MSK,包括一次性担保。要了解如何写入 Kafka,请参阅 Apache Flink 文档中的 Kafka 连接器示例

使用亚马逊 S3

您可以使用 Apache Flink StreamingFileSink 以将对象写入到 Amazon S3 存储桶中。

有关如何将对象写入到 S3 的示例,请参阅示例:写入 Amazon S3 存储桶

使用 Firehose

FlinkKinesisFirehoseProducer是一款可靠、可扩展的 Apache Flink 接收器,用于使用 Firehose 服务存储应用程序输出。本节介绍了如何设置 Maven 项目以创建和使用 FlinkKinesisFirehoseProducer

创建 FlinkKinesisFirehoseProducer

以下代码示例说明了如何创建 FlinkKinesisFirehoseProducer

Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);

FlinkKinesisFirehoseProducer 代码示例

以下代码示例演示了如何创建和配置以及如何将数据从 Apache Flink 数据流发送到 Firehose 服务。FlinkKinesisFirehoseProducer

package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants; import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import java.io.IOException; import java.util.Map; import java.util.Properties; public class StreamingJob { private static final String region = "us-east-1"; private static final String inputStreamName = "ExampleInputStream"; private static final String outputStreamName = "ExampleOutputStream"; private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), applicationProperties.get("ConsumerConfigProperties"))); } private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() { /* * com.amazonaws.services.kinesisanalytics.flink.connectors.config. * ProducerConfigConstants * lists of all of the properties that firehose sink can be configured with. */ Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties); ProducerConfigConstants config = new ProducerConfigConstants(); return sink; } private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException { /* * com.amazonaws.services.kinesisanalytics.flink.connectors.config. * ProducerConfigConstants * lists of all of the properties that firehose sink can be configured with. */ Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); return sink; } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* * if you would like to use runtime configuration properties, uncomment the * lines below * DataStream<String> input = createSourceFromApplicationProperties(env); */ DataStream<String> input = createSourceFromStaticConfig(env); // Kinesis Firehose sink input.addSink(createFirehoseSinkFromStaticConfig()); // If you would like to use runtime configuration properties, uncomment the // lines below // input.addSink(createFirehoseSinkFromApplicationProperties()); env.execute("Flink Streaming Java API Skeleton"); } }

有关如何使用 Firehose 接收器的完整教程,请参阅。示例:写入 Firehose