Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
将流数据源添加到适用于 Apache Flink 的托管服务
Apache Flink 提供连接器以从文件、套接字、集合和自定义源中读取。在应用程序代码中,您可以使用 Apache Flink 源
使用 Kinesis 数据流
将通过 Amazon Kinesis 数据流向您的应用程序KinesisStreamsSource
提供流式传输数据。
创建 KinesisStreamsSource
以下代码示例说明了如何创建 KinesisStreamsSource
:
// Configure the KinesisStreamsSource Configuration sourceConfig = new Configuration(); sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST // Create a new KinesisStreamsSource to read from specified Kinesis Stream. KinesisStreamsSource<String> kdsSource = KinesisStreamsSource.<String>builder() .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream") .setSourceConfig(sourceConfig) .setDeserializationSchema(new SimpleStringSchema()) .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used. .build();
有关使用的更多信息KinesisStreamsSource
,请参阅 Apache Flink 文档中的 Amazon Kinesis Data Stream
创建使用 KinesisStreamsSource
EFO 消费端的
KinesisStreamsSource
现在支持增强型扇出 (EF
如果 Kinesis 使用者使用 EFO,则 Kinesis Data Streams 服务会为其提供自己的专用带宽,而不是让其与从流中读取数据的其他使用者共享流的固定带宽。
有关在 Kinesis 消费端上使用 EFO 的更多信息,请参阅 FLIP-128:Kinesis 消费者的增强型扇出 AWS
您可以通过在 Kinesis 使用者上设置以下参数来启用 EFO 使用者:
READER_TYPE:将此参数设置为 EFO,让您的应用程序使用 EFO 使用者访问 Kinesis 数据流数据。
EFO_CONSUMER_NAME:将此参数设置为该流使用者中的唯一字符串值。在同一 Kinesis 数据流中重复使用使用者名称,会导致之前使用该名称的使用者被终止。
要将 a 配置KinesisStreamsSource
为使用 EFO,请向消费端添加以下参数:
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO); sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
有关使用 EFO 使用者的适用于 Apache Flink 的托管服务应用程序的示例,请参阅我们在 Github 上公开的 Kinesis Connectors
使用亚马逊 MSK
KafkaSource
源从 Amazon MSK 主题向您的应用程序提供流数据。
创建 KafkaSource
以下代码示例说明了如何创建 KafkaSource
:
KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
有关使用 KafkaSource
的更多信息,请参阅MSK 复制。