

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 將串流資料來源新增至 Managed Service for Apache Flink
<a name="how-sources"></a>

Apache Flink 提供了連接器，用於從檔案、通訊端、集合和自訂來源讀取資料。在應用程式的程式碼中，您可以使用 [Apache Flink 來源](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html#data-sources)接收來自串流的資料。本節說明可用於 Amazon 服務的來源。

## 使用 Kinesis 資料串流
<a name="input-streams"></a>

`KinesisStreamsSource` 會從 Amazon Kinesis 資料串流提供串流資料到您的應用程式。

### 建立 `KinesisStreamsSource`
<a name="input-streams-create"></a>

以下程式碼範例示範如何建立 `KinesisStreamsSource`：

```
// Configure the KinesisStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST

// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
KinesisStreamsSource<String> kdsSource =
        KinesisStreamsSource.<String>builder()
                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
                .setSourceConfig(sourceConfig)
                .setDeserializationSchema(new SimpleStringSchema())
                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
                .build();
```

如需使用 的詳細資訊`KinesisStreamsSource`，請參閱 Apache Flink 文件中的 [Amazon Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/)，以及 [Github 上的公有 KinesisConnectors 範例](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors)。

### 建立`KinesisStreamsSource`使用 EFO 取用者的
<a name="input-streams-efo"></a>

現在`KinesisStreamsSource`支援[增強型廣發 (EFO)](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/)。

如果 Kinesis 取用者使用 EFO，Kinesis Data Streams 服務會提供專屬頻寬，而不是讓取用者與其他從串流讀取的取用者共用串流的固定頻寬。

如需搭配 Kinesis 取用者使用 EFO 的詳細資訊，請參閱 [ FLIP-128：增強的 AWS Kinesis 取用者扇出](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers)。

您可以在 Kinesis 取用者上設定下列參數來啟用 EFO 取用者：
+ **READER\$1TYPE：**將此參數設定為 **EFO**，讓您的應用程式使用 EFO 取用者存取 Kinesis Data Stream 資料。
+ **EFO\$1CONSUMER\$1NAME**：將此參數設定為字串值，確保在此串流的取用者中保持唯一。在相同的 Kinesis 資料串流中重複使用取用者名稱，將導致先前使用該名稱的使用者遭到終止。

若要設定 `KinesisStreamsSource` 使用 EFO，請將下列參數新增至取用者：

```
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
```

如需使用 EFO 消費者的 Managed Service for Apache Flink 應用程式範例，請參閱 [Github 上的公有 Kinesis Connectors 範例](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors)。

## 使用 Amazon MSK
<a name="input-msk"></a>

此 `KafkaSource` 來源將來自 Amazon MSK 主題的串流資料提供給應用程式。

### 建立 `KafkaSource`
<a name="input-msk-create"></a>

以下程式碼範例示範如何建立 `KafkaSource`：

```
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```

如需如何使用 `KafkaSource` 的詳細資訊，請參閱 [MSK 複寫](earlier.md#example-msk)。