

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 Apache Beam 建立應用程式
<a name="examples-beam"></a>

在本練習中，您將使用 [Apache Beam](https://beam.apache.org/) 建立可轉換資料的 Managed Service for Apache Flink 應用程式。Apache Beam 是用於處理串流資料的程式設計模型。如需將 Apache Beam 與 Managed Service for Apache Flink 搭配使用的相關資訊，請參閱[將 Apache Beam 與 Managed Service for Apache Flink 應用程式搭配使用](how-creating-apps-beam.md)。

**注意**  
若要設定此練習的必要先決條件，請先完成 [教學課程：開始使用 Managed Service for Apache Flink 中的 DataStream API](getting-started.md) 練習。

**Topics**
+ [建立相依資源](#examples-beam-resources)
+ [將範例記錄寫入輸入串流](#examples-beam-write)
+ [下載並檢查應用程式程式碼](#examples-beam-download)
+ [編譯應用程式程式碼](#examples-beam-compile)
+ [上傳 Apache Flink 串流 Java 程式碼](#examples-beam-upload)
+ [建立並執行 Managed Service for Apache Flink 應用程式](#examples-beam-create-run)
+ [清除 AWS 資源](#examples-beam-cleanup)
+ [後續步驟](#examples-beam-nextsteps)

## 建立相依資源
<a name="examples-beam-resources"></a>

在為本練習建立 Managed Service for Apache Flink 應用程式之前，先建立下列相依資源：
+ 兩個 Kinesis 資料串流 (`ExampleInputStream` 和 `ExampleOutputStream`)
+ Amazon S3 儲存貯體，用來儲存應用程式的程式碼 (`ka-app-code-<username>`) 

您可以在主控台中建立 Kinesis 串流和 Amazon S3 儲存貯體。如需建立這些資源的相關指示，請參閱以下主題：
+ 《Amazon Kinesis Data Streams 開發人員指南》中的[建立和更新資料串流](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)**。為資料串流 **ExampleInputStream** 和 **ExampleOutputStream** 命名。
+ 《Amazon Simple Storage Service 使用者指南》中的[如何建立 S3 儲存貯體](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket.html)。**透過附加登入名稱 (例如 **ka-app-code-*<username>***)，為 Amazon S3 儲存貯體提供全域唯一的名稱。

## 將範例記錄寫入輸入串流
<a name="examples-beam-write"></a>

在本節中，您會透過 Python 指令碼將隨機字串寫入串流供應用程式處理。

**注意**  
本節需要 [適用於 Python (Boto) 的 AWS SDK](https://aws.amazon.com/developers/getting-started/python/)。

1. 使用下列內容建立名為 `ping.py` 的檔案：

   ```
   import json
   import boto3
   import random
   
   kinesis = boto3.client('kinesis')
   
   while True:
           data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat'])
           print(data)
           kinesis.put_record(
                   StreamName="ExampleInputStream",
                   Data=data,
                   PartitionKey="partitionkey")
   ```

1. 執行 `ping.py` 指令碼：

   ```
   $ python ping.py
   ```

   在完成教學課程的其餘部分時，讓指令碼保持執行狀態。

## 下載並檢查應用程式程式碼
<a name="examples-beam-download"></a>

此範例的 Java 應用程式的程式碼可從 GitHub 下載。若要下載應用程式的程式碼，請執行下列動作：

1. 如果您尚未安裝 Git 用戶端，請先安裝。如需詳細資訊，請參閱[安裝 Git](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)。

1. 使用以下指令複製遠端儲存庫：

   ```
   git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
   ```

1. 導覽至 `amazon-kinesis-data-analytics-java-examples/Beam` 目錄。

應用程式的程式碼位於 `BasicBeamStreamingJob.java` 檔案中。請留意下列與應用程式的程式碼相關的資訊：
+ 該應用程式使用 Apache Beam [ParDo](https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/transforms/ParDo.html)，透過調用稱為 `PingPongFn` 的自定義轉換函數來處理傳入的記錄。

  調用 `PingPongFn` 函數的代碼如下：

  ```
  .apply("Pong transform",
      ParDo.of(new PingPongFn())
  ```
+ 使用 Apache Beam 的 Managed Service for Apache Flink 應用程式需要下列元件。如果您未在 `pom.xml` 中包含這些元件和版本，應用程式會從環境相依性載入不正確的版本，而且由於版本不符合，應用程式會在執行期損毀。

  ```
  <jackson.version>2.10.2</jackson.version>
  ...
  <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-jaxb-annotations</artifactId>
      <version>2.10.2</version>
  </dependency>
  ```
+ `PingPongFn` 轉換函數會將輸入資料傳遞到輸出串流，除非輸入資料是 **ping**，在這種情況下，它發出字串 **pong\$1n** 到輸出串流。

  轉換函數的程式碼如下：

  ```
      private static class PingPongFn extends DoFn<KinesisRecord, byte[]> {
      private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class);
      
      @ProcessElement
      public void processElement(ProcessContext c) {
          String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8);
          if (content.trim().equalsIgnoreCase("ping")) {
              LOG.info("Ponged!");
              c.output("pong\n".getBytes(StandardCharsets.UTF_8));
          } else {
              LOG.info("No action for: " + content);
              c.output(c.element().getDataAsBytes());
          }
      }
  }
  ```

## 編譯應用程式程式碼
<a name="examples-beam-compile"></a>

若要編譯應用程式，請執行下列動作：

1. 如果尚未安裝 Java 和 Maven，請先安裝。如需詳細資訊，請參閱[教學課程：開始使用 Managed Service for Apache Flink 中的 DataStream API](getting-started.md)教學課程中的[完成必要的先決條件](getting-started.md#setting-up-prerequisites)。

1. 使用下列命令編譯應用程式：

   ```
   mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8
   ```
**注意**  
提供的來源程式碼依賴於 Java 11 中的程式庫。

編譯應用程式會建立應用程式 JAR 檔案 (`target/basic-beam-app-1.0.jar`)。

## 上傳 Apache Flink 串流 Java 程式碼
<a name="examples-beam-upload"></a>

在本節中，您會將應用程式的程式碼上傳至在[建立相依資源](#examples-beam-resources)一節建立的 Amazon S3 儲存貯體。

1. 在 Amazon S3 主控台中，選擇 **ka-app-code-*<username>*** 儲存貯體，並選擇**上傳**。

1. 在**選取檔案**步驟中，選擇**新增檔案**。導覽至您在上一步驟中建立的 `basic-beam-app-1.0.jar` 檔案。

1. 您不需要變更物件的任何設定，因此請選擇**上傳**。

您的應用程式的程式碼現在儲存在您的應用程式可以存取的 Amazon S3 儲存貯體中。

## 建立並執行 Managed Service for Apache Flink 應用程式
<a name="examples-beam-create-run"></a>

依照以下步驟來使用主控台建立、設定、更新及執行應用程式。

### 建立應用程式
<a name="examples-beam-create"></a>

1. 登入 AWS 管理主控台，並在 https：//https://console.aws.amazon.com/flink 開啟 Amazon MSF 主控台。

1. 在 Managed Service for Apache Flink 儀表板上，選擇**建立分析應用程式**。

1. 在 **Managed Service for Apache Flink - 建立應用程式**頁面，提供應用程式詳細資訊，如下所示：
   + 在**應用程式名稱**中，輸入 **MyApplication**。
   + 對於**​執行期**，選擇 **​Apache Flink**。
**注意**  
Apache Beam 目前與 Apache Flink 1.19 版或更新版本不相容。
   + 從版本下拉式清單中選取 **Apache Flink 1.15** 版。

1. 對於**存取許可**，選擇**建立/更新 IAM 角色 `kinesis-analytics-MyApplication-us-west-2`**。

1. 選擇 **建立應用程式**。

**注意**  
使用主控台建立 Managed Service for Apache Flink 應用程式時，可以選擇是否為應用程式建立 IAM 角色和政策。應用程式使用此角色和政策來存取其相依資源。這些 IAM 資源會如下所述使用您的應用程式名稱和區域命名：  
政策：`kinesis-analytics-service-MyApplication-us-west-2`
角色：`kinesis-analytics-MyApplication-us-west-2`

### 編輯 IAM 政策
<a name="get-started-exercise-7-console-iam"></a>

編輯 IAM 政策來新增存取 Kinesis 資料串流的許可。

1. 前往 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 開啟 IAM 主控台。

1. 選擇**政策**。選擇主控台為您在上一節所建立的 **`kinesis-analytics-service-MyApplication-us-west-2`** 政策。

1. 在**摘要**頁面，選擇**編輯政策**。請選擇 **JSON** 標籤。

1. 將下列政策範例的反白部分新增至政策。使用您的帳戶 ID 取代範例帳戶 ID (*012345678901*)。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "logs:DescribeLogGroups",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:*",
                   "arn:aws:s3:::ka-app-code-<username>/basic-beam-app-1.0.jar"
               ]
           },
           {
               "Sid": "DescribeLogStreams",
               "Effect": "Allow",
               "Action": "logs:DescribeLogStreams",
               "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
           },
           {
               "Sid": "PutLogEvents",
               "Effect": "Allow",
               "Action": "logs:PutLogEvents",
               "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

### 設定應用程式
<a name="examples-beam-configure"></a>

1. 在**我的應用程式**頁面，選擇**設定**。

1. 在**設定應用程式**頁面，提供**程式碼位置**：
   + 對於 **Amazon S3 儲存貯體**，請輸入 **ka-app-code-*<username>***。
   + 對於 **Amazon S3 物件的路徑**，請輸入 **basic-beam-app-1.0.jar**。

1. 在**存取應用程式資源**下，對於**存取許可**，選擇**建立/更新 IAM 角色 `kinesis-analytics-MyApplication-us-west-2`**。

1. 輸入下列資料：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/managed-flink/latest/java/examples-beam.html)

1. 在**監控**下，確保**監控指標層級**設為**應用程式**。

1. 針對 **CloudWatch 記錄**，選取**啟用**核取方塊。

1. 選擇**更新**。

**注意**  
當您選擇啟用 CloudWatch 記錄時，Managed Service for Apache Flink 便會為您建立日誌群組和日誌串流。這些資源的名稱如下所示：  
日誌群組：`/aws/kinesis-analytics/MyApplication`
日誌串流：`kinesis-analytics-log-stream`
此日誌串流用於監控應用程式。這與應用程式用來傳送結果的日誌串流不同。

### 執行應用程式
<a name="examples-beam-run"></a>

透過執行應用程式、開啟 Apache Flink 儀表板並選擇所需的 Flink 作業，即可檢視 Flink 作業圖表。

您可以在 CloudWatch 主控台上查看 Managed Service for Apache Flink 指標，以確認應用程式是否正常運作。

## 清除 AWS 資源
<a name="examples-beam-cleanup"></a>

本節包含清除在輪轉時段教學課程中建立 AWS 之資源的程序。

**Topics**
+ [刪除 Managed Service for Apache Flink 應用程式](#examples-beam-cleanup-app)
+ [刪除您的 Kinesis 資料串流](#examples-beam-cleanup-stream)
+ [刪除您的 Amazon S3 物件和儲存貯體](#examples-beam-cleanup-s3)
+ [刪除您的 IAM 資源](#examples-beam-cleanup-iam)
+ [刪除您的 CloudWatch 資源](#examples-beam-cleanup-cw)

### 刪除 Managed Service for Apache Flink 應用程式
<a name="examples-beam-cleanup-app"></a>

1. 登入 AWS 管理主控台，並在 https：//https://console.aws.amazon.com/flink 開啟 Amazon MSF 主控台。

1. 在 Managed Service for Apache Flink 面板中，選擇 **MyApplication**。

1. 在應用程式的頁面，選擇**刪除**，然後確認刪除。

### 刪除您的 Kinesis 資料串流
<a name="examples-beam-cleanup-stream"></a>

1. 在以下網址開啟 Kinesis 主控台：[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)。

1. 在 Kinesis Data Streams 面板中，選擇 **ExampleInputStream**。

1. 在 **ExampleInputStream** 頁面，選擇**刪除 Kinesis 串流**，然後確認刪除。

1. 在 **Kinesis 串流**頁面，依序選擇 **ExampleOutputStream**、**動作**和**刪除**，然後確認刪除。

### 刪除您的 Amazon S3 物件和儲存貯體
<a name="examples-beam-cleanup-s3"></a>

1. 在以下網址開啟 Amazon S3 主控台：[https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)。

1. 選擇 **ka-app-code-*<username>* 儲存貯體。**

1. 選擇**刪除**，然後輸入儲存貯體名稱以確認刪除。

### 刪除您的 IAM 資源
<a name="examples-beam-cleanup-iam"></a>

1. 前往 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 開啟 IAM 主控台。

1. 在導覽列中，選擇**政策**。

1. 在篩選器控制項中，輸入 **kinesis**。

1. 選擇 **kinesis-analytics-service-MyApplication-us-west-2** 政策。

1. 選擇**政策動作**，然後選擇**刪除**。

1. 在導覽列中，選擇**角色**。

1. 選擇 **kinesis-analytics-MyApplication-us-west-2** 角色。

1. 選擇**刪除角色**，然後確認刪除。

### 刪除您的 CloudWatch 資源
<a name="examples-beam-cleanup-cw"></a>

1. 在以下網址開啟 CloudWatch 主控台：[https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/)。

1. 在導覽列中，選擇**日誌**。

1. 選擇 **/aws/kinesis-analytics/MyApplication** 日誌群組。

1. 選擇**刪除日誌群組**，然後確認刪除。

## 後續步驟
<a name="examples-beam-nextsteps"></a>

現在您已建立並執行使用 Apache Beam 轉換資料的基本 Managed Service for Apache Flink 應用程式，請參閱下列應用程式，取得更進階 Managed Service for Apache Flink 解決方案的範例。
+ **[Beam 用於 Managed Service for Apache Flink 串流研討會](https://streaming-analytics.workshop.aws/beam-on-kda/)**：在此研討會中，我們將探索一個端對端範例，將批次和串流方面結合在一個統一的 Apache Beam 管道中。