將串流資料來源新增至 Apache Flink 的受管理服務 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前稱為 Amazon Kinesis Data Analytics for Apache Flink。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

將串流資料來源新增至 Apache Flink 的受管理服務

Apache Flink 提供了連接器,用於從檔案、通訊端、集合和自訂來源讀取資料。在應用程式的程式碼中,您可以使用 Apache Flink 來源接收來自串流的資料。本節說明可用於 Amazon 服務的來源

使用 Kinesis 資料串流

FlinkKinesisConsumer 來源將來自 Amazon Kinesis 資料串流的串流資料提供給應用程式。

建立 FlinkKinesisConsumer

以下程式碼範例示範如何建立 FlinkKinesisConsumer

Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); DataStream<string> input = env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));

如需如何使用 FlinkKinesisConsumer 的詳細資訊,請參閱 下載並檢查阿帕奇 Flink 流 Java 代碼

創建使FlinkKinesisConsumer用EFO消費者

FlinkKinesisConsumer 現在支援增強型扇出 (EFO)

如果 Kinesis 取用者使用EFO,Kinesis Data Streams 服務會提供專屬頻寬,而不是讓消費者與其他從串流讀取的消費者共用串流的固定頻寬。

如需與 Kinesis 取用者EFO搭配使用的詳細資訊,請參閱 FLIP-128:適用於 AWS Kinesis 消費者的增強型扇出

您可以在 Kinesis 取用EFO者上設定下列參數來啟用取用者:

  • RECORD_ PUBLISHER _TYPE:將此參數設定EFO為,讓您的應用程式使用取用EFO者存取 Kinesis 資料串流資料。

  • EFO_ CONSUMER _NAME:將此參數設置為此流的消費者中唯一的字符串值。在相同的 Kinesis 資料串流中重複使用取用者名稱,將導致先前使用該名稱的使用者遭到終止。

若要設定FlinkKinesisConsumer要使用的EFO,請將下列參數新增至取用者:

consumerConfig.putIfAbsent(RECORD_PUBLISHER_TYPE, "EFO"); consumerConfig.putIfAbsent(EFO_CONSUMER_NAME, "basic-efo-flink-app");

如需使用取用EFO者之 Apache Flink 應用程式的受管理服務範例,請參閱增強型扇出 (EFO) 取用者

使用 Amazon MSK

KafkaSource源會從 Amazon MSK 主題向您的應用程式提供串流資料。

建立 KafkaSource

以下程式碼範例示範如何建立 KafkaSource

KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

如需如何使用 KafkaSource 的詳細資訊,請參閱 MSK 複寫