本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
在本教學課程中,您可以使用「分散式地圖」狀態的ItemBatcher (地圖)欄位,使用 Lambda 函數對批次中存在的個別項目進行迭代。「分散式對應」狀態會啟動四個子工作流程執行。這些子工作流程中的每一個都會執行內嵌對映狀態。對於其每次迭代,內聯映射狀態調用 Lambda 函數,並將單個項目從批處理傳遞給函數。Lambda 函數接著會處理項目並傳回結果。
您將創建一個狀態機,該狀態機器對整數數組執行乘法。假設您提供的整數數組作為輸入是[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
,乘法因子為7
。然後,將這些整數乘以 [7, 14, 21, 28, 35, 42, 49, 56, 63, 70]
7 的因子之後形成的結果數組。
步驟 1:建立狀態機
在此步驟中,您會建立狀態機器的工作流程原型,將單一項目從批次項目傳遞至您將在步驟 2 中建立的 Lambda 函數的每次叫用。
-
使用下列定義可使用 Step Functions 主控台
建立狀態機器。如需有關建立狀態機器的資訊,請參閱〈使用分散式地圖狀態入門〉自學課程步驟 1:建立工作流程原型中的〈〉。 在此狀態機器中,您可以定義接受 10 個整數的陣列作為輸入的分散式地圖狀態,並將這些陣列項目批次傳遞給子工作流程執行。每個子工作流程執行都會接收三個項目的批次作為輸入,並執行內嵌對應狀態。內聯地圖狀態的每一次迭代調用 Lambda 函數,並將項目從批處理傳遞給函數。然後,此函數會將項目乘以係數,
7
並傳回結果。每個子工作流程執行的輸出都是一個JSON陣列,其中包含每個傳遞項目的乘法結果。
重要
請務必將下列程式碼中的 Lambda 函數的 Amazon 資源名稱 (ARN) 取ARN代為您將在步驟 2 中建立的函數。
{ "StartAt": "Pass", "States": { "Pass": { "Type": "Pass", "Next": "Map", "Result": { "MyMultiplicationFactor": 7, "MyItems": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] } }, "Map": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "STANDARD" }, "StartAt": "InnerMap", "States": { "InnerMap": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "INLINE" }, "StartAt": "Lambda Invoke", "States": { "Lambda Invoke": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:
functionName
" }, "Retry": [ { "ErrorEquals": [ "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException", "Lambda.TooManyRequestsException" ], "IntervalSeconds": 2, "MaxAttempts": 6, "BackoffRate": 2 } ], "End": true } } }, "End": true, "ItemsPath": "$.Items", "ItemSelector": { "MyMultiplicationFactor.$": "$.BatchInput.MyMultiplicationFactor", "MyItem.$": "$$.Map.Item.Value" } } } }, "End": true, "Label": "Map", "MaxConcurrency": 1000, "ItemsPath": "$.MyItems", "ItemBatcher": { "MaxItemsPerBatch": 3, "BatchInput": { "MyMultiplicationFactor.$": "$.MyMultiplicationFactor" } } } } }
步驟 2:建立 Lambda 函數
在此步驟中,您會建立 Lambda 函數來處理從批次傳遞的每個項目。
重要
確保您的 Lambda 函數處於相同 AWS 區域 作為你的狀態機。
建立 Lambda 函數
-
使用 L ambda 主控台
建立名為的 Python Lambda 函數 ProcessSingleItem
。如需建立 Lambda 函數的相關資訊,請參閱〈使用分散式地圖狀態入門〉教學課程中的步驟 4:設定 Lambda 函數。 -
複製 Lambda 函數的下列程式碼,並將其貼到 Lambda 函數的程式碼原始碼區段中。
import json def lambda_handler(event, context): multiplication_factor = event['MyMultiplicationFactor'] item = event['MyItem'] result = multiplication_factor * item return { 'statusCode': 200, 'multiplied': result }
-
建立 Lambda 函數後,複製頁面右上角ARN顯示的函數。下面是一個例子ARN,其中
是 Lambda 函數的名稱 (在本例中為功能名
ProcessSingleItem
):arn:aws:lambda:us-east-1:123456789012:function:
function-name
您需要在步驟 1 ARN 中建立的狀態機中提供函數。
-
選擇部署以部署變更。
步驟 3:運行狀態機
當您執行狀態機器時,分散式地圖狀態會啟動四個子工作流程執行,其中每個執行會處理三個項目,而一個執行則處理單一項目。
下列範例顯示傳遞至子工作流程執行內其中一個ProcessSingleItem函數叫用的資料。
{
"MyMultiplicationFactor": 7,
"MyItem": 1
}
有了這個輸入,下列範例會顯示 Lambda 函數傳回的輸出。
{
"statusCode": 200,
"multiplied": 7
}
下列範例顯示其中一個子工作流程執行的輸出JSON陣列。
[
{
"statusCode": 200,
"multiplied": 7
},
{
"statusCode": 200,
"multiplied": 14
},
{
"statusCode": 200,
"multiplied": 21
}
]
狀態機會傳回下列輸出,其中包含四個子工作流程執行的四個陣列。這些陣列包含個別輸入項目的乘法結果。
最後,狀態機輸出是一個名為的陣列,結合multiplied
了針對四個子工作流程執行傳回的所有乘法結果。
[
[
{
"statusCode": 200,
"multiplied": 7
},
{
"statusCode": 200,
"multiplied": 14
},
{
"statusCode": 200,
"multiplied": 21
}
],
[
{
"statusCode": 200,
"multiplied": 28
},
{
"statusCode": 200,
"multiplied": 35
},
{
"statusCode": 200,
"multiplied": 42
}
],
[
{
"statusCode": 200,
"multiplied": 49
},
{
"statusCode": 200,
"multiplied": 56
},
{
"statusCode": 200,
"multiplied": 63
}
],
[
{
"statusCode": 200,
"multiplied": 70
}
]
]
若要將子工作流程執行傳回的所有乘法結果合併為單一輸出陣列,您可以使用ResultSelector欄位。在「分散式地圖」狀態中定義此欄位以尋找所有結果、擷取個別結果,然後將它們合併為一個名為的單一輸出陣列multiplied
。
若要使用ResultSelector
欄位,請更新狀態機定義,如下列範例所示。
{
"StartAt": "Pass",
"States": {
...
...
"Map": {
"Type": "Map",
...
...
"ItemBatcher": {
"MaxItemsPerBatch": 3,
"BatchInput": {
"MyMultiplicationFactor.$": "$.MyMultiplicationFactor"
}
},
"ItemsPath": "$.MyItems",
"ResultSelector": {
"multiplied.$": "$..multiplied"
}
}
}
}
更新後的狀態機會傳回合併的輸出陣列,如下列範例所示。
{
"multiplied": [7, 14, 21, 28, 35, 42, 49, 56, 63, 70]
}