

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# Managed Service for Apache Flink 的 Java 示例
<a name="examples-new-java"></a>

以下示例演示如何创建以 Java 编写的应用程序。



**注意**  
大多数示例都设计为在本地、开发计算机和您选择的 IDE 上运行，以及在 Amazon Managed Service for Apache Flink 上运行。它们演示可用于传递应用程序参数的机制，以及如何正确设置依赖项，以便在不做任何更改的情况下在两个环境中运行应用程序。

## 提高序列化性能，定义自定义 TypeInfo
<a name="improving-serialization-performance-java"></a>

此示例说明了如何在记录或状态对象 TypeInfo 上定义自定义，以防止序列化回效率较低的 Kryo 序列化。例如，当您的对象包含 `List` 或 `Map` 时，这是必需的操作。有关更多信息，请参阅 Apache Flink 文档中的[数据类型和序列化](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#data-types--serialization)。该示例还展示如何测试对象的序列化是否回退到效率较低的 Kryo 序列化。

代码示例：[CustomTypeInfo](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/Serialization/CustomTypeInfo)

## 开始使用 DataStream API
<a name="getting-started-datastream-java"></a>

此示例显示一个简单的应用程序，该应用程序使用 `DataStream` API 从 Kinesis 数据流中读取数据并写入另一个 Kinesis 数据流。该示例演示如何使用正确的依赖项设置文件，构建 uber-JAR，然后解析配置参数，这样您就可以在本地、IDE 和 Amazon Managed Service for Apache Flink 上运行应用程序。

代码示例：[GettingStarted](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted)

## 开始使用 Table API 和 SQL
<a name="getting-started-table-java"></a>

此示例显示使用 `Table` API 和 SQL 的简单应用程序。其中演示如何在同一 Java 应用程序中将 `DataStream` API 与 `Table` API 或 SQL 集成。它还演示如何使用 `DataGen` 连接器在 Flink 应用程序内部生成随机测试数据，而无需使用外部数据生成器。

完整示例：[GettingStartedTable](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStartedTable)

## 使用 S3Sink (API) DataStream
<a name="s3-sink-java"></a>

此示例演示如何使用 `DataStream` API 的 `FileSink` 将 JSON 文件写入 S3 存储桶。

代码示例：[S3Sink](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/S3Sink)

## 使用 Kinesis 来源、标准或 EFO 使用者以及接收器 (API) DataStream
<a name="kinesis-EFO-sink-java"></a>

此示例演示如何使用标准使用者或 EFO 配置从 Kinesis 数据流使用的源，以及如何为 Kinesis 数据流设置接收器。

代码示例：[KinesisConnectors](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors)

## 使用亚马逊 Data Firehose 接收器 (API) DataStream
<a name="firehose-sink-java"></a>

此示例说明如何将数据发送到 Amazon Data Firehose（以前称为 Kinesis Data Firehose）。

代码示例：[KinesisFirehoseSink](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisFirehoseSink)

## 使用 Prometheus 接收器连接器
<a name="prometheus-sink-java"></a>

此示例演示如何使用 [Prometheus 接收器连接器](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/prometheus/)将时间序列数据写入 Prometheus。

代码示例：[PrometheusSink](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/PrometheusSink)

## 使用窗口聚合 (API) DataStream
<a name="windowing-aggregations-java"></a>

此示例演示 `DataStream` API 中四种类型的窗口聚合。

1. 基于处理时间的滑动窗口

1. 基于事件时间的滑动窗口

1. 基于处理时间的滚动窗口

1. 基于事件时间的滚动窗口

代码示例：[Windowing](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/Windowing) 

## 使用自定义指标
<a name="custom-metrics-java"></a>

此示例说明如何将自定义指标添加到您的 Flink 应用程序并将其发送到 CloudWatch 指标。

代码示例：[CustomMetrics](https://github.com/dzikosc/amazon-managed-service-for-apache-flink-examples/tree/main/java/CustomMetrics)

## 使用 Kafka 配置提供程序在运行时获取 mTLS 的自定义密钥库和信任存储库
<a name="kafka-keystore-mTLS"></a>

此示例说明如何使用 Kafka 配置提供程序设置自定义密钥库和信任存储库，其中包含用于 Kafka 连接器的 mTLS 身份验证的证书。此技术允许您从 Amazon S3 加载所需的自定义证书以及应用程序启动 AWS Secrets Manager 时的密钥。

代码示例：[kafka-mtls-keystore-ConfigProviders](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConfigProviders/Kafka-mTLS-Keystore-ConfigProviders)

## 使用 Kafka 配置提供程序在运行时获取用于 SASL/SCRAM 身份验证的密钥
<a name="kafka-secrets"></a>

此示例说明如何使用 Kafka 配置提供程序从 Amazon S3 获取证书 AWS Secrets Manager 并从 Amazon S3 下载信任库，以便在 Kafka 连接器上设置 SASL/SCRAM 身份验证。此技术允许您从 Amazon S3 加载所需的自定义证书以及应用程序启动 AWS Secrets Manager 时的密钥。

代码示例：[Kafka--SASL\$1SSL ConfigProviders](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConfigProviders/Kafka-SASL_SSL-ConfigProviders)

## 使用 Kafka 配置提供程序在运行时通过 Table API/SQL 获取 mTLS 的自定义密钥库和信任存储库
<a name="kafka-custom-keystore"></a>

此示例说明如何使用 Table API /SQL 中的 Kafka 配置提供程序来设置自定义密钥库和信任存储库，其中包含用于 Kafka 连接器的 mTLS 身份验证的证书。此技术允许您从 Amazon S3 加载所需的自定义证书以及应用程序启动 AWS Secrets Manager 时的密钥。

代码示例：[kafka-mtls-keystore-SQL-ConfigProviders](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConfigProviders/Kafka-mTLS-Keystore-Sql-ConfigProviders)

## 使用侧输出来拆分流
<a name="side-output"></a>

此示例说明如何利用 Apache Flink 中的[侧输出](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/)在指定属性上拆分流。当尝试在流应用程序中实现死信队列（DLQ）的概念时，这种模式特别有用。

代码示例：[SideOutputs](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/SideOutputs)

## 使用 Async 调 I/O 用外部端点
<a name="async-i-o"></a>

此示例说明如何使用 [Apache Flink 异步 I/O](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/operators/asyncio/) 以非阻塞方式调用外部端点，并对可恢复的错误进行重试。

代码示例：[AsyncIO](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/AsyncIO)