

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Tambahkan sumber data streaming ke Layanan Terkelola untuk Apache Flink
<a name="how-sources"></a>

Apache Flink menyediakan konektor untuk membaca dari file, soket, koleksi, dan sumber kustom. Dalam kode aplikasi Anda, Anda menggunakan [sumber Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html#data-sources) untuk menerima data dari aliran. Bagian ini menjelaskan sumber yang tersedia untuk layanan Amazon.

## Gunakan aliran data Kinesis
<a name="input-streams"></a>

`KinesisStreamsSource`Menyediakan data streaming ke aplikasi Anda dari aliran data Amazon Kinesis. 

### Buat `KinesisStreamsSource`
<a name="input-streams-create"></a>

Contoh kode berikut mendemonstrasikan pembuatan `KinesisStreamsSource`:

```
// Configure the KinesisStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST

// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
KinesisStreamsSource<String> kdsSource =
        KinesisStreamsSource.<String>builder()
                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
                .setSourceConfig(sourceConfig)
                .setDeserializationSchema(new SimpleStringSchema())
                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
                .build();
```

Untuk informasi selengkapnya tentang penggunaan`KinesisStreamsSource`, lihat Konektor [Amazon Kinesis Data](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/) Streams dalam [dokumentasi Apache Flink dan KinesisConnectors ](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors) contoh publik kami di Github.

### Buat `KinesisStreamsSource` yang menggunakan konsumen EFO
<a name="input-streams-efo"></a>

`KinesisStreamsSource`Sekarang mendukung [Enhanced Fan-Out (EFO](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/)). 

Jika konsumen Kinesis menggunakan EFO, layanan Kinesis Data Streams memberikan bandwidth khusus miliknya sendiri, bukan meminta konsumen berbagi bandwidth aliran tetap dengan konsumen lain yang membaca dari aliran.

Untuk informasi selengkapnya tentang penggunaan EFO dengan konsumen Kinesis, [lihat FLIP-128: Enhanced Fan Out](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers) untuk Konsumen Kinesis. AWS 

Anda mengaktifkan konsumen EFO dengan mengatur parameter berikut pada konsumen Kinesis:
+ **READER\$1TYPE:** Setel parameter ini ke **EFO** agar aplikasi Anda menggunakan konsumen EFO untuk mengakses data Kinesis Data Stream. 
+ **EFO\$1CONSUMER\$1NAME:** Atur parameter ini ke nilai string yang unik di antara konsumen aliran ini. Menggunakan kembali nama konsumen di Kinesis Data Stream yang sama akan menyebabkan konsumen sebelumnya yang menggunakan nama tersebut dihentikan. 

Untuk mengonfigurasi `KinesisStreamsSource` untuk menggunakan EFO, tambahkan parameter berikut ke konsumen:

```
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
```

Untuk contoh aplikasi Managed Service for Apache Flink yang menggunakan konsumen EFO, lihat contoh Konektor [Kinesis publik](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors) kami di Github.

## Gunakan Amazon MSK
<a name="input-msk"></a>

Sumber `KafkaSource` menyediakan data streaming ke aplikasi Anda dari topik Amazon MSK. 

### Buat `KafkaSource`
<a name="input-msk-create"></a>

Contoh kode berikut mendemonstrasikan pembuatan `KafkaSource`:

```
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```

Untuk informasi selengkapnya tentang cara menggunakan `KafkaSource`, lihat [Replikasi MSK](earlier.md#example-msk).