在 Step Functions 數中使用分散式模式中的對應狀態來處理大規模 parallel - AWS Step Functions

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

在 Step Functions 數中使用分散式模式中的對應狀態來處理大規模 parallel

使用 Step Functions,您可以協調大規模的 parallel 工作負載以執行工作,例如按需處理半結構化資料。這些 parallel 工作負載可讓您同時處理存放在 Amazon S3 中的大規模資料來源。例如,您可能會處理包含大量資料的單一JSON或CSV檔案。或者,您可能會處理大量的 Amazon S3 物件集。

若要在工作流程中設定大規模的 parallel 工作負載,請在「分散式」模式中包含Map狀態。Map 狀態會同時處理資料集中的項目。設置為分佈式的Map狀態稱為分佈式地圖狀態。在分散式模式下,狀Map態允許高並行處理。在「分散式」模式中,Map狀態會以稱為子工作流程執行的版序處理資料集中的項目。您可以指定可以 parallel 執行的子工作流程執行數目。每個子工作流程執行都有自己的執行記錄,與父工作流程的執行歷程記錄不同。如果未指定,「Step Functions」會 parallel 執行 10,000 個 parallel 子工作流程執行。

下圖說明如何在工作流程中設定大規模的 parallel 工作負載。

圖表說明編排大規模 parallel 工作負載的概念。
在工作坊中學習

瞭解 Step Functions 和 Lambda 等無伺服器技術如何簡化管理和擴展、卸載未差異化的工作,以及解決大規模分散式資料處理的挑戰。一路上,您將使用分佈式地圖進行高並發處理。該研討會還介紹了優化工作流程的最佳實踐,以及理賠處理,漏洞掃描和蒙特卡羅模擬的實際用例。

工作坊:使用 Step Functions 進行大規模數據

重要用語

分散式模式

地圖狀態的處理模式。在此模式下,Map狀態的每個版序都會以啟用高並行性的子工作流程執行方式執行來執行。每個子工作流程執行都有自己的執行歷程記錄,這與父工作流程的執行歷程記錄不同。此模式支援從大規模 Amazon S3 資料來源讀取輸入。

分散式地圖狀態

「對映」狀態設定為「分散式」處理模式

地圖工作流

Map狀態執行的一組步驟。

父工作流程

包含一或多個分散式地圖狀態的工作流程。

子工作流程執行

分散式貼圖」狀態的版序。子工作流程執行具有自己的執行歷程記錄,與父工作流程的執行歷程記錄不同。

地圖運行

當您在分散式模式下執行Map狀態時,「Step Functions」會建立「對應執行」資源。Map Run 是指分散式地圖狀態啟動的一組子工作流程執行,以及控制這些執行的執行階段設定。Step Functions 分配一個 Amazon 資源名稱(ARN)到您的地圖運行。您可以在 Step Functions 控制台中檢查地圖運行。您也可以呼叫DescribeMapRunAPI動作。「地圖執行」也會向其發出 CloudWatch度量。

如需詳細資訊,請參閱檢視地圖執行

分散式對應狀態定義範例

當您需要協調符合下列任何條件組合的大規模 parallel 工作負載時,請使用分散式模式中的Map狀態:

  • 資料集的大小超過 256 KB。

  • 工作流程的執行事件歷程記錄超過 25,000 個項目。

  • 您需要超過 40 個 parallel 迭代的並發性。

下列分散式地圖狀態定義範例會將資料集指定為存放在 Amazon S3 儲存貯體中的CSV檔案。它還指定了一個 Lambda 函數,用於處理CSV文件的每一行中的數據。由於此範例使用CSV檔案,因此也會指定CSV欄標題的位置。若要檢視此範例的完整狀態機定義,請參閱使用分散式地圖複製大型CSV資料教學課程。

{ "Map": { "Type": "Map", "ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Key": "csv-dataset/ratings.csv" } }, "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "EXPRESS" }, "StartAt": "LambdaTask", "States": { "LambdaTask": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-2:123456789012:function:processCSVData" }, "End": true } } }, "Label": "Map", "End": true, "ResultWriter": { "Resource": "arn:aws:states:::s3:putObject", "Parameters": { "Bucket": "amzn-s3-demo-destination-bucket", "Prefix": "csvProcessJobs" } } } }

執行分散式地圖的權限

當您在工作流程中包含分散式對應狀態時,Step Functions 需要適當的權限,以允許狀態機器角色呼叫分散式對應狀態StartExecutionAPI動作。

下列IAM原則範例授與狀態機器角色執行分散式對應狀態所需的最少權限。

注意

請確定您以使stateMachineName分散式地圖狀態的狀態機器名稱取代。例如:arn:aws:states:us-east-2:123456789012:stateMachine:mystateMachine

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "states:StartExecution" ], "Resource": [ "arn:aws:states:region:accountID:stateMachine:stateMachineName" ] }, { "Effect": "Allow", "Action": [ "states:DescribeExecution", "states:StopExecution" ], "Resource": "arn:aws:states:region:accountID:execution:stateMachineName:*" } ] }

此外,您還需要確保您擁有訪問所需的最少權限 AWS 分散式地圖狀態中使用的資源,例如 Amazon S3 儲存貯體。如需相關資訊,請參閱 IAM使用分散式地圖狀態的原則

分散式地圖狀態欄位

若要在工作流程中使用「分散式地圖」狀態,請指定其中一個或多個欄位。除了一般狀態欄位之外,您還可以指定這些欄位

Type (必要)

設定狀態的類型,例如Map

ItemProcessor (必要)

包含下列指JSON定Map狀態處理模式和定義的物件。

  • ProcessorConfig— 指定處理項目模式的JSON物件,具有下列子欄位:

    • Mode— 設定為使DISTRIBUTED用分散式模式中的Map狀態。

      警告

      「標準」工作流程支援分散式模式,但在 Express 工作流程中不支援。

    • ExecutionType— 將 Map 工作流程的執行類型指定為STANDARDEXPRESS。如果您DISTRIBUTEDMode子欄位指定了此欄位,則必須提供此欄位。如需工作流程類型的詳細資訊,請參閱在 Step Functions 中選擇工作流程類

  • StartAt— 指定指示工作流程中第一個狀態的字串。此字串區分大小寫,且必須與其中一個狀態物件的名稱相符。此狀態會先針對資料集中的每個項目執行。您提供給Map狀態的任何執行輸入會先傳遞至StartAt狀態。

  • States— 包JSON含逗號分隔狀態集的物件。在此物件中,您可以定義Map workflow.

ItemReader

指定資料集及其位置。狀Map態會從指定的資料集接收其輸入資料。

在分散式模式中,您可以使用從先前狀態傳遞的JSON承載,或使用大型 Amazon S3 資料來源作為資料集。如需詳細資訊,請參閱ItemReader (地圖)

ItemsPath (選用)

使用JsonPath語法來指定參考路徑,該JSON節點包含狀態輸入內的項目陣列。

在「分散式」模式中,只有當您使用上一個步驟中的JSON陣列作為狀態輸入時,才能指定此欄位。如需詳細資訊,請參閱ItemsPath (地圖)

ItemSelector (選用)

覆寫個別資料集項目的值,然後再傳遞至每個Map狀態反覆運算。

在此欄位中,您可以指定包含索引鍵值組集合的有效JSON輸入。這些配對可以是您在狀態機器定義中定義的靜態值、使用路徑從狀態輸入中選取的值,或是從前後關聯物件存取的值。如需詳細資訊,請參閱ItemSelector (地圖)

ItemBatcher (選用)

指定以批次方式處理資料集項目。然後,每個子工作流程執行都會接收這些項目的批次作為輸入。如需詳細資訊,請參閱ItemBatcher (地圖)

MaxConcurrency (選用)

指定可以 parallel 執行的子工作流程執行數目。解譯器最多只允許指定數目的 parallel 子工作流程執行。如果您未指定並行值或將其設定為零,則 Step Functions 不會限制並行執行 10,000 個 parallel 子工作流程執行。

注意

雖然您可以為 parallel 子項工作流程執行指定較高的並行限制,但建議您不要超過下游的容量 AWS 服務,如 AWS Lambda.

MaxConcurrencyPath (選用)

如果您想要使用MaxConcurrencyPath參考路徑從狀態輸入動態提供最大並行值,請使用。解析後,參考路徑必須選取值為非負整數的欄位。

注意

Map態不能同時包含MaxConcurrencyMaxConcurrencyPath

ToleratedFailurePercentage (選用)

定義在地圖執行中容許的失敗項目百分比。如果地圖執行超過此百分比,則會自動失敗。Step Functions 會計算失敗項目的百分比,因為失敗或逾時項目總數除以項目總數的結果。您必須指定一個介於零和 100 之間的值。如需詳細資訊,請參閱在 Step Functions 中設定分散式對應狀態的失敗臨界值

ToleratedFailurePercentagePath (選用)

如果您想要使用ToleratedFailurePercentagePath參照路徑,從狀態輸入動態提供容錯失敗百分比值,請使用。解析後,參考路徑必須選取值介於 0 和 100 之間的欄位。

ToleratedFailureCount (選用)

定義地圖執行中要容許的失敗項目數。如果地圖執行超過此數目,則會自動失敗。如需詳細資訊,請參閱在 Step Functions 中設定分散式對應狀態的失敗臨界值

ToleratedFailureCountPath (選用)

如果您想要使用ToleratedFailureCountPath參照路徑,從狀態輸入動態提供容忍失敗計數值,請使用。解析後,參考路徑必須選取值為非負整數的欄位。

Label (選用)

唯一識別Map狀態的字串。對於每個地圖運行,Step Functions 將標籤添加到地圖運行ARN。以下是具ARN有名為的自訂標籤的 Map Run 範例demoLabel

arn:aws:states:us-east-1:123456789012:mapRun:demoWorkflow/demoLabel:3c39a231-69bb-3d89-8607-9e124eddbb0b

如果您沒有指定標籤,「Step Functions」會自動產生唯一的標籤。

注意

標籤長度不能超過 40 個字元,在狀態機器定義中必須是唯一的,且不能包含下列任何字元:

  • 空白符號

  • 萬用字元 (? *)

  • 括號字元 (< > { } [ ])

  • 特殊字元 (: ; , \ | ^ ~ $ # % & ` ")

  • 控制字符(\\u0000-\\u001f\\u007f-\\u009f)。

Step Functions 接受包含非ASCII字元的狀態機器、執行項目、活動和標籤的名稱。由於此類字元不適用於 Amazon CloudWatch,因此我們建議您僅使用ASCII字元,以便在中追蹤指標 CloudWatch。

ResultWriter (選用)

指定 Step Functions 寫入所有子工作流程執行結果的 Amazon S3 位置。

Step Functions 會合併所有子工作流程執行資料,例如執行輸入與輸出ARN,以及執行狀態。然後,它會將具有相同狀態的執行匯出到指定 Amazon S3 位置的個別檔案。如需詳細資訊,請參閱ResultWriter (地圖)

如果您不匯出Map狀態結果,它會傳回所有子工作流程執行結果的陣列。例如:

[1, 2, 3, 4, 5]
ResultPath (選用)

指定在輸入中放置迭代輸出的位置。然後,在輸入作為狀態的輸出傳遞之前,按照OutputPath字段指定(如果存在)進行過濾。如需詳細資訊,請參閱輸入和輸出處理

ResultSelector (選用)

傳遞鍵值對的集合,其中值是靜態的或從結果中選擇的。如需詳細資訊,請參閱ResultSelector

提示

如果您在狀態機器中使用的「平行」或「對映」狀態傳回陣列陣列,您可以將它們轉換為具有ResultSelector欄位的平面陣列。如需詳細資訊,請參閱扁平化陣列的陣列

Retry (選用)

物件陣列 (稱為「擷取器」),可定義重試原則。如果狀態遇到執行階段錯誤,執行會使用重試原則。如需詳細資訊,請參閱使用「重試」和使用 Catch 的狀態機示例

注意

如果您定義 [分散式對應] 狀態的 [擷取器],則重試原則會套用至所有已啟動Map狀態的子工作流程執行。例如,假設您的Map狀態開始了三個子工作流程執行,其中一個執行失敗。發生失敗時,執行會使用Retry欄位 (如果已定義) 做為Map狀態。重試原則適用於所有子工作流程執行,而不僅僅是失敗的執行。如果一個或多個子工作流程執行失敗,則 Map Run 會失敗。

當您重試某個Map狀態時,它會建立新的 Map Run。

Catch (選用)

稱為 Catcher 的物件陣列,可定義後援狀態。Step Functions 使用中定義的捕獲器,Catch如果狀態遇到運行時錯誤。發生錯誤時,執行會先使用中Retry定義的任何擷取器。如果重試原則未定義或已用盡,則執行會使用其 Catcher (如果已定義)。如需詳細資訊,請參閱備用狀態

在 Step Functions 中設定分散式對應狀態的失敗臨界值

當您協調大規模的 parallel 工作負載時,您也可以定義容忍的故障閾值。此值可讓您指定失敗項目的最大數目或百分比做為 Map Run 的失敗臨界值。根據您指定的值,如果 Map Run 超過臨界值,則會自動失敗。如果您同時指定這兩個值,則工作流程會在超過任一值時失敗。

指定臨界值可協助您在整個 Map Run 失敗之前失敗特定數目的項目。當「對映執行」因為超過指定的臨界值而失敗時,「Step Functions」會傳回States.ExceedToleratedFailureThreshold錯誤。

注意

即使在超過容許的失敗閾值之後,但在 Map Run 失敗之前,Step Functions 仍可繼續在 Map Run 中執行子工作流程。

若要在 Workflow Studio 中指定臨界值,請在 [執行時期設定] 欄位下的 [其他組態] 中選取 [設定容許失敗臨界值]。

容忍失敗百分比

定義要容忍的失敗項目百分比。如果超過此值,則「地圖執行」會失敗。Step Functions 會計算失敗項目的百分比,因為失敗或逾時項目總數除以項目總數的結果。您必須指定一個介於零和 100 之間的值。預設百分比值為零,也就是說,如果工作流程的任何一個子工作流程執行失敗或逾時,工作流程就會失敗。如果將百分比指定為 100,即使所有子工作流程執行都失敗,工作流程也不會失敗。

或者,您可以將百分比指定為「分散式對映」狀態輸入中現有鍵值對的參考路徑。此路徑必須在執行階段解析為介於 0 到 100 之間的正整數。您可以在ToleratedFailurePercentagePath子欄位中指定參照路徑。

例如,假設有下列輸入:

{ "percentage": 15 }

您可以使用該輸入的參考路徑來指定百分比,如下所示:

{ ... "Map": { "Type": "Map", ... "ToleratedFailurePercentagePath": "$.percentage" ... } }
重要

您可以在「分散式貼圖」狀態定義中指定ToleratedFailurePercentageToleratedFailurePercentagePath,但不能同時指定兩者。

容忍失敗計數

定義要容忍的失敗項目數。如果超過此值,則「地圖執行」會失敗。

或者,您可以將 count 指定為分散式對映狀態輸入中現有鍵值對的參考路徑。此路徑必須在執行階段解析為正整數。您可以在ToleratedFailureCountPath子欄位中指定參照路徑。

例如,假設有下列輸入:

{ "count": 10 }

您可以使用該輸入的參考路徑來指定數字,如下所示:

{ ... "Map": { "Type": "Map", ... "ToleratedFailureCountPath": "$.count" ... } }
重要

您可以在「分散式貼圖」狀態定義中指定ToleratedFailureCountToleratedFailureCountPath,但不能同時指定兩者。

進一步了解分散式地圖

若要繼續學習有關分散式地圖狀態的更多資訊,請參閱下列資源: