本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
將串流資料來源新增至 Managed Service for Apache Flink
Apache Flink 提供了連接器,用於從檔案、通訊端、集合和自訂來源讀取資料。在應用程式的程式碼中,您可以使用 Apache Flink 來源
使用 Kinesis 資料串流
KinesisStreamsSource 會從 Amazon Kinesis 資料串流提供串流資料到您的應用程式。
建立 KinesisStreamsSource
以下程式碼範例示範如何建立 KinesisStreamsSource:
// Configure the KinesisStreamsSource Configuration sourceConfig = new Configuration(); sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST // Create a new KinesisStreamsSource to read from specified Kinesis Stream. KinesisStreamsSource<String> kdsSource = KinesisStreamsSource.<String>builder() .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream") .setSourceConfig(sourceConfig) .setDeserializationSchema(new SimpleStringSchema()) .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used. .build();
如需使用 的詳細資訊KinesisStreamsSource,請參閱 Apache Flink 文件中的 Amazon Kinesis Data Streams Connector
建立KinesisStreamsSource使用 EFO 取用者的
現在KinesisStreamsSource支援增強型廣發 (EFO)
如果 Kinesis 取用者使用 EFO,Kinesis Data Streams 服務會提供專屬頻寬,而不是讓取用者與其他從串流讀取的取用者共用串流的固定頻寬。
如需搭配 Kinesis 取用者使用 EFO 的詳細資訊,請參閱 FLIP-128:增強的 AWS Kinesis 取用者扇出
您可以在 Kinesis 取用者上設定下列參數來啟用 EFO 取用者:
READER_TYPE:將此參數設定為 EFO,讓您的應用程式使用 EFO 取用者存取 Kinesis Data Stream 資料。
EFO_CONSUMER_NAME:將此參數設定為字串值,確保在此串流的取用者中保持唯一。在相同的 Kinesis 資料串流中重複使用取用者名稱,將導致先前使用該名稱的使用者遭到終止。
若要設定 KinesisStreamsSource 使用 EFO,請將下列參數新增至取用者:
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO); sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
如需使用 EFO 消費者的 Managed Service for Apache Flink 應用程式範例,請參閱 Github 上的公有 Kinesis Connectors 範例
使用 Amazon MSK
此 KafkaSource 來源將來自 Amazon MSK 主題的串流資料提供給應用程式。
建立 KafkaSource
以下程式碼範例示範如何建立 KafkaSource:
KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
如需如何使用 KafkaSource 的詳細資訊,請參閱 MSK 複寫。