

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 查看 DataStream API 组件
<a name="how-datastream"></a>

你的 Apache Flink 应用程序使用 [Apache Flink DataStream API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/overview/) 来转换数据流中的数据。

本节介绍用于移动、转换和跟踪数据的不同组件：
+ [使用 API 在适用于 Apache Flink 的托管服务中使用连接器移动数据 DataStream](how-connectors.md)：这些组件在您的应用程序与外部数据源和目标之间移动数据。
+ [使用 API 在 Apache Flink 托管服务中使用运算符转换数据 DataStream](how-operators.md)：这些组件对应用程序中的数据元素进行转换或分组。
+ [使用 API 在适用于 Apache Flink 的托管服务中跟踪事件 DataStream](how-time.md)：本主题介绍适用于 Apache Flink 的托管服务在使用 API 时如何跟踪事件。 DataStream 

# 使用 API 在适用于 Apache Flink 的托管服务中使用连接器移动数据 DataStream
<a name="how-connectors"></a>

在适用于 Apache Flink 的亚马逊托管服务 DataStream API 中，*连接器*是将数据移入和移出适用于 Apache Flink 的托管服务应用程序的软件组件。连接器是灵活集成的组件，可让您读取文件和目录。连接器包含用于与 Amazon 服务和第三方系统交互的完整模块。

连接器类型包括：
+ [添加流数据源](how-sources.md)：从 Kinesis 数据流、文件或其他数据源中向应用程序提供数据。
+ [使用接收器写入数据](how-sinks.md)：将数据从应用程序发送到 Kinesis 数据流、Firehose 流或其他数据目标。
+ [使用异步 I/O](how-async.md)：提供对数据源（例如数据库）的异步访问以丰富流事件。

## 可用的连接器
<a name="how-connectors-list"></a>

Apache Flink 框架包含用于从各种源中访问数据的连接器。[有关 Apache Flink 框架中可用的连接器的信息，请参阅 Apache Flink 文档中的[连接器](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/connectors/)。](https://nightlies.apache.org/flink/flink-docs-release-1.15/)

**警告**  
如果您的应用程序在 Flink 1.6、1.8、1.11 或 1.13 上运行，并且想要在中东（阿联酋）、亚太地区（海得拉巴）、以色列（特拉维夫）、欧洲（苏黎世）、亚太地区（墨尔本）或亚太地区（雅加达）区域运行，则可能需要使用更新的连接器重建应用程序存档或升级到 Flink 1.18。  
Apache Flink 连接器存储在它们自己的开源存储库中。如果您要升级到 1.18 或更高版本，则必须更新依赖项。要访问 Apache Flink AWS 连接器的存储库，请参阅。[flink-connector-aws](https://github.com/apache/flink-connector-aws)  
以前的 Kinesis 源代码 `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer` 已停用，并可能在 Flink 的未来版本中移除。改为使用 [Kinesis 源](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-source)。  
`FlinkKinesisConsumer` 和 `KinesisStreamsSource` 之间不存在状态兼容性。有关详细信息，请参阅 Apache Flink 文档中的[将现有作业迁移到新的 Kinesis Streams 源](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#migrating-existing-jobs-to-new-kinesis-streams-source-from-kinesis-consumer)。  
 以下是推荐的指导方针：  


**连接器升级**  

| Flink 版本 | 使用的连接器 | 解决方案 | 
| --- | --- | --- | 
| 1.19、1.20 | Kinesis 源 |  升级到 Managed Service for Apache Flink 1.19 和 1.20 版时，请确保使用的是最新的 Kinesis Data Streams 源连接器。版本必须为 5.0.0 或更高版本。有关更多信息，请参阅 [Amazon Kinesis Data Streams 连接器](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/)。  | 
| 1.19、1.20 | Kinesis 接收器 |  升级到 Managed Service for Apache Flink 1.19 和 1.20 版时，请确保使用的是最新的 Kinesis Data Streams 接收器连接器。版本必须为 5.0.0 或更高版本。有关更多信息，请参阅 [Kinesis Streams 接收器](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-sink)。  | 
| 1.19、1.20 | DynamoDB Streams 源 |  升级到 Managed Service for Apache Flink 1.19 和 1.20 版时，请确保使用的是最新的 DynamoDB Streams 源连接器。版本必须为 5.0.0 或更高版本。有关更多信息，请参阅 [Amazon DynamoDB 连接器](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/)。  | 
| 1.19、1.20 | DynamoDB 接收器 | 升级到 Managed Service for Apache Flink 1.19 和 1.20 版时，请确保使用的是最新的 DynamoDB 接收器连接器。版本必须为 5.0.0 或更高版本。有关更多信息，请参阅 [Amazon DynamoDB 连接器](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/)。 | 
| 1.19、1.20 | Amazon SQS 接收器 |  升级到 Managed Service for Apache Flink 1.19 和 1.20 版时，请确保使用的是最新的 Amazon SQS 接收器连接器。版本必须为 5.0.0 或更高版本。有关更多信息，请参阅 [Amazon SQS 接收器](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/sqs/)。  | 
| 1.19、1.20 | Amazon Managed Service for Prometheus 接收器 |  升级到 Managed Service for Apache Flink 1.19 和 1.20 版时，请确保使用的是最新的 Amazon Managed Service for Prometheus 接收器连接器。版本必须为 1.0.0 或更高版本。有关更多信息，请参阅 [Prometheus 接收器](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/prometheus/)。  | 

# 将流数据源添加到 Managed Service for Apache Flink
<a name="how-sources"></a>

Apache Flink 提供连接器以从文件、套接字、集合和自定义源中读取。在应用程序代码中，您可以使用 [Apache Flink 源](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html#data-sources)以从流中接收数据。本节介绍了可用于 Amazon 服务的源。

## 使用 Kinesis 数据流
<a name="input-streams"></a>

`KinesisStreamsSource` 从 Amazon Kinesis 数据流向应用程序提供流数据。

### 创建 `KinesisStreamsSource`
<a name="input-streams-create"></a>

以下代码示例说明了如何创建 `KinesisStreamsSource`：

```
// Configure the KinesisStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST

// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
KinesisStreamsSource<String> kdsSource =
        KinesisStreamsSource.<String>builder()
                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
                .setSourceConfig(sourceConfig)
                .setDeserializationSchema(new SimpleStringSchema())
                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
                .build();
```

有关使用的更多信息`KinesisStreamsSource`，请参阅 [Apache Flink 文档中的 Amazon Kinesis Data Stream](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/) s Connector [和我们在 Github 上的 KinesisConnectors 公开示例](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors)。

### 创建使用 EFO 使用者的 `KinesisStreamsSource`
<a name="input-streams-efo"></a>

`KinesisStreamsSource` 现在支持[增强型扇出功能（EFO）](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/)。

如果 Kinesis 使用者使用 EFO，则 Kinesis Data Streams 服务会为其提供自己的专用带宽，而不是让其与从流中读取数据的其他使用者共享流的固定带宽。

有关在 Kinesis 消费端上使用 EFO 的更多信息，[请参阅 FLIP-128：Kinesis 消费者的增强型扇出 AWS](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers)。

您可以通过在 Kinesis 使用者上设置以下参数来启用 EFO 使用者：
+ **READER\$1TYPE：**将此参数设置为 **EFO**，让您的应用程序使用 EFO 使用者访问 Kinesis Data Stream 数据。
+ **EFO\$1CONSUMER\$1NAME：**将此参数设置为该流使用者中的唯一字符串值。在同一 Kinesis 数据流中重复使用使用者名称，会导致之前使用该名称的使用者被终止。

要将 a 配置`KinesisStreamsSource`为使用 EFO，请向使用者添加以下参数：

```
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
```

有关使用 EFO 使用者的 Managed Service for Apache Flink 应用程序的示例，请参阅 [Github 上 KinesisConnectors 公开示例](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors)。

## 使用 Amazon MSK
<a name="input-msk"></a>

`KafkaSource`源从 Amazon MSK 主题向您的应用程序提供流数据。

### 创建 `KafkaSource`
<a name="input-msk-create"></a>

以下代码示例说明了如何创建 `KafkaSource`：

```
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```

有关使用 `KafkaSource` 的更多信息，请参阅[MSK 复制](earlier.md#example-msk)。

# 在 Managed Service for Apache Flink 中使用接收器写入数据
<a name="how-sinks"></a>

在应用程序代码中，您可以使用任何 [Apache Flink 接收器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/overview/)连接器写入外部系统，包括 AWS 服务，例如 Kinesis Data Streams 和 DynamoDB。

Apache Flink 还提供文件和套接字接收器，并且您可以实施自定义接收器。在支持的几种接收器中，以下是经常使用的接收器：

## 使用 Kinesis 数据流
<a name="sinks-streams"></a>

Apache Flink 在 Apache Flink 文档中提供了有关 [Kinesis Data Streams 连接器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/)的信息。

有关使用 Kinesis 数据流进行输入和输出的应用程序示例，请参见。[教程：开始使用 Apache Flink 托管服务中的 DataStream API](getting-started.md)

## 使用 Apache Kafka 和 Amazon Managed Streaming for Apache Kafka（MSK）
<a name="sinks-MSK"></a>

[Apache Flink Kafka 连接器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#kafka-sink)为向 Apache Kafka 和 Amazon MSK 发布数据提供广泛支持，包括一次性担保。要了解如何写入 Kafka，请参阅 Apache Flink 文档中的 [Kafka 连接器示例](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConnectors)。

## 使用 Amazon S3
<a name="sinks-s3"></a>

您可以使用 Apache Flink `StreamingFileSink` 以将对象写入到 Amazon S3 存储桶中。

有关如何将对象写入到 S3 的示例，请参阅[示例：写入 Amazon S3 存储桶](earlier.md#examples-s3)。

## 使用 Firehose
<a name="sinks-firehose"></a>

`FlinkKinesisFirehoseProducer` 是一个可靠且可扩展的 Apache Flink 接收器，可以使用 [Firehose](https://docs.aws.amazon.com/firehose/latest/dev/) 服务存储应用程序输出。本节介绍了如何设置 Maven 项目以创建和使用 `FlinkKinesisFirehoseProducer`。

**Topics**
+ [创建 `FlinkKinesisFirehoseProducer`](#sinks-firehose-create)
+ [`FlinkKinesisFirehoseProducer` 代码示例](#sinks-firehose-sample)

### 创建 `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-create"></a>

以下代码示例说明了如何创建 `FlinkKinesisFirehoseProducer`：

```
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
```

### `FlinkKinesisFirehoseProducer` 代码示例
<a name="sinks-firehose-sample"></a>

以下代码示例说明了如何创建和配置 `FlinkKinesisFirehoseProducer`，并将数据从 Apache Flink 数据流发送到 Firehose 服务。

```
 
package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

	private static final String region = "us-east-1";
	private static final String inputStreamName = "ExampleInputStream";
	private static final String outputStreamName = "ExampleOutputStream";

	private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
		Properties inputProperties = new Properties();
		inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
		inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
	}

	private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
			throws IOException {
		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
				applicationProperties.get("ConsumerConfigProperties")));
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Properties outputProperties = new Properties();
		outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(), outputProperties);
		ProducerConfigConstants config = new ProducerConfigConstants();
		return sink;
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(),
				applicationProperties.get("ProducerConfigProperties"));
		return sink;
	}

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		/*
		 * if you would like to use runtime configuration properties, uncomment the
		 * lines below
		 * DataStream<String> input = createSourceFromApplicationProperties(env);
		 */

		DataStream<String> input = createSourceFromStaticConfig(env);

		// Kinesis Firehose sink
		input.addSink(createFirehoseSinkFromStaticConfig());

		// If you would like to use runtime configuration properties, uncomment the
		// lines below
		// input.addSink(createFirehoseSinkFromApplicationProperties());

		env.execute("Flink Streaming Java API Skeleton");
	}
}
```

有关如何使用 Firehose 接收器的完整教程，请参阅 [示例：写入 Firehose](earlier.md#get-started-exercise-fh)。

# I/O 在 Apache Flink 的托管服务中使用异步
<a name="how-async"></a>

异步 I/O 运算符使用外部数据源（例如数据库）来丰富流数据。Managed Service for Apache Flink 异步丰富了流事件，因此可以对请求进行批处理以提高效率。

有关更多信息，请参阅 Apache Flink 文档中的[异步 IO](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/asyncio/)。

# 使用 API 在 Apache Flink 托管服务中使用运算符转换数据 DataStream
<a name="how-operators"></a>

要在中转换传入数据，您可以使用 Apache Flink *运算符*。Apache Flink 运算符将一个或多个数据流转换为新的数据流。新数据流包含来自原始数据流的修改的数据。Apache Flink 提供超过 25 个预构建的流处理运算符。有关更多信息，请参阅 pache Flink 文档中的[运算符](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/)。

**Topics**
+ [使用转换运算符](#how-operators-transform)
+ [使用聚合操作符](#how-operators-agg)

## 使用转换运算符
<a name="how-operators-transform"></a>

以下是对 JSON 数据流的某个字段进行简单文本转换的示例。

该代码创建转换的数据流。新数据流具有与原始流相同的数据，并在 `TICKER` 字段内容后面附加“` Company`”字符串。

```
DataStream<ObjectNode> output = input.map(
    new MapFunction<ObjectNode, ObjectNode>() {
        @Override
        public ObjectNode map(ObjectNode value) throws Exception {
            return value.put("TICKER", value.get("TICKER").asText() + " Company");
        }
    }
);
```

## 使用聚合操作符
<a name="how-operators-agg"></a>

以下是一个聚合运算符示例。该代码创建聚合的数据流。该运算符创建一个 5 秒的滚动窗口，并返回窗口中具有相同 `TICKER` 值的记录的 `PRICE` 值之和。

```
DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((node1, node2) -> {
        double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble();
        node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal));
    return node1;
});
```

有关更多代码示例，请参阅 [创建和使用 Managed Service for Apache Flink 应用程序的示例](examples-collapsibles.md)。

# 使用 API 在适用于 Apache Flink 的托管服务中跟踪事件 DataStream
<a name="how-time"></a>

Managed Service for Apache Flink使用以下时间戳跟踪事件：
+ **处理时间：**指的是执行相应操作的计算机的系统时间。
+ **事件时间：**指的是在生成设备上发生每个事件的时间。
+ **提取时间：**指的是事件进入 Managed Service for Apache Flink 的时间。

您可以使用 `setStreamTimeCharacteristic` 设置流环境使用的时间 

```
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
```

有关时间戳的更多信息，请参阅 Apache Flink 文档中的[生成水印](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/event-time/generating_watermarks/)。