Amazon Managed Service for Apache Flink 之前稱為 Amazon Kinesis Data Analytics for Apache Flink。
本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
將串流資料來源新增至 Apache Flink 的受管理服務
Apache Flink 提供了連接器,用於從檔案、通訊端、集合和自訂來源讀取資料。在應用程式的程式碼中,您可以使用 Apache Flink 來源
使用 Kinesis 資料串流
此 FlinkKinesisConsumer
來源將來自 Amazon Kinesis 資料串流的串流資料提供給應用程式。
建立 FlinkKinesisConsumer
以下程式碼範例示範如何建立 FlinkKinesisConsumer
:
Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); DataStream<string> input = env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
如需如何使用 FlinkKinesisConsumer
的詳細資訊,請參閱 下載並檢查阿帕奇 Flink 流 Java 代碼。
創建使FlinkKinesisConsumer
用EFO消費者
FlinkKinesisConsumer 現在支援增強型扇出 (EFO)
如果 Kinesis 取用者使用EFO,Kinesis Data Streams 服務會提供專屬頻寬,而不是讓消費者與其他從串流讀取的消費者共用串流的固定頻寬。
如需與 Kinesis 取用者EFO搭配使用的詳細資訊,請參閱 FLIP-128:適用於 AWS Kinesis 消費者的增強型扇出
您可以在 Kinesis 取用EFO者上設定下列參數來啟用取用者:
RECORD_ PUBLISHER _TYPE:將此參數設定EFO為,讓您的應用程式使用取用EFO者存取 Kinesis 資料串流資料。
EFO_ CONSUMER _NAME:將此參數設置為此流的消費者中唯一的字符串值。在相同的 Kinesis 資料串流中重複使用取用者名稱,將導致先前使用該名稱的使用者遭到終止。
若要設定FlinkKinesisConsumer
要使用的EFO,請將下列參數新增至取用者:
consumerConfig.putIfAbsent(RECORD_PUBLISHER_TYPE, "EFO"); consumerConfig.putIfAbsent(EFO_CONSUMER_NAME, "basic-efo-flink-app");
如需使用取用EFO者之 Apache Flink 應用程式的受管理服務範例,請參閱增強型扇出 (EFO) 取用者。
使用 Amazon MSK
來KafkaSource
源會從 Amazon MSK 主題向您的應用程式提供串流資料。
建立 KafkaSource
以下程式碼範例示範如何建立 KafkaSource
:
KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
如需如何使用 KafkaSource
的詳細資訊,請參閱 MSK 複寫。