

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Fügen Sie Streaming-Datenquellen zu Managed Service für Apache Flink hinzu
<a name="how-sources"></a>

Apache Flink bietet Konnektoren zum Lesen aus Dateien, Sockets, Sammlungen und benutzerdefinierten Quellen. In Ihrem Anwendungscode verwenden Sie eine [Apache Flink-Quelle](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html#data-sources), um Daten aus einem Stream zu empfangen. In diesem Abschnitt werden die Quellen beschrieben, die für Amazon-Services verfügbar sind.

## Verwenden Sie Kinesis-Datenstreams
<a name="input-streams"></a>

Das `KinesisStreamsSource` stellt Streaming-Daten aus einem Amazon Kinesis Kinesis-Datenstream für Ihre Anwendung bereit. 

### Erstellen eines `KinesisStreamsSource`
<a name="input-streams-create"></a>

Das folgende Code-Beispiel zeigt das Erstellen eines `KinesisStreamsSource`:

```
// Configure the KinesisStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST

// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
KinesisStreamsSource<String> kdsSource =
        KinesisStreamsSource.<String>builder()
                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
                .setSourceConfig(sourceConfig)
                .setDeserializationSchema(new SimpleStringSchema())
                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
                .build();
```

Weitere Informationen zur Verwendung von finden Sie unter [Amazon Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/) in der Apache Flink-Dokumentation und in [unserem öffentlichen KinesisConnectors Beispiel auf](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors) Github. `KinesisStreamsSource`

### Erstellen Sie einen`KinesisStreamsSource`, der einen EFO-Consumer verwendet
<a name="input-streams-efo"></a>

Der unterstützt `KinesisStreamsSource` jetzt [Enhanced Fan-Out (EFO)](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/). 

Wenn ein Kinesis-Verbraucher EFO verwendet, stellt ihm der Kinesis Data Streams-Service seine eigene dedizierte Bandbreite zur Verfügung, anstatt dass der Verbraucher die feste Bandbreite des Streams mit den anderen Verbrauchern teilt, die aus dem Stream lesen.

Weitere Informationen zur Verwendung von EFO mit Kinesis Consumer finden Sie unter [FLIP-128: Verbesserter Lüfterausgang](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers) für Kinesis-Verbraucher. AWS 

Sie aktivieren den EFO-Consumer, indem Sie die folgenden Parameter für den Kinesis-Consumer festlegen:
+ **READER\$1TYPE:** Setzen Sie diesen Parameter auf **EFO**, damit Ihre Anwendung einen EFO-Consumer für den Zugriff auf die Kinesis Data Stream-Daten verwendet. 
+ **EFO\$1CONSUMER\$1NAME:** Setzen Sie diesen Parameter auf einen Zeichenfolgenwert, der unter den Verbrauchern dieses Streams eindeutig ist. Die Wiederverwendung eines Verbrauchernamens in demselben Kinesis Data Stream führt dazu, dass der vorherige Verbraucher, der diesen Namen verwendet hat, beendet wird. 

Um einen `KinesisStreamsSource` für die Verwendung von EFO zu konfigurieren, fügen Sie dem Verbraucher die folgenden Parameter hinzu:

```
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
```

Ein Beispiel für eine Managed Service for Apache Flink-Anwendung, die einen EFO-Consumer verwendet, finden Sie in [unserem öffentlichen Kinesis Connectors-Beispiel](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors) auf Github.

## Verwenden Sie Amazon MSK
<a name="input-msk"></a>

Die `KafkaSource`-Quelle stellt Streaming-Daten aus einem Amazon MSK-Thema für Ihre Anwendung bereit. 

### Erstellen eines `KafkaSource`
<a name="input-msk-create"></a>

Das folgende Code-Beispiel zeigt das Erstellen eines `KafkaSource`:

```
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```

Weitere Informationen zur Verwendung von `KafkaSource` finden Sie unter [MSK-Replikation](earlier.md#example-msk).