在 Step Functions 工作流程中以内联模式使用地图状态 - AWS Step Functions

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Step Functions 工作流程中以内联模式使用地图状态

默认情况下,Map 状态以内联模式运行。在内联模式下,Map 状态仅接受JSON数组作为输入。它接收来自工作流中上一步的数组。在此模式下,Map 状态的每次迭代都在包含 Map 状态的工作流的上下文中运行。Step Functions 会将这些迭代的执行历史记录添加到父工作流的执行历史记录中。

在此模式下,Map 状态最多支持 40 次并发迭代。

设置为内联Map 状态称为内联 Map 状态。如果工作流的执行历史记录不超过 2.5 万个条目,或者需要的并发迭代次数不超过 40 次,则可以使用内联模式的 Map 状态。

有关使用内联 Map 状态的介绍,请参阅教程使用内联地图重复操作

本主题中的关键概念

内联模式

Map 状态的有限并发模式。在此模式下,Map 状态的每次迭代都在包含 Map 状态的工作流的上下文中运行。Step Functions 会将这些迭代的执行历史记录添加到父工作流的执行历史记录中。默认情况下,Map 状态在内联模式下运行。

此模式仅接受JSON数组作为输入,最多支持 40 次并发迭代。

内联 Map 状态

设置为内联模式的 Map 状态。

Map 工作流

Map 状态为每次迭代运行的一组步骤。

Map 状态迭代

Map 状态内部定义的重复工作流。

内联 Map 状态字段

要在工作流中使用内联 Map 状态,请指定以下字段中的一个或多个。除了公共状态字段外,您还可以指定以下字段。

Type(必填)

设置状态的类型,例如 Map

ItemProcessor
(必填)

包含以下指定Map状态处理模式和定义的JSON对象。

该定义包含处理每个数组项目时要重复的一组步骤。

  • ProcessorConfig— 一个可选JSON对象,用于指定Map状态的处理模式。此对象包含 Mode 子字段。此字段默认为 INLINE,即在内联模式下使用 Map 状态。

    在此模式下,任何迭代失败都会导致 Map 状态失败。当 Map 状态失败时,所有迭代都会停止。

  • StartAt– 指定表示工作流中第一个状态的字符串。该字符串区分大小写,必须与某个状态对象的名称相匹配。此状态首先针对数据集中的每个项目运行。您向 Map 状态提供的任何执行输入都将首先传递给 StartAt 状态。

  • States— 包含一组以逗号分隔的状态的JSON对象。在此对象中,您可以定义 Map workflow

    注意
    • ItemProcessor 字段中的状态只能相互转换。ItemProcessor 字段外的任何状态都不能转换到字段内的状态。

    • ItemProcessor 字段取代了现已弃用的 Iterator 字段。尽管您可以继续包含使用 Iterator 字段的 Map 状态,但我们强烈建议您将此字段替换为 ItemProcessor

      Step Functions Local 目前不支持 ItemProcessor 字段。我们建议对 Step Functions Local 使用 Iterator 字段。

ItemsPath(可选)

使用JsonPath语法指定参考路径。此路径选择包含状态输入内项目数组的JSON节点。有关更多信息,请参阅 ItemsPath (地图)

ItemSelector
(可选)

在输入数组项的值传递到每次 Map 状态迭代之前,覆盖这些值。

在此字段中,您可以指定一个JSON包含键值对集合的有效值。这些对可包含以下任何内容:

  • 在状态机定义中定义的静态值。

  • 使用路径从状态输入中选择的值。

  • 上下文对象中访问的值。

有关更多信息,请参阅 ItemSelector (地图)

ItemSelector 字段取代了现已弃用的 Parameters 字段。尽管您可以继续包含使用 Parameters 字段的 Map 状态,但我们强烈建议您将此字段替换为 ItemSelector

MaxConcurrency(可选)

指定一个整数值,该值提供可以并行运行的 Map 状态迭代次数的上限。例如,MaxConcurrency 值为 10 将限制您的 Map 状态同时运行 10 次并发迭代。

注意

并发迭代可能会受到限制。发生这种情况时,有些迭代要等到之前的迭代完成后才会开始。当输入数组中的项目超过 40 个时,发生这种情况的可能性就会增加。

要实现更高的并发数,请考虑分布式模式

默认值为0,这不限制并发性。Step Functions 尽可能同时调用迭代。

MaxConcurrency 值为 1 会对每个数组元素调用一次 ItemProcessor。数组中的项目按其在输入中的出现顺序进行处理。Step Functions 在完成前一次迭代之后才会开始新的迭代。

MaxConcurrencyPath(可选)

如果要使用参考路径从状态输入中动态提供最大并发数值,请使用 MaxConcurrencyPath。解决后,参考路径必须选择一个值为非负整数的字段。

注意

一个 Map 状态不能同时包含 MaxConcurrencyMaxConcurrencyPath

ResultPath(可选)

指定输入中存储 Map 状态迭代输出的位置。然后,Map 状态按照 OutputPath 字段(如果已指定)的指定筛选输入。然后,它使用筛选后的输入作为状态的输出。有关更多信息,请参阅输入和输出处理

ResultSelector(可选)

传递键值对集合,其中,键值为静态值或从结果中选择。有关更多信息,请参阅 ResultSelector

提示

如果您在状态机中使用的 Parallel 或 Map 状态返回由数组组成的数组,您可以使用 ResultSelector 字段将他们转换为一个平面数组。有关更多信息,请参阅 展平由数组组成的数组

Retry(可选)

一个称为重试器的对象数组,用于定义重试策略。状态在遇到运行时错误时会使用重试策略。有关更多信息,请参阅 使用 Retry 和使用 Catch 的状态机示例

注意

如果您为内联 Map 状态定义了重试器,则重试策略将应用于所有 Map 状态迭代,而不仅仅是失败的迭代。例如,Map 状态包含两次成功的迭代和一次失败的迭代。如果您为Map状态定义了Retry字段,则重试策略将应用于所有三个Map状态迭代,而不仅仅是失败的迭代。

Catch(可选)

一个称为捕获器的对象数组,用于定义回退状态。如果状态遇到了运行时错误,且没有应用重试策略,或者重试策略已用尽,则会运行捕获器。有关更多信息,请参阅回退状态

已弃用的字段

注意

尽管您可以继续包含使用以下字段的 Map 状态,但我们强烈建议您将 Iterator 替换为 ItemProcessor,将 Parameters 替换为 ItemSelector

Iterator

指定一个JSON对象,该对象定义了一组处理数组中每个元素的步骤。

Parameters

指定键值对集合,其中,值可包含以下任何内容:

  • 在状态机定义中定义的静态值。

  • 使用路径从输入中选择的值。

内联 Map 状态示例

考虑以下以内联模式运行的 Map 状态的输入数据。

{ "ship-date": "2016-03-14T01:59:00Z", "detail": { "delivery-partner": "UQS", "shipped": [ { "prod": "R31", "dest-code": 9511, "quantity": 1344 }, { "prod": "S39", "dest-code": 9511, "quantity": 40 }, { "prod": "R31", "dest-code": 9833, "quantity": 12 }, { "prod": "R40", "dest-code": 9860, "quantity": 887 }, { "prod": "R40", "dest-code": 9511, "quantity": 1220 } ] } }

给定之前的输入,以下示例中的Map状态会调用 AWS Lambda 该函数为shipped字段中数组的每个项目命名ship-val一次。

"Validate All": { "Type": "Map", "InputPath": "$.detail", "ItemProcessor": { "ProcessorConfig": { "Mode": "INLINE" }, "StartAt": "Validate", "States": { "Validate": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "arn:aws:lambda:us-east-2:123456789012:function:ship-val:$LATEST" }, "End": true } } }, "End": true, "ResultPath": "$.detail.shipped", "ItemsPath": "$.shipped" }

Map 状态的每次迭代都会将数组中的一个项目(通过 ItemsPath 字段选择)作为输入发送到 ship-val Lambda 函数。以下值是 Map 状态发送给调用 Lambda 函数的输入示例:

{ "prod": "R31", "dest-code": 9511, "quantity": 1344 }

完成后,Map状态的输出是一个JSON数组,其中每个项目都是迭代的输出。在本例中,此数组包含 ship-val Lambda 函数的输出。

使用 ItemSelector 的内联 Map 状态示例

假设上一个示例中的 ship-val Lambda 函数还需要有关货件快递员的信息。该信息是对每次迭代的数组中项目的补充。您可以包括来自输入的信息,以及特定于 Map 状态的当前迭代的信息。请注意下面示例中的 ItemSelector 字段:

"Validate-All": { "Type": "Map", "InputPath": "$.detail", "ItemsPath": "$.shipped", "MaxConcurrency": 0, "ResultPath": "$.detail.shipped", "ItemSelector": { "parcel.$": "$$.Map.Item.Value", "courier.$": "$.delivery-partner" }, "ItemProcessor": { "StartAt": "Validate", "States": { "Validate": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:123456789012:function:ship-val", "End": true } } }, "End": true }

ItemSelector模块用JSON节点替换迭代的输入。此节点既包含来自上下文对象的当前项目数据,也包含来自 Map 状态输入 delivery-partner 字段的快递员信息。以下是一个单次迭代输入示例。Map 状态会将此输入传递给 ship-val Lambda 函数的调用。

{ "parcel": { "prod": "R31", "dest-code": 9511, "quantity": 1344 }, "courier": "UQS" }

在前面的内联 Map 状态示例中,ResultPath 字段以与输入相同的格式生成输出。但是,它会用一个数组覆盖 detail.shipped 字段,数组中每个元素都是每次迭代的 ship-val Lambda 调用的输出。

有关使用内联 Map 状态及其字段的更多信息,请参阅以下内容。

内联 Map 状态输入和输出处理

对于给定的 Map 状态,InputPath 选择该状态输入的子集。

Map状态的输入必须包含JSON数组。Map 状态为数组中的每个项目运行一次 ItemProcessor 部分。如果指定了 ItemsPath 字段,则 Map 状态会选择在输入中的哪个位置查找要迭代的数组。如果未指定,ItemsPath 的值为 $,而 ItemProcessor 部分预期此数组是唯一的输入。如果指定了 ItemsPath 字段,则其值必须为参考路径Map 状态在应用 InputPath 之后,会将此路径应用于有效输入。ItemsPath必须标识值为JSON数组的字段。

默认情况下,每次迭代的输入都是由 ItemsPath 值标识的数组字段的单个元素。您可以使用 ItemSelector (地图) 字段覆盖此值。

完成后,Map状态的输出是一个JSON数组,其中每个项目都是迭代的输出。

有关内联 Map 状态输入和输出的更多信息,请参阅以下内容: