在 Step Functions CSV 數中使用分佈式地圖複製大規模數 - AWS Step Functions

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

在 Step Functions CSV 數中使用分佈式地圖複製大規模數

本教學課程可協助您開始在「分散式」模式中使用Map狀態。設置為分佈式的Map狀態稱為分佈式地圖狀態。您可以在工作流程中使用分散式地圖狀態來迭代大規模的 Amazon S3 資料來源。狀Map態會將每個版序當做子工作流程執行執行來執行,以達到高並行性。如需分散式模式的詳細資訊,請參閱分散式模式中的對應狀態

在本教學中,您會使用「分散式地圖」狀態來迭代 Amazon S3 儲存貯體中的CSV檔案。然後,您將其內容連同子工作流程執行ARN的內容一起傳回到另一個 Amazon S3 儲存貯體中。您可以先在工作流程 Studio 中建立工作流程原型。接下來,您將Map狀態的處理模式設定為「分散式」、將CSV檔案指定為資料集,並將其位置提供給Map狀態。您也可以指定子工作流程執行的工作流程類型,分散式對應狀態開始為 Express

除了這些設定之外,您還可以針對本教學課程中使用的範例工作流程指定其他組態,例如同時執行子項工作流程的最大數目,以及匯出Map結果的位置。

必要條件

  • 將CSV檔案上傳到 Amazon S3 儲存貯體。您必須在CSV檔案中定義標題列。若要取得有關對CSV檔案施加的大小限制以及如何指定標頭列的資訊,請參閱〈〉CSV文件在 Amazon S3 存儲桶

  • 在該儲存貯體內建立另一個 Amazon S3 儲存貯體和一個資料夾,以將Map狀態結果匯出到。

重要

確保您的 Amazon S3 存儲桶位於相同 AWS 帳戶 以及 AWS 區域 作為你的狀態機。

步驟 1:建立工作流程原型

在此步驟中,您可以使用工作流程 Studio 建立工作流程的原型。工作流程 Studio 是 Step Functions 控制台中提供的可視化工作流程設計器。您可以分別從「流程」和「API動作」標籤中選擇所需的狀態和動作。您將使用拖放工作流 Studio 的功能來創建工作流原型。

  1. 開啟 Step Functions 主控台,然後選擇建立狀態機器

  2. 在「選擇範本」對話方塊中,選取「空白」。

  3. 選擇選以在中開啟工作流程工作室設計模式

  4. 從「程」索引標籤中,將地圖狀態拖放至標示為「在此處拖曳第一個狀態」的空白狀態

  5. 組態索引標籤中,對於狀態名稱,輸入Process data

  6. 從「動」標籤中,拖曳 AWS Lambda 叫用API動作並將其放置在處理程序資料狀態中。

  7. 重新命名 AWS Lambda 呼叫狀態為Process CSV data

第 2 步:配置地圖狀態的必填字段

在此步驟中,您可以設定「分散式對應」狀態的下列必要欄位:

  • ItemReader— 指定資料集及其Map狀態可從中讀取輸入的位置。

  • ItemProcessor— 指定下列值:

    • ProcessorConfigExecutionTypeMode和設定為DISTRIBUTEDEXPRESS分別。這會設定Map狀態的處理模式和「分散式對映」狀態啟動之子工作流程執行的工作流程類型。

    • StartAt—「地圖」工作流程中的第一個狀態。

    • States— 定義 Map 工作流程,這是每個子工作流程執行中要重複的一組步驟。

  • ResultWriter— 指定 Step Functions 寫入分散式地圖狀態結果的 Amazon S3 位置。

    重要

    確定您用來匯出地圖執行結果的 Amazon S3 儲存貯體是相同的 AWS 帳戶 以及 AWS 區域 作為你的狀態機。否則,您的狀態機執行將失敗並顯示States.ResultWriterFailed錯誤。

若要設定必要欄位:
  1. 選擇處理資料狀態,然後在組態索引標籤中執行下列動作:

    1. 對於「處理」模式,請選擇「分散

    2. 對於項目來源,請選擇 Amazon S3,然後從 S3 項目來源下拉式清單中選擇 S3 中的CSV檔案

    3. 執行下列動作以指定CSV檔案的 Amazon S3 位置:

      1. 對於 S3 物件,請從下拉式清單中選取 [輸入儲存貯體和金鑰]。

      2. 在儲存體中,輸入包含CSV檔案的 Amazon S3 儲存貯體的名稱。例如:amzn-s3-demo-source-bucket

      3. 金鑰中,輸入您儲存CSV檔案的 Amazon S3 物件名稱。您還必須在此欄位中指定CSV檔案的名稱。例如:csvDataset/ratings.csv

    4. 對於CSV檔案,您還必須指定欄標題的位置。若要這麼做,請選擇 [其他組態],然後針對CSV標頭位置保留預設選取 [第一列] (如果CSV檔案的第一列是標頭)。否則,請選擇「定」以在狀態機定義中指定標頭。如需詳細資訊,請參閱ReaderConfig

    5. 針對子項執行類型,選擇快速

  2. 匯出位置中,若要將地圖執行結果匯出到特定的 Amazon S3 位置,請選擇將地圖狀態的輸出匯出到 Amazon S3

  3. 請執行下列操作:

    1. 對於 S3 儲存貯體,請從下拉式清單中選擇 [輸入儲存貯體名稱和前綴]

    2. 在儲存體中,輸入您要將結果匯出到的 Amazon S3 儲存貯體的名稱。例如:mapOutputs

    3. 在「字首」中,輸入您要儲存結果的資料夾名稱。例如:resultData

步驟 3:設定其他選項

除了分散式貼圖狀態的必要設定之外,您還可以指定其他選項。這些可能包括同時執行子項工作流程的最大數目,以及要將Map狀態結果匯出至的位置。

  1. 選擇「處理資料」狀態。然後,在「項目來源」中,選擇「其他組態」。

  2. 請執行下列操作:

    1. 選擇 [修改項目],為每個子工作流程執行指定自訂JSON輸入。 ItemSelector

    2. 輸入以下JSON輸入:

      { "index.$": "$$.Map.Item.Index", "value.$": "$$.Map.Item.Value" }

      若要取得有關如何建立自訂輸入的資訊,請參閱〈〉ItemSelector (地圖)

  3. 在「執行階段」設定中,對於並行限制,指定「分散式對應」狀態可以啟動的並行子工作流程執行數目。例如,​輸入 100

  4. 在瀏覽器上開啟新視窗或索引標籤,並完成您將在此工作流程中使用的 Lambda 函數的設定,如中所述步驟 4:設定 Lambda 函數

步驟 4:設定 Lambda 函數

重要

確保您的 Lambda 函數處於相同 AWS 區域 作為你的狀態機。

  1. 開啟 Lambda 主控台,然後選擇建立函數

  2. Create function (建立函數) 頁面上,選擇 Author from scratch (從頭開始撰寫)

  3. 在「基本資訊」區段中,設定 Lambda 函數:

    1. 針對 函數名稱 ,請輸入 distributedMapLambda

    2. 針對 Runtime (執行時間),選擇 Node.js

    3. 保留所有預設選項,然後選擇 [建立功能]。

    4. 建立 Lambda 函數後,複製頁面右上角顯示的函數的 Amazon 資源名稱 (ARN)。您需要在工作流程原型中提供此功能。下面是一個例子ARN:

      arn:aws:lambda:us-east-2:123456789012:function:distributedMapLambda
  4. 複製 Lambda 函數的下列程式碼,並將其貼到distributedMapLambda頁面的程式碼來源區段中。

    exports.handler = async function(event, context) { console.log("Received Input:\n", event); return { 'statusCode' : 200, 'inputReceived' : event //returns the input that it received } };
  5. 選擇部署。部署函數後,請選擇「測試」以查看 Lambda 函數的輸出。

步驟 5:更新工作流程原型

在 Step Functions 數主控台中,您將更新工作流程以新增 Lambda 函數ARN。

  1. 返回您在其中建立工作流程原型的標籤或視窗。

  2. 選擇「處理CSV資料」步驟,然後在「組態」索引標籤中執行下列動作:

    1. 針對「整合類型」,選擇最佳化

    2. 函數名稱中,開始輸入 Lambda 函數的名稱。從出現的下拉式清單中選擇函數,或選擇 [輸入函數名稱] 並提供 Lambda 函數ARN。

步驟 6:檢閱自動產生的 Amazon 州語言定義並儲存工作流程

當您將狀態從「動作」和「流程」索引標籤拖放到畫布上時,Work flow Studio 會自動即時編寫工作流程的 Amazon States 語言定義。您可以視需要編輯此定義。

  1. (選擇性) 在檢查器面板面板上選擇「定義」並檢視狀態機定義。

    提示

    您也可以檢視工作流程工作室中程式碼編輯器的ASL定義。在程式碼編輯器中,您也可以編輯工作流程的ASL定義。

    下列範例程式碼顯示為您的工作流程自動產生的 Amazon States 語言定義。

    { "Comment": "Using Map state in Distributed mode", "StartAt": "Process data", "States": { "Process data": { "Type": "Map", "MaxConcurrency": 100, "ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "amzn-s3-demo-source-bucket", "Key": "csvDataset/ratings.csv" } }, "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "EXPRESS" }, "StartAt": "Process CSV data", "States": { "Process CSV data": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-2:123456789012:function:distributedMapLambda" }, "End": true } } }, "Label": "Processdata", "End": true, "ResultWriter": { "Resource": "arn:aws:states:::s3:putObject", "Parameters": { "Bucket": "mapOutputs", "Prefix": "resultData" } }, "ItemSelector": { "index.$": "$$.Map.Item.Index", "value.$": "$$.Map.Item.Value" } } } }
  2. 指定狀態機的名稱。若要執行此操作,請選擇的預設狀態機器名稱旁邊的編輯圖示MyStateMachine。然後,在 [狀態機器組態] 中,在 [狀態機器名稱] 方塊中指定名稱

    針對本教學課程,輸入名稱 DistributedMapDemo

  3. (選擇性) 在狀態機器組態中,指定其他工作流程設定,例如狀態機器類型及其執行角色。

    在本教學課程中,請保留狀態機器組態中的所有預設選項。

  4. 在 [確認角色建立] 對話方塊中,選擇 [確認] 以繼續。

    您也可以選擇 [檢視角色設定] 以返回 [狀態機器組態]。

    注意

    如果刪除 Step Functions 所建立的IAM角色,則 Step Functions 稍後無法重新建立。同樣地,如果您修改角色 (例如,從原則中的主參與者中移除 Step Functions),IAM則 Step Functions 稍後無法還原其原始設定。

步驟 7:運行狀態機

執行是您執行工作流程以執行工作的狀態機器執行個體。

  1. DistributedMapDemo頁面上,選擇 [開始執行]。

  2. 在 [開始執行] 對話方塊中,執行下列動作:

    1. (選擇性) 輸入自訂執行名稱,以覆寫產生的預設值。

      非ASCII名稱和記錄

      Step Functions 接受包含非ASCII字元的狀態機器、執行項目、活動和標籤的名稱。由於此類字元不適用於 Amazon CloudWatch,因此我們建議您僅使用ASCII字元,以便在中追蹤指標 CloudWatch。

    2. (選擇性) 在「入」方塊中,以JSON格式輸入輸入值以執行工作流程。

    3. 選擇 Start execution (開始執行)

    4. 「Step Functions」主控台會將您引導至標題為您的執行 ID 的頁面。此頁面稱為「執行詳細資訊」頁面。在此頁面上,您可以在執行進行時或完成之後複查執行結果。

      若要複查執行結果,請在「圖形」檢視中選擇個別狀態,然後選擇步驟詳情窗格上的個別索引標籤,分別檢視每個狀態的詳細資訊,包括輸入、輸出和定義。如需有關可在「執行詳細資訊」頁面檢視之執行資訊的詳細資訊,請參閱執行細節概述

    例如,選擇Map狀態,然後選擇 [對應執行] 以開啟 [對映執行詳細資訊] 頁面。在此頁面上,您可以檢視分散式對應狀態的所有執行詳細資訊,以及其啟動的子工作流程執行項目。如需有關此頁面的資訊,請參閱檢視地圖執行