

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 建立和使用 Managed Service for Apache Flink 應用程式的範例
<a name="examples-collapsibles"></a>

本節提供在 Managed Service for Apache Flink 中建立及使用應用程式的範例。其中包含範例程式碼和逐步指示，可協助您建立 Managed Service for Apache Flink 應用程式並測試結果。

在探索這些範例之前，建議先檢閱以下內容：
+ [運作方式](how-it-works.md)
+ [教學課程：開始使用 Managed Service for Apache Flink 中的 DataStream API](getting-started.md)

**注意**  
這些範例假設您使用美國東部 （維吉尼亞北部） 區域 (us-east-1)。如果您使其他區域，請相應地更新應用程式的程式碼、命令和 IAM 角色。

**Topics**
+ [Managed Service for Apache Flink 的 Java 範例](examples-new-java.md)
+ [Managed Service for Apache Flink 的 Python 範例](examples-new-python.md)
+ [Managed Service for Apache Flink 的 Scala 範例](examples-new-scala.md)

# Managed Service for Apache Flink 的 Java 範例
<a name="examples-new-java"></a>

下列範例示範如何建立以 Java 撰寫的應用程式。



**注意**  
大多數範例都是為了在本機、您的開發機器和您選擇的 IDE 以及 Amazon Managed Service for Apache Flink 上執行而設計。它們示範了您可以用來傳遞應用程式參數的機制，以及如何正確設定相依性，以在兩個環境中執行應用程式，而不會進行任何變更。

## 改善序列化效能，定義自訂 TypeInfo
<a name="improving-serialization-performance-java"></a>

此範例說明如何在您的記錄或狀態物件上定義自訂 TypeInfo，以防止序列化回到效率較低的 Kryo 序列化。例如，當您的物件包含 `List`或 時，這是必要的`Map`。如需詳細資訊，請參閱 Apache Flink 文件中的[資料類型和序列化](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#data-types--serialization)。此範例也會示範如何測試物件的序列化是否回到效率較低的 Kryo 序列化。

程式碼範例：[CustomTypeInfo](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/Serialization/CustomTypeInfo)

## DataStream API 入門
<a name="getting-started-datastream-java"></a>

此範例顯示一個簡單的應用程式，使用 `DataStream` API 從 Kinesis 資料串流讀取和寫入另一個 Kinesis 資料串流。此範例示範如何使用正確的相依性設定 檔案、建置 uber-JAR，然後剖析組態參數，讓您可以在本機、IDE 和 Amazon Managed Service for Apache Flink 上執行應用程式。

程式碼範例： [GettingStarted](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted)

## 開始使用資料表 API 和 SQL
<a name="getting-started-table-java"></a>

此範例顯示使用 `Table` API 和 SQL 的簡單應用程式。它示範如何將 `DataStream` API 與相同 Java 應用程式中的 `Table` API 或 SQL 整合。它還示範如何使用`DataGen`連接器從 Flink 應用程式本身產生隨機測試資料，而不需要外部資料產生器。

完整範例： [GettingStartedTable](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStartedTable)

## 使用 S3Sink (DataStream API)
<a name="s3-sink-java"></a>

此範例示範如何使用 `DataStream` API 的 將 JSON 檔案`FileSink`寫入 S3 儲存貯體。

程式碼範例：[S3Sink](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/S3Sink)

## 使用 Kinesis 來源、標準或 EFO 取用者和接收器 (DataStream API)
<a name="kinesis-EFO-sink-java"></a>

此範例示範如何使用標準取用者或 EFO 從 Kinesis 資料串流設定來源取用，以及如何將接收器設定為 Kinesis 資料串流。

程式碼範例：[KinesisConnectors](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors)

## 使用 Amazon Data Firehose 接收器 (DataStream API)
<a name="firehose-sink-java"></a>

此範例示範如何將資料傳送至 Amazon Data Firehose （先前稱為 Kinesis Data Firehose)。

程式碼範例：[KinesisFirehoseSink](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisFirehoseSink)

## 使用 Prometheus 接收器連接器
<a name="prometheus-sink-java"></a>

此範例示範如何使用 [Prometheus 接收器連接器](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/prometheus/)將時間序列資料寫入 Prometheus。

程式碼範例：[PrometheusSink](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/PrometheusSink)

## 使用視窗彙總 (DataStream API)
<a name="windowing-aggregations-java"></a>

此範例示範 `DataStream` API 中的四種視窗化彙總類型。

1. 根據處理時間的滑動視窗

1. 根據事件時間的滑動視窗

1. 根據處理時間的輪轉時段

1. 根據事件時間的輪轉時段

程式碼範例：[視窗調整](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/Windowing) 

## 使用自訂指標
<a name="custom-metrics-java"></a>

此範例說明如何將自訂指標新增至 Flink 應用程式，並將其傳送至 CloudWatch 指標。

程式碼範例：[CustomMetrics](https://github.com/dzikosc/amazon-managed-service-for-apache-flink-examples/tree/main/java/CustomMetrics)

## 使用 Kafka 組態提供者在執行時間擷取 mTLS 的自訂金鑰存放區和信任存放區
<a name="kafka-keystore-mTLS"></a>

此範例說明如何使用 Kafka 組態提供者，為 Kafka 連接器設定具有 mTLS 身分驗證憑證的自訂金鑰存放區和信任存放區。此技術可讓您從 Amazon S3 載入必要的自訂憑證，並從應用程式啟動 AWS Secrets Manager 時載入秘密。

程式碼範例：[Kafka-mTLS-Keystore-ConfigProviders](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConfigProviders/Kafka-mTLS-Keystore-ConfigProviders)

## 使用 Kafka 組態提供者在執行時間擷取 SASL/SCRAM 身分驗證的秘密
<a name="kafka-secrets"></a>

此範例說明如何使用 Kafka 組態提供者從 Amazon S3 擷取憑證 AWS Secrets Manager ，並從 Amazon S3 下載信任存放區，以在 Kafka 連接器上設定 SASL/SCRAM 身分驗證。 Amazon S3 此技術可讓您從 Amazon S3 載入必要的自訂憑證，並從應用程式啟動 AWS Secrets Manager 時載入秘密。

程式碼範例：[Kafka-SASL\$1SSL-ConfigProviders](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConfigProviders/Kafka-SASL_SSL-ConfigProviders)

## 使用 Kafka 組態提供者，在執行時間透過資料表 API/SQL 擷取 mTLS 的自訂金鑰存放區和信任存放區
<a name="kafka-custom-keystore"></a>

此範例說明如何使用資料表 API /SQL 中的 Kafka 組態提供者，為 Kafka 連接器設定具有 mTLS 身分驗證憑證的自訂金鑰存放區和信任存放區。此技術可讓您從 Amazon S3 載入必要的自訂憑證，並從應用程式啟動 AWS Secrets Manager 時載入秘密。

程式碼範例：[Kafka-mTLS-Keystore-Sql-ConfigProviders](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConfigProviders/Kafka-mTLS-Keystore-Sql-ConfigProviders)

## 使用側邊輸出分割串流
<a name="side-output"></a>

此範例說明如何利用 Apache Flink 中的 [Side Outputs](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/) 在指定的屬性上分割串流。此模式在嘗試在串流應用程式中實作無效字母佇列 (DLQ) 的概念時特別有用。

程式碼範例：[SideOutputs](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/SideOutputs)

## 使用非同步 I/O 呼叫外部端點
<a name="async-i-o"></a>

此範例說明如何使用 [Apache Flink 非同步 I/O](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/operators/asyncio/) 以非封鎖方式呼叫外部端點，並重試可復原的錯誤。

程式碼範例：[AsyncIO](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/AsyncIO)

# Managed Service for Apache Flink 的 Python 範例
<a name="examples-new-python"></a>

下列範例示範如何建立以 Python 撰寫的應用程式。

**注意**  
大多數範例都是為了在本機、您的開發機器和您選擇的 IDE 以及 Amazon Managed Service for Apache Flink 上執行而設計。它們示範了您可以用來傳遞應用程式參數的簡單機制，以及如何正確設定相依性以在兩個環境中執行應用程式，而不會變更。

**專案相依性**

大多數 PyFlink 範例需要一或多個做為 JAR 檔案的相依性，例如 Flink 連接器。在 Amazon Managed Service for Apache Flink 上部署時，這些相依性必須與應用程式一起封裝。

下列範例已包含 工具，可讓您在本機執行應用程式以進行開發和測試，並正確封裝所需的相依性。此工具需要使用 Java JDK11 和 Apache Maven。如需特定指示，請參閱每個範例中包含的 README。

**範例**

## 開始使用 PyFlink
<a name="getting-started-pyflink"></a>

此範例示範使用內嵌在 Python 程式碼中的 SQL 的 PyFlink 應用程式的基本結構。此專案也為包含連接器等 JAR 相依性的任何 PyFlink 應用程式提供骨架。README 區段提供如何在本機執行 Python 應用程式以進行開發的詳細指引。此範例也說明如何在 PyFlink 應用程式中包含單一 JAR 相依性、此範例中的 Kinesis SQL 連接器。

程式碼範例： [GettingStarted](https://github.com/dzikosc/amazon-managed-service-for-apache-flink-examples/tree/main/python/GettingStarted)

## 新增 Python 相依性
<a name="add-python-dependencies"></a>

此範例示範如何以最一般的方式將 Python 相依性新增至 PyFlink 應用程式。此方法適用於簡單的相依性，例如 Boto3，或包含 PyArrow 等 C 程式庫的複雜相依性。

程式碼範例：[PythonDependencies](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/PythonDependencies)

## 使用視窗彙總 (DataStream API)
<a name="windowing-aggregations-python"></a>

此範例示範在 Python 應用程式中內嵌的 SQL 中的四種視窗化彙總類型。

1. 根據處理時間的滑動視窗

1. 根據事件時間的滑動視窗

1. 根據處理時間的輪轉時段

1. 根據事件時間的輪轉時段

程式碼範例：[視窗調整](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/Windowing)

## 使用 S3 接收器
<a name="s3-sink-python"></a>

此範例示範如何使用內嵌在 Python 應用程式中的 SQL，將輸出寫入 Amazon S3 做為 JSON 檔案。您必須啟用 S3 接收器的檢查點，才能將檔案寫入和輪換至 Amazon S3。

程式碼範例：[S3Sink](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/S3Sink)

## 使用使用者定義函數 (UDF)
<a name="UDF-python"></a>

此範例示範如何定義使用者定義的函數、在 Python 中實作，以及將其用於在 Python 應用程式中執行的 SQL 程式碼。

程式碼範例：[UDF](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/UDF)

## 使用 Amazon Data Firehose 接收器
<a name="Firehose-sink-python"></a>

此範例示範如何使用 SQL 將資料傳送至 Amazon Data Firehose。

程式碼範例：[FirehoseSink](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/FirehoseSink)

# Managed Service for Apache Flink 的 Scala 範例
<a name="examples-new-scala"></a>

下列範例示範如何搭配使用 Scala 與 Apache Flink 來建立應用程式。



## 設定多步驟應用程式
<a name="getting-started-scala"></a>

此範例示範如何在 Scala 中設定 Flink 應用程式。它示範如何設定 SBT 專案以包含相依性並建置 uber-JAR。

程式碼範例： [GettingStarted](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/scala/GettingStarted)