

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 檢閱 DataStream API 元件
<a name="how-datastream"></a>

Apache Flink 應用程式使用 [Apache Flink DataStream API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/overview/) 來轉換資料串流中的資料。

本節說明移動、轉換和追蹤資料的不同元件：
+ [使用連接器透過 DataStream API 在 Managed Service for Apache Flink 中移動資料](how-connectors.md)：這些元件可以在應用程式與外部資料來源和目的地之間移動資料。
+ [使用 Managed Service for Apache Flink 中的運算子搭配 DataStream API 轉換資料](how-operators.md)：這些元件可以轉換或分組應用程式中的資料元素。
+ [使用 DataStream API 追蹤 Managed Service for Apache Flink 中的事件](how-time.md)：本主題說明 Managed Service for Apache Flink 如何在使用 DataStream API 時追蹤事件。

# 使用連接器透過 DataStream API 在 Managed Service for Apache Flink 中移動資料
<a name="how-connectors"></a>

在 Amazon Managed Service for Apache Flink DataStream API 中，*連接器*是可將資料移入和移出 Managed Service for Apache Flink 應用程式的軟體元件。連接器是靈活的整合，可讓您從檔案和目錄讀取。連接器包含用於與 Amazon 服務和第三方系統互動的完整模組。

連接器包含下列類型：
+ [新增串流資料來源](how-sources.md)：從 Kinesis 資料串流、檔案或其他資料來源向應用程式提供資料。
+ [使用接收器寫入資料](how-sinks.md)：從您的應用程式將資料傳送至 Kinesis 資料串流、Firehose 串流或其他資料目的地。
+ [使用非同步 I/O](how-async.md)：提供對資料來源 (例如資料庫) 的非同步存取，以富集串流事件。

## 可用的連接器
<a name="how-connectors-list"></a>

Apache Flink 架構包含用於存取各種來源之資料的連接器。如需 Apache Flink 架構中可用連接器的相關資訊，請參閱 [Apache Flink 文件](https://nightlies.apache.org/flink/flink-docs-release-1.15/)中的[連接器](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/connectors/)。

**警告**  
如果您的應用程式在 Flink 1.6、1.8、1.11 或 1.13 上執行，並且想要在中東 （阿拉伯聯合大公國）、亞太區域 （海德拉巴）、以色列 （特拉維夫）、歐洲 （蘇黎世）、中東 （阿拉伯聯合大公國）、亞太區域 （墨爾本） 或亞太區域 （雅加達） 區域執行，您可能需要使用更新的連接器重建應用程式封存，或升級至 Flink 1.18。  
Apache Flink 連接器存放在自己的開放原始碼儲存庫中。如果您要升級至 1.18 版或更新版本，則必須更新您的相依性。若要存取 Apache Flink AWS 連接器的儲存庫，請參閱 [flink-connector-aws](https://github.com/apache/flink-connector-aws)。  
之前的 Kinesis 來源`org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer`已停止，未來可能會隨著 Flink 版本而移除。請改用 [Kinesis 來源](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-source)。  
`FlinkKinesisConsumer` 和 之間沒有狀態相容性`KinesisStreamsSource`。如需詳細資訊，請參閱 Apache Flink 文件中的[將現有任務遷移至新的 Kinesis 串流來源](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#migrating-existing-jobs-to-new-kinesis-streams-source-from-kinesis-consumer)。  
 以下是建議的準則：  


**連接器升級**  

| Flink 版本 | 使用的連接器 | Resolution | 
| --- | --- | --- | 
| 1.19、1.20 | Kinesis 來源 |  升級至 Managed Service for Apache Flink 1.19 和 1.20 版時，請確定您使用的是最新的 Kinesis Data Streams 來源連接器。這必須是任何 5.0.0 版或更新版本。如需詳細資訊，請參閱 [Amazon Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/)。  | 
| 1.19、1.20 | Kinesis 接收器 |  升級至 Managed Service for Apache Flink 1.19 和 1.20 版時，請確定您使用的是最新的 Kinesis Data Streams 接收器連接器。這必須是任何 5.0.0 版或更新版本。如需詳細資訊，請參閱 [Kinesis Streams Sink](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-sink)。  | 
| 1.19、1.20 | DynamoDB 串流來源 |  升級至 Managed Service for Apache Flink 1.19 和 1.20 版時，請確定您使用的是最新的 DynamoDB Streams 來源連接器。這必須是任何 5.0.0 版或更新版本。如需詳細資訊，請參閱 [Amazon DynamoDB 連接器](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/)。  | 
| 1.19、1.20 | DynamoDB 接收器 | 升級至 Managed Service for Apache Flink 1.19 和 1.20 版時，請確定您使用的是最新的 DynamoDB 接收器連接器。這必須是任何 5.0.0 版或更新版本。如需詳細資訊，請參閱 [Amazon DynamoDB 連接器](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/)。 | 
| 1.19、1.20 | Amazon SQS 接收器 |  升級至 Managed Service for Apache Flink 1.19 和 1.20 版時，請確定您使用的是最新的 Amazon SQS 接收器連接器。這必須是任何 5.0.0 版或更新版本。如需詳細資訊，請參閱 [Amazon SQS Sink](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/sqs/)。  | 
| 1.19、1.20 | Amazon Managed Service for Prometheus Sink |  升級至 Managed Service for Apache Flink 1.19 和 1.20 版時，請確定您使用的是最新的 Amazon Managed Service for Prometheus 接收器連接器。這必須是任何 1.0.0 版或更新版本。如需詳細資訊，請參閱 [Prometheus Sink](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/prometheus/)。  | 

# 將串流資料來源新增至 Managed Service for Apache Flink
<a name="how-sources"></a>

Apache Flink 提供了連接器，用於從檔案、通訊端、集合和自訂來源讀取資料。在應用程式的程式碼中，您可以使用 [Apache Flink 來源](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html#data-sources)接收來自串流的資料。本節說明可用於 Amazon 服務的來源。

## 使用 Kinesis 資料串流
<a name="input-streams"></a>

`KinesisStreamsSource` 會從 Amazon Kinesis 資料串流提供串流資料到您的應用程式。

### 建立 `KinesisStreamsSource`
<a name="input-streams-create"></a>

以下程式碼範例示範如何建立 `KinesisStreamsSource`：

```
// Configure the KinesisStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST

// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
KinesisStreamsSource<String> kdsSource =
        KinesisStreamsSource.<String>builder()
                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
                .setSourceConfig(sourceConfig)
                .setDeserializationSchema(new SimpleStringSchema())
                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
                .build();
```

如需使用 的詳細資訊`KinesisStreamsSource`，請參閱 Apache Flink 文件中的 [Amazon Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/)，以及 [Github 上的公有 KinesisConnectors 範例](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors)。

### 建立`KinesisStreamsSource`使用 EFO 取用者的
<a name="input-streams-efo"></a>

現在`KinesisStreamsSource`支援[增強型廣發 (EFO)](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/)。

如果 Kinesis 取用者使用 EFO，Kinesis Data Streams 服務會提供專屬頻寬，而不是讓取用者與其他從串流讀取的取用者共用串流的固定頻寬。

如需搭配 Kinesis 取用者使用 EFO 的詳細資訊，請參閱 [ FLIP-128：增強的 AWS Kinesis 取用者扇出](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers)。

您可以在 Kinesis 取用者上設定下列參數來啟用 EFO 取用者：
+ **READER\$1TYPE：**將此參數設定為 **EFO**，讓您的應用程式使用 EFO 取用者存取 Kinesis Data Stream 資料。
+ **EFO\$1CONSUMER\$1NAME**：將此參數設定為字串值，確保在此串流的取用者中保持唯一。在相同的 Kinesis 資料串流中重複使用取用者名稱，將導致先前使用該名稱的使用者遭到終止。

若要設定 `KinesisStreamsSource` 使用 EFO，請將下列參數新增至取用者：

```
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
```

如需使用 EFO 消費者的 Managed Service for Apache Flink 應用程式範例，請參閱 [Github 上的公有 Kinesis Connectors 範例](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors)。

## 使用 Amazon MSK
<a name="input-msk"></a>

此 `KafkaSource` 來源將來自 Amazon MSK 主題的串流資料提供給應用程式。

### 建立 `KafkaSource`
<a name="input-msk-create"></a>

以下程式碼範例示範如何建立 `KafkaSource`：

```
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```

如需如何使用 `KafkaSource` 的詳細資訊，請參閱 [MSK 複寫](earlier.md#example-msk)。

# 在 Managed Service for Apache Flink 中使用接收器寫入資料
<a name="how-sinks"></a>

在應用程式程式碼中，您可以使用任何 [Apache Flink 接收器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/overview/)連接器來寫入外部系統，包括 Kinesis Data Streams 和 DynamoDB 等 AWS 服務。

Apache Flink 也為檔案和通訊端提供接收器，而且您可以實作自訂接收器。在數個支援的接收器中，經常使用下列項目：

## 使用 Kinesis 資料串流
<a name="sinks-streams"></a>

Apache Flink 在《Apache Flink 文件》中提供了 [Kinesis Data Streams 連接器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/)的相關資訊。

如需使用 Kinesis 資料串流進行輸入和輸出的應用程式範例，請參閱 [教學課程：開始使用 Managed Service for Apache Flink 中的 DataStream API](getting-started.md)。

## 使用 Apache Kafka 和 Amazon Managed Streaming for Apache Kafka (MSK)
<a name="sinks-MSK"></a>

[Apache Flink Kafka 連接器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#kafka-sink)提供廣泛的支援，可將資料發佈至 Apache Kafka 和 Amazon MSK，包括恰好一次的保證。若要了解如何寫入 Kafka，請參閱 Apache Flink 文件中的 [Kafka Connectors 範例](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConnectors)。

## 使用 Amazon S3
<a name="sinks-s3"></a>

您可以使用 Apache Flink `StreamingFileSink` 將物件寫入 Amazon S3 儲存貯體。

如需如何將物件寫入 S3 的範例，請參閱[範例：寫入 Amazon S3 儲存貯體](earlier.md#examples-s3)。

## 使用 Firehose
<a name="sinks-firehose"></a>

`FlinkKinesisFirehoseProducer` 是可靠、可擴展的 Apache Flink 接收器，用於使用 [Firehose](https://docs.aws.amazon.com/firehose/latest/dev/) 服務存放應用程式輸出。本節說明如何建立 Maven 專案以建立和使用 `FlinkKinesisFirehoseProducer`。

**Topics**
+ [建立 `FlinkKinesisFirehoseProducer`](#sinks-firehose-create)
+ [`FlinkKinesisFirehoseProducer` 程式碼範例](#sinks-firehose-sample)

### 建立 `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-create"></a>

以下程式碼範例示範如何建立 `FlinkKinesisFirehoseProducer`：

```
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
```

### `FlinkKinesisFirehoseProducer` 程式碼範例
<a name="sinks-firehose-sample"></a>

下列程式碼範例示範如何建立和設定 ，`FlinkKinesisFirehoseProducer`以及將資料從 Apache Flink 資料串流傳送至 Firehose 服務。

```
 
package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

	private static final String region = "us-east-1";
	private static final String inputStreamName = "ExampleInputStream";
	private static final String outputStreamName = "ExampleOutputStream";

	private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
		Properties inputProperties = new Properties();
		inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
		inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
	}

	private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
			throws IOException {
		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
				applicationProperties.get("ConsumerConfigProperties")));
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Properties outputProperties = new Properties();
		outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(), outputProperties);
		ProducerConfigConstants config = new ProducerConfigConstants();
		return sink;
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(),
				applicationProperties.get("ProducerConfigProperties"));
		return sink;
	}

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		/*
		 * if you would like to use runtime configuration properties, uncomment the
		 * lines below
		 * DataStream<String> input = createSourceFromApplicationProperties(env);
		 */

		DataStream<String> input = createSourceFromStaticConfig(env);

		// Kinesis Firehose sink
		input.addSink(createFirehoseSinkFromStaticConfig());

		// If you would like to use runtime configuration properties, uncomment the
		// lines below
		// input.addSink(createFirehoseSinkFromApplicationProperties());

		env.execute("Flink Streaming Java API Skeleton");
	}
}
```

如需如何使用 Firehose 接收器的完整教學課程，請參閱 [範例：寫入 Firehose](earlier.md#get-started-exercise-fh)。

# 在 Managed Service for Apache Flink 中使用非同步 I/O
<a name="how-async"></a>

非同步 I/O 運算子使用外部資料來源 (例如資料庫) 來富集串流資料。Managed Service for Apache Flink 以非同步方式富集串流事件，以便批次處理請求來提高效率。

如需詳細資訊，請參閱 Apache Flink 文件中的[非同步 I/O](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/asyncio/)。

# 使用 Managed Service for Apache Flink 中的運算子搭配 DataStream API 轉換資料
<a name="how-operators"></a>

若要在 Managed Service for Apache Flink 中轉換傳入的資料，請使用 Apache Flink *運算子*。Apache Flink 運算子可將一或多個資料串流轉換為新的資料串流。新的資料串流包含來自原始資料串流的修改資料。Apache Flink 提供了超過 25 個預先建置的串流處理運算子。如需詳細資訊，請參閱 Apache Flink 文件中的[運算子](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/)。

**Topics**
+ [使用轉換運算子](#how-operators-transform)
+ [使用彙總運算子](#how-operators-agg)

## 使用轉換運算子
<a name="how-operators-transform"></a>

以下是在 JSON 資料串流的其中一個欄位上進行簡單文字轉換的範例。

此程式碼會建立轉換後的資料串流。新資料串流具有與原始串流相同的資料，並在 `TICKER` 欄位內容後面附加 ` Company` 字串。

```
DataStream<ObjectNode> output = input.map(
    new MapFunction<ObjectNode, ObjectNode>() {
        @Override
        public ObjectNode map(ObjectNode value) throws Exception {
            return value.put("TICKER", value.get("TICKER").asText() + " Company");
        }
    }
);
```

## 使用彙總運算子
<a name="how-operators-agg"></a>

以下是彙總運算子的範例。程式碼會建立彙總的資料串流。運算子會建立 5 秒的翻轉視窗，並傳回視窗中具有相同 `TICKER` 值之記錄的 `PRICE` 值總和。

```
DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((node1, node2) -> {
        double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble();
        node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal));
    return node1;
});
```

如需更多程式碼範例，請參閱 [建立和使用 Managed Service for Apache Flink 應用程式的範例](examples-collapsibles.md)。

# 使用 DataStream API 追蹤 Managed Service for Apache Flink 中的事件
<a name="how-time"></a>

Managed Service for Apache Flink 使用下列時間戳記來追蹤事件：
+ **處理時間：**指正在執行相應操作的機器的系統時間。
+ **事件時間：**指每個個別事件在其生產設備上發生的時間。
+ **擷取時間：**指事件進入 Managed Service for Apache Flink 服務的時間。

您可以使用 設定串流環境使用的時間`setStreamTimeCharacteristic`。

```
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
```

如需時間戳記的詳細資訊，請參閱 Apache Flink 文件中的[產生浮水印](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/event-time/generating_watermarks/)。