Amazon Managed Service für Apache Flink war zuvor als Amazon Kinesis Data Analytics für Apache Flink bekannt.
Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Fügen Sie Streaming-Datenquellen zu Managed Service für Apache Flink hinzu
Apache Flink bietet Konnektoren zum Lesen aus Dateien, Sockets, Sammlungen und benutzerdefinierten Quellen. In Ihrem Anwendungscode verwenden Sie eine Apache Flink-Quelle
Verwenden Sie Kinesis-Datenstreams
Die KinesisStreamsSource
-Quelle stellt Streaming-Daten aus einem Amazon Kinesis Data Stream für Ihre Anwendung bereit.
Erstellen eines KinesisStreamsSource
Das folgende Code-Beispiel zeigt das Erstellen eines KinesisStreamsSource
:
// Configure the KinesisStreamsSource Configuration sourceConfig = new Configuration(); sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST // Create a new KinesisStreamsSource to read from specified Kinesis Stream. KinesisStreamsSource<String> kdsSource = KinesisStreamsSource.<String>builder() .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream") .setSourceConfig(sourceConfig) .setDeserializationSchema(new SimpleStringSchema()) .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used. .build();
Weitere Informationen zur Verwendung von finden Sie unter Amazon Kinesis Data Streams ConnectorKinesisStreamsSource
Erstellen Sie einenKinesisStreamsSource
, der einen Verbraucher verwendet EFO
Der unterstützt KinesisStreamsSource
jetzt Enhanced Fan-Out (EFO)
Wenn ein Kinesis-Verbraucher verwendetEFO, stellt ihm der Kinesis Data Streams-Dienst seine eigene dedizierte Bandbreite zur Verfügung, anstatt dass der Verbraucher die feste Bandbreite des Streams mit den anderen Verbrauchern teilt, die aus dem Stream lesen.
Weitere Informationen zur Verwendung EFO mit Kinesis Consumer finden Sie unter FLIP-128: Verbesserter Lüfterausgang für AWS Kinesis-Verbraucher
Sie aktivieren den EFO Consumer, indem Sie die folgenden Parameter für den Kinesis-Consumer festlegen:
READER_TYPE: Setzen Sie diesen Parameter auf, damit Ihre Anwendung einen EFO Consumer EFOfür den Zugriff auf die Kinesis Data Stream-Daten verwendet.
EFO_ CONSUMER _NAME: Legen Sie für diesen Parameter einen Zeichenkettenwert fest, der für die Nutzer dieses Streams eindeutig ist. Die Wiederverwendung eines Verbrauchernamens in demselben Kinesis Data Stream führt dazu, dass der vorherige Verbraucher, der diesen Namen verwendet hat, beendet wird.
Um a KinesisStreamsSource
zur Verwendung zu konfigurierenEFO, fügen Sie dem Verbraucher die folgenden Parameter hinzu:
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO); sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
Ein Beispiel für eine Managed Service for Apache Flink-Anwendung, die einen EFO Verbraucher verwendet, finden Sie in unserem öffentlichen Kinesis Connectors-Beispiel auf
Verwenden Sie Amazon MSK
Die KafkaSource
Quelle stellt Streaming-Daten aus einem MSK Amazon-Thema für Ihre Anwendung bereit.
Erstellen eines KafkaSource
Das folgende Code-Beispiel zeigt das Erstellen eines KafkaSource
:
KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
Weitere Informationen zur Verwendung von KafkaSource
finden Sie unter MSKReplikation.