本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
在 Step Functions CSV 數中使用分佈式地圖複製大規模數
本教學課程可協助您開始在「分散式」模式中使用Map
狀態。設置為分佈式的Map
狀態稱為分佈式地圖狀態。您可以在工作流程中使用分散式地圖狀態來迭代大規模的 Amazon S3 資料來源。狀Map
態會將每個版序當做子工作流程執行執行來執行,以達到高並行性。如需分散式模式的詳細資訊,請參閱分散式模式中的對應狀態。
在本教學中,您會使用「分散式地圖」狀態來迭代 Amazon S3 儲存貯體中的CSV檔案。然後,您將其內容連同子工作流程執行ARN的內容一起傳回到另一個 Amazon S3 儲存貯體中。您可以先在工作流程 Studio 中建立工作流程原型。接下來,您將Map狀態的處理模式設定為「分散式」、將CSV檔案指定為資料集,並將其位置提供給Map
狀態。您也可以指定子工作流程執行的工作流程類型,分散式對應狀態開始為 Express。
除了這些設定之外,您還可以針對本教學課程中使用的範例工作流程指定其他組態,例如同時執行子項工作流程的最大數目,以及匯出Map
結果的位置。
必要條件
將CSV檔案上傳到 Amazon S3 儲存貯體。您必須在CSV檔案中定義標題列。若要取得有關對CSV檔案施加的大小限制以及如何指定標頭列的資訊,請參閱〈〉CSV文件在 Amazon S3 存儲桶。
在該儲存貯體內建立另一個 Amazon S3 儲存貯體和一個資料夾,以將
Map
狀態結果匯出到。
重要
確保您的 Amazon S3 存儲桶位於相同 AWS 帳戶 以及 AWS 區域 作為你的狀態機。
步驟 1:建立工作流程原型
在此步驟中,您可以使用工作流程 Studio 建立工作流程的原型。工作流程 Studio 是 Step Functions 控制台中提供的可視化工作流程設計器。您可以分別從「流程」和「API動作」標籤中選擇所需的狀態和動作。您將使用拖放工作流 Studio 的功能來創建工作流原型。
開啟 Step Functions 主控台
,然後選擇建立狀態機器。 在「選擇範本」對話方塊中,選取「空白」。
-
選擇選取以在中開啟工作流程工作室設計模式。
從「流程」索引標籤中,將地圖狀態拖放至標示為「在此處拖曳第一個狀態」的空白狀態。
在組態索引標籤中,對於狀態名稱,輸入
Process data
。從「動作」標籤中,拖曳 AWS Lambda 叫用API動作並將其放置在處理程序資料狀態中。
-
重新命名 AWS Lambda 呼叫狀態為
Process CSV data
。
第 2 步:配置地圖狀態的必填字段
在此步驟中,您可以設定「分散式對應」狀態的下列必要欄位:
ItemReader— 指定資料集及其
Map
狀態可從中讀取輸入的位置。ItemProcessor— 指定下列值:
ProcessorConfig
—ExecutionType
將Mode
和設定為DISTRIBUTED
和EXPRESS
分別。這會設定Map
狀態的處理模式和「分散式對映」狀態啟動之子工作流程執行的工作流程類型。StartAt
—「地圖」工作流程中的第一個狀態。States
— 定義 Map 工作流程,這是每個子工作流程執行中要重複的一組步驟。
ResultWriter— 指定 Step Functions 寫入分散式地圖狀態結果的 Amazon S3 位置。
重要
確定您用來匯出地圖執行結果的 Amazon S3 儲存貯體是相同的 AWS 帳戶 以及 AWS 區域 作為你的狀態機。否則,您的狀態機執行將失敗並顯示
States.ResultWriterFailed
錯誤。
若要設定必要欄位:
選擇處理資料狀態,然後在組態索引標籤中執行下列動作:
對於「處理」模式,請選擇「分散
對於項目來源,請選擇 Amazon S3,然後從 S3 項目來源下拉式清單中選擇 S3 中的CSV檔案。
執行下列動作以指定CSV檔案的 Amazon S3 位置:
對於 S3 物件,請從下拉式清單中選取 [輸入儲存貯體和金鑰]。
在儲存貯體中,輸入包含CSV檔案的 Amazon S3 儲存貯體的名稱。例如:
amzn-s3-demo-source-bucket
。在金鑰中,輸入您儲存CSV檔案的 Amazon S3 物件名稱。您還必須在此欄位中指定CSV檔案的名稱。例如:
csvDataset/ratings.csv
。
對於CSV檔案,您還必須指定欄標題的位置。若要這麼做,請選擇 [其他組態],然後針對CSV標頭位置保留預設選取 [第一列] (如果CSV檔案的第一列是標頭)。否則,請選擇「給定」以在狀態機定義中指定標頭。如需詳細資訊,請參閱
ReaderConfig
。針對子項執行類型,選擇快速。
在匯出位置中,若要將地圖執行結果匯出到特定的 Amazon S3 位置,請選擇將地圖狀態的輸出匯出到 Amazon S3。
請執行下列操作:
對於 S3 儲存貯體,請從下拉式清單中選擇 [輸入儲存貯體名稱和前綴]
在儲存貯體中,輸入您要將結果匯出到的 Amazon S3 儲存貯體的名稱。例如:
mapOutputs
。在「字首」中,輸入您要儲存結果的資料夾名稱。例如:
resultData
。
步驟 3:設定其他選項
除了分散式貼圖狀態的必要設定之外,您還可以指定其他選項。這些可能包括同時執行子項工作流程的最大數目,以及要將Map
狀態結果匯出至的位置。
選擇「處理資料」狀態。然後,在「項目來源」中,選擇「其他組態」。
請執行下列操作:
選擇 [修改項目],為每個子工作流程執行指定自訂JSON輸入。 ItemSelector
輸入以下JSON輸入:
{
"index.$":
"$$.Map.Item.Index"
,"value.$":
"$$.Map.Item.Value"
}若要取得有關如何建立自訂輸入的資訊,請參閱〈〉
ItemSelector (地圖)
。
在「執行階段」設定中,對於並行限制,指定「分散式對應」狀態可以啟動的並行子工作流程執行數目。例如,輸入
100
。-
在瀏覽器上開啟新視窗或索引標籤,並完成您將在此工作流程中使用的 Lambda 函數的設定,如中所述步驟 4:設定 Lambda 函數。
步驟 4:設定 Lambda 函數
重要
確保您的 Lambda 函數處於相同 AWS 區域 作為你的狀態機。
-
開啟 Lambda 主控台
,然後選擇建立函數。 -
在 Create function (建立函數) 頁面上,選擇 Author from scratch (從頭開始撰寫)。
-
在「基本資訊」區段中,設定 Lambda 函數:
-
針對 函數名稱 ,請輸入
distributedMapLambda
。 -
針對 Runtime (執行時間),選擇 Node.js 。
-
保留所有預設選項,然後選擇 [建立功能]。
-
建立 Lambda 函數後,複製頁面右上角顯示的函數的 Amazon 資源名稱 (ARN)。您需要在工作流程原型中提供此功能。下面是一個例子ARN:
arn:aws:lambda:us-east-2:123456789012:function:distributedMapLambda
-
-
複製 Lambda 函數的下列程式碼,並將其貼到distributedMapLambda頁面的程式碼來源區段中。
exports.handler = async function(event, context) { console.log("Received Input:\n", event); return { 'statusCode' : 200, 'inputReceived' : event //returns the input that it received } };
-
選擇部署。部署函數後,請選擇「測試」以查看 Lambda 函數的輸出。
步驟 5:更新工作流程原型
在 Step Functions 數主控台中,您將更新工作流程以新增 Lambda 函數ARN。
返回您在其中建立工作流程原型的標籤或視窗。
選擇「處理CSV資料」步驟,然後在「組態」索引標籤中執行下列動作:
針對「整合類型」,選擇最佳化。
在函數名稱中,開始輸入 Lambda 函數的名稱。從出現的下拉式清單中選擇函數,或選擇 [輸入函數名稱] 並提供 Lambda 函數ARN。
步驟 6:檢閱自動產生的 Amazon 州語言定義並儲存工作流程
當您將狀態從「動作」和「流程」索引標籤拖放到畫布上時,Work flow Studio 會自動即時編寫工作流程的 Amazon States 語言定義。您可以視需要編輯此定義。
-
(選擇性) 在檢查器面板面板上選擇「定義」並檢視狀態機定義。
提示
您也可以檢視工作流程工作室中程式碼編輯器的ASL定義。在程式碼編輯器中,您也可以編輯工作流程的ASL定義。
下列範例程式碼顯示為您的工作流程自動產生的 Amazon States 語言定義。
{ "Comment": "Using Map state in Distributed mode", "StartAt": "Process data", "States": { "Process data": { "Type": "Map", "MaxConcurrency": 100, "ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "amzn-s3-demo-source-bucket", "Key": "csvDataset/ratings.csv" } }, "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "EXPRESS" }, "StartAt": "Process CSV data", "States": { "Process CSV data": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-2:123456789012:function:distributedMapLambda" }, "End": true } } }, "Label": "Processdata", "End": true, "ResultWriter": { "Resource": "arn:aws:states:::s3:putObject", "Parameters": { "Bucket": "mapOutputs", "Prefix": "resultData" } }, "ItemSelector": { "index.$": "$$.Map.Item.Index", "value.$": "$$.Map.Item.Value" } } } }
-
指定狀態機的名稱。若要執行此操作,請選擇的預設狀態機器名稱旁邊的編輯圖示MyStateMachine。然後,在 [狀態機器組態] 中,在 [狀態機器名稱] 方塊中指定名稱。
針對本教學課程,輸入名稱
DistributedMapDemo
。 -
(選擇性) 在狀態機器組態中,指定其他工作流程設定,例如狀態機器類型及其執行角色。
在本教學課程中,請保留狀態機器組態中的所有預設選項。
-
在 [確認角色建立] 對話方塊中,選擇 [確認] 以繼續。
您也可以選擇 [檢視角色設定] 以返回 [狀態機器組態]。
注意
如果刪除 Step Functions 所建立的IAM角色,則 Step Functions 稍後無法重新建立。同樣地,如果您修改角色 (例如,從原則中的主參與者中移除 Step Functions),IAM則 Step Functions 稍後無法還原其原始設定。
步驟 7:運行狀態機
執行是您執行工作流程以執行工作的狀態機器執行個體。
-
在DistributedMapDemo頁面上,選擇 [開始執行]。
-
在 [開始執行] 對話方塊中,執行下列動作:
-
(選擇性) 輸入自訂執行名稱,以覆寫產生的預設值。
非ASCII名稱和記錄
Step Functions 接受包含非ASCII字元的狀態機器、執行項目、活動和標籤的名稱。由於此類字元不適用於 Amazon CloudWatch,因此我們建議您僅使用ASCII字元,以便在中追蹤指標 CloudWatch。
-
(選擇性) 在「輸入」方塊中,以JSON格式輸入輸入值以執行工作流程。
-
選擇 Start execution (開始執行)。
-
「Step Functions」主控台會將您引導至標題為您的執行 ID 的頁面。此頁面稱為「執行詳細資訊」頁面。在此頁面上,您可以在執行進行時或完成之後複查執行結果。
若要複查執行結果,請在「圖形」檢視中選擇個別狀態,然後選擇步驟詳情窗格上的個別索引標籤,分別檢視每個狀態的詳細資訊,包括輸入、輸出和定義。如需有關可在「執行詳細資訊」頁面檢視之執行資訊的詳細資訊,請參閱執行細節概述。
例如,選擇
Map
狀態,然後選擇 [對應執行] 以開啟 [對映執行詳細資訊] 頁面。在此頁面上,您可以檢視分散式對應狀態的所有執行詳細資訊,以及其啟動的子工作流程執行項目。如需有關此頁面的資訊,請參閱檢視地圖執行。 -