入門 (Scala) - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前稱為 Amazon Kinesis Data Analytics for Apache Flink。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

入門 (Scala)

注意

從 1.15 版開始,Flink 不含 Scala。應用程式現在可以使用任何 Scala 版本的 Java API。Flink 仍會在內部幾個關鍵元件中使用 Scala,但不會將 Scala 公開到使用者程式碼類別載入器中。因此,您必須將 Scala 相依性新增至 JAR 封存檔。

如需 Flink 1.15 中的 Scala 變更之詳細資訊,請參閱在 1.15 版中移除了 Scala 相依性

在本練習中,您會建立 Managed Service for Apache Flink 應用程式,以 Kinesis 串流做為來源和接收器。

建立相依資源

在為本練習建立 Managed Service for Apache Flink 應用程式之前,先建立下列相依資源:

  • 兩個 Kinesis 串流,用於輸入和輸出。

  • Amazon S3 儲存貯體,用來儲存應用程式的程式碼 (ka-app-code-<username>)

您可以在主控台中建立 Kinesis 串流和 Amazon S3 儲存貯體。如需建立這些資源的相關指示,請參閱以下主題:

  • 《Amazon Kinesis Data Streams 開發人員指南》中的建立和更新資料串流。為資料串流 ExampleInputStreamExampleOutputStream 命名。

    建立資料串流 (AWS CLI)

    • 若要建立第一個串流 (ExampleInputStream),請使用下列 Amazon Kinesis create-stream AWS CLI 命令。

      aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
    • 若要建立應用程式用來寫入輸出的第二個串流,請執行相同的命令,將串流名稱變更為 ExampleOutputStream

      aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
  • 《Amazon Simple Storage Service 使用者指南》中的如何建立 S3 儲存貯體透過附加登入名稱 (例如 ka-app-code-<username>),為 Amazon S3 儲存貯體提供全域唯一的名稱。

其他資源

建立應用程式時,Managed Service for Apache Flink 會建立下列 Amazon CloudWatch 資源 (如果尚不存在該資源):

  • 名為 /AWS/KinesisAnalytics-java/MyApplication 的日誌群組。

  • 名為 kinesis-analytics-log-stream 的日誌串流。

將範例記錄寫入輸入串流

在本節,您會使用 Python 指令碼將範例記錄寫入供應用程式處理的串流。

注意
注意

本節中的 Python 指令碼會使用 AWS CLI。您必須 AWS CLI 將 設定為使用您的帳戶登入資料和預設區域。若要設定您的 AWS CLI,請輸入下列項目:

aws configure
  1. 使用下列內容建立名為 stock.py 的檔案:

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
  2. 執行 stock.py 指令碼:

    $ python stock.py

    在完成教學課程的其餘部分時,讓指令碼保持執行狀態。

下載並檢查應用程式程式碼

此範例的 Python 應用程式的程式碼可從 GitHub 下載。若要下載應用程式的程式碼,請執行下列動作:

  1. 如果您尚未安裝 Git 用戶端,請先安裝。如需詳細資訊,請參閱安裝 Git

  2. 使用以下指令複製遠端儲存庫:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. 導覽至 amazon-kinesis-data-analytics-java-examples/scala/GettingStarted 目錄。

請留意下列與應用程式的程式碼相關的資訊:

  • build.sbt 檔案包含應用程式的組態和相依性資訊,包括 Managed Service for Apache Flink 程式庫。

  • BasicStreamingJob.scala 檔案包含定義應用程式功能的主要方法。

  • 應用程式使用 Kinesis 來源從來源串流讀取。以下程式碼片段會建立 Kinesis 來源:

    private def createSource: FlinkKinesisConsumer[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val inputProperties = applicationProperties.get("ConsumerConfigProperties") new FlinkKinesisConsumer[String](inputProperties.getProperty(streamNameKey, defaultInputStreamName), new SimpleStringSchema, inputProperties) }

    應用程式也會使用 Kinesis 接收器寫入結果串流。以下程式碼片段會建立 Kinesis 目的地:

    private def createSink: KinesisStreamsSink[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val outputProperties = applicationProperties.get("ProducerConfigProperties") KinesisStreamsSink.builder[String] .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema) .setStreamName(outputProperties.getProperty(streamNameKey, defaultOutputStreamName)) .setPartitionKeyGenerator((element: String) => String.valueOf(element.hashCode)) .build }
  • 應用程式會建立來源與目的地連接器,以使用 StreamExecutionEnvironment 物件來存取外部資源。

  • 應用程式會使用動態應用程式屬性來建立來源與目的地連接器。會讀取執行期應用程式的屬性,來設定連接器。如需執行期屬性的詳細資訊,請參閱執行期屬性

編譯和上傳應用程式的程式碼

在本節中,您會編譯應用程式的程式碼,並將其上傳至在建立相依資源一節建立的 Amazon S3 儲存貯體。

編譯應用程式的程式碼

在本節中,您將使用 SBT 建置工具來建置應用程式的 Scala 程式碼。若要安裝 SBT,請參閱使用 cs 安裝程式安裝 sbt。您還需要安裝 Java 開發套件 (JDK)。請參閱完成練習的先決條件

  1. 請將應用程式的程式碼編譯並封裝成 JAR 檔案,以使用應用程式的程式碼。您可以使用 SBT 編譯和封裝程式碼:

    sbt assembly
  2. 如果應用程式成功編譯,則會建立下列檔案:

    target/scala-3.2.0/getting-started-scala-1.0.jar
上傳 Apache Flink 串流 Scala 程式碼

在本節中,您會建立 Amazon S3 儲存貯體並上傳您應用程式的程式碼。

  1. 開啟位於 https://console.aws.amazon.com/s3/ 的 Amazon S3 主控台。

  2. 選擇建立儲存貯體

  3. 儲存貯體名稱欄位中,輸入 ka-app-code-<username>。新增尾碼至儲存貯體名稱,例如您的使用者名稱,使其成為全域唯一的。選擇 Next (下一步)

  4. 設定選項中,保留原有設定並選擇下一步

  5. 設定許可步驟中,保留原有設定並選擇下一步

  6. 選擇建立儲存貯體

  7. 選擇 ka-app-code-<username> 儲存貯體,然後選擇上傳

  8. 選取檔案步驟中,選擇 新增檔案。導覽至您在上一步驟中建立的 getting-started-scala-1.0.jar 檔案。

  9. 您不需要變更物件的任何設定,因此請選擇上傳

您的應用程式的程式碼現在儲存在您的應用程式可以存取的 Amazon S3 儲存貯體中。