Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
将流数据源添加到适用于 Apache Flink 的托管服务
Apache Flink 提供连接器以从文件、套接字、集合和自定义源中读取。在应用程序代码中,您可以使用 Apache Flink 源
使用 Kinesis 数据流
FlinkKinesisConsumer
源从 Amazon Kinesis 数据流中向应用程序提供流数据。
创建 FlinkKinesisConsumer
以下代码示例说明了如何创建 FlinkKinesisConsumer
:
Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); DataStream<string> input = env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
有关使用 FlinkKinesisConsumer
的更多信息,请参阅下载并查看 Apache Flink 流式传输 Java 代码。
创建FlinkKinesisConsumer
使用EFO消费者的
FlinkKinesisConsumer 现在支持增强型扇出 () EFO
如果 Kinesis 用户使用EFO,则 Kinesis Data Streams 服务会为其提供自己的专用带宽,而不是让使用者与其他从流中读取数据的使用者共享流的固定带宽。
有关与 Kinesis 使用者EFO一起使用的更多信息,请参阅 FLIP-128:Kinesi AWS s 消费者的增强型扇出
您可以通过在 EFO Kinesis 使用器上设置以下参数来启用使用器:
RECORD_ PUBLISHER _TYPE:将此参数设置为,EFO以便您的应用程序使用使用EFO者访问 Kinesis 数据流数据。
EFO_ CONSUMER _NAME:将此参数设置为在该直播的使用者中唯一的字符串值。在同一 Kinesis 数据流中重复使用使用者名称,会导致之前使用该名称的使用者被终止。
要配置FlinkKinesisConsumer
要使用的EFO,请向使用者添加以下参数:
consumerConfig.putIfAbsent(RECORD_PUBLISHER_TYPE, "EFO"); consumerConfig.putIfAbsent(EFO_CONSUMER_NAME, "basic-efo-flink-app");
有关使用使用者的适用于 Apache Flink 的托管服务应用程序的示例,EFO请参见。EFO消费者
使用亚马逊 MSK
该KafkaSource
来源向您的应用程序提供来自 Amazon MSK 主题的流数据。
创建 KafkaSource
以下代码示例说明了如何创建 KafkaSource
:
KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
有关使用 KafkaSource
的更多信息,请参阅MSK复制。