本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
在 Step Functions 中使用分布式地图复制大规模CSV数据
本教程将帮助您开始在分布式模式下使用 Map
状态。设置为分布式的 Map
状态被称为分布式 Map 状态。您可以在工作流中使用分布式 Map 状态来迭代大规模 Amazon S3 数据来源。Map
状态将每次迭代作为子工作流执行来运行,从而实现高并发数。有关分布式模式的更多信息,请参阅分布式模式下的 Map 状态。
在本教程中,您将使用分布式地图状态来迭代 Amazon S3 存储桶中的CSV文件。然后,您可以将其内容以及子工作流程执行ARN的内容返回到另一个 Amazon S3 存储桶中。首先,在 Workflow Studio 中创建一个工作流原型。接下来,将Map状态的处理模式设置为 “分布式”,将CSV文件指定为数据集,然后将其位置提供给该Map
状态。您还可以为子工作流执行指定工作流类型,分布式 Map 状态以快速方式启动。
除了这些设置外,您还可以为本教程中使用的示例工作流指定其他配置,例如并发子工作流执行的最大数量和导出 Map
结果的位置。
先决条件
将CSV文件上传到 Amazon S3 存储桶。您必须在CSV文件中定义标题行。有关对CSV文件施加的大小限制以及如何指定标题行的信息,请参见CSV在 Amazon S3 存储桶中存入文件。
创建另一个 Amazon S3 存储桶,并在其中创建的一个文件夹,以便将
Map
状态结果导出到该存储桶中。
重要
确保您的 Amazon S3 存储桶处于相同位置 AWS 账户 以及 AWS 区域 作为你的状态机。
第 1 步:创建工作流原型
在此步骤中,您将使用 Workflow Studio 为工作流创建原型。Workflow Studio 是一款可视化工作流设计器,可在 Step Functions 控制台中使用。您可以分别从 “流程” 和 “操作API” 选项卡中选择所需的状态和操作。您将使用 Workflow Studio 的拖放特征来创建工作流原型。
打开 Step Functions 控制台
,然后选择创建状态机。 在 选择模板对话框中,选择空白。
-
选择 “选择” 以在中打开 “工作流工作室” 设计模式。
从流选项卡中,将 Map 状态拖放到标有将第一个状态拖至此处的空白状态处。
在配置选项卡下,在状态名称中输入
Process data
。从 “操作” 选项卡中,拖动 AWS Lambda 调用API操作并将其放入流程数据状态中。
-
重命名 AWS Lambda 调用状态
Process CSV data
。
第 2 步:配置 Map 状态的必填字段
在此步骤中,您将配置分布式 Map 状态的以下必填字段:
ItemReader— 指定数据集及其位置,该
Map
州可以从中读取输入。ItemProcessor – 指定以下值:
ProcessorConfig
– 将Mode
和ExecutionType
分别设置为DISTRIBUTED
和EXPRESS
。这将为分布式 Map 状态启动的子工作流执行设置Map
状态的处理模式和工作流类型。StartAt
– Map 工作流中的第一个状态。States
– 定义 Map 工作流程,这是在每个子工作流执行中要重复的一组步骤。
ResultWriter— 指定 Step Functions 写入分布式地图状态结果的 Amazon S3 位置。
重要
确保用于导出 Map Run 结果的 Amazon S3 存储桶处于相同位置 AWS 账户 以及 AWS 区域 作为你的状态机。否则,您的状态机执行将因
States.ResultWriterFailed
错误而失败。
要配置必填句字段,请执行以下操作:
选择 Process data 状态,然后在配置选项卡中执行以下操作:
对于处理模式,选择分布式。
对于项目来源,选择 Amazon S3,然后从 S3 项目来源下拉列表中选择 S3 中的CSV文件。
执行以下操作来指定CSV文件的 Amazon S3 位置:
对于 S3 对象,请从下拉列表中选择输入存储桶和密钥。
对于存储桶,输入包含CSV文件的 Amazon S3 存储桶的名称。例如,
amzn-s3-demo-source-bucket
。对于密钥,输入您保存CSV文件的 Amazon S3 对象的名称。您还必须在此字段中指定CSV文件名。例如,
csvDataset/ratings.csv
。
对于CSV文件,还必须指定列标题的位置。为此,请选择 “其他配置”,如果CSV文件的第一行是标题,则对于CSV标题位置,请保留默认的 “第一行” 选择。否则,请选择给定以在状态机定义中指定标题。有关更多信息,请参阅
ReaderConfig
。对于子执行类型,请选择快速。
在导出位置中,要将 Map Run 结果导出到特定的 Amazon S3 位置,请选择将 Map 状态的输出导出到 Amazon S3。
执行以下操作:
对于 S3 存储桶,请从下拉列表中选择输入存储桶名称和前缀。
对于存储桶,输入要将结果导出到的 Amazon S3 存储桶的名称。例如,
mapOutputs
。对于前缀,输入要将结果保存到的文件夹名称。例如,
resultData
。
第 3 步:配置其他选项
除了分布式 Map 状态 所需的设置外,您还可以指定其他选项。这些选项可以包括子工作流并发执行的最大数量和导出 Map
状态结果的位置。
选择 Process data 状态。然后,在项目来源中,选择其他配置。
执行以下操作:
选择 “修改项目” ItemSelector,为每个子工作流程执行指定自定义JSON输入。
输入以下JSON输入:
{
"index.$":
"$$.Map.Item.Index"
,"value.$":
"$$.Map.Item.Value"
}有关如何创建自定义输入的信息,请参阅
ItemSelector (地图)
。
在运行时设置中,对于并发限制,指定分布式 Map 状态 可以启动的并发子工作流执行数量。例如,输入
100
。-
在浏览器中打开一个新窗口或选项卡,完成您将在此工作流中使用的 Lambda 函数的配置,如第 4 步:配置 Lambda 函数中所述。
第 4 步:配置 Lambda 函数
重要
确保您的 Lambda 函数处于相同状态 AWS 区域 作为你的状态机。
-
打开 Lambda 控制台
,然后选择创建函数。 -
在创建函数页面上,选择从头开始创作。
-
在基本信息部分中,配置您的 Lambda 函数:
-
对于函数名称,请输入
distributedMapLambda
。 -
对于 Runtime (运行时),选择 Node.js。
-
保留所有默认选项,然后选择创建函数。
-
创建 Lambda 函数后,复制页面右上角显示的函数的 Amazon 资源名称 (ARN)。您需要在工作流原型中提供此信息。以下是一个示例ARN:
arn:aws:lambda:us-east-2:123456789012:function:distributedMapLambda
-
-
复制以下 Lambda 函数的代码,然后将其粘贴到页面的distributedMapLambda代码源部分。
exports.handler = async function(event, context) { console.log("Received Input:\n", event); return { 'statusCode' : 200, 'inputReceived' : event //returns the input that it received } };
-
选择部署。函数部署后,选择测试,查看您的 Lambda 函数的输出。
第 5 步:更新工作流原型
在 Step Functions 控制台中,您将更新工作流程以添加 Lambda 函数。ARN
返回到创建工作流原型的选项卡或窗口。
选择 “处理CSV数据” 步骤,然后在 “配置” 选项卡中执行以下操作:
对于集成类型,请选择已优化。
对于函数名称,输入 Lambda 函数名称。从出现的下拉列表中选择函数,或者选择输入函数名称并提供 Lambda 函数。ARN
第 6 步:查看自动生成的 Amazon States Language 定义并保存工作流
当您将状态从操作和流选项卡拖放到画布上时,Workflow Studio 会自动实时撰写工作流的 Amazon States Language 定义。您可以根据需要编辑此定义。
-
(可选)在 “检查器” 面板 面板上选择定义,然后查看状态机定义。
提示
您也可以在 Workflow Studio 中查看ASL定义。代码编辑器在代码编辑器中,您还可以编辑工作流程的ASL定义。
以下示例代码显示了为您的工作流自动生成的 Amazon States Language 定义。
{ "Comment": "Using Map state in Distributed mode", "StartAt": "Process data", "States": { "Process data": { "Type": "Map", "MaxConcurrency": 100, "ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "amzn-s3-demo-source-bucket", "Key": "csvDataset/ratings.csv" } }, "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "EXPRESS" }, "StartAt": "Process CSV data", "States": { "Process CSV data": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-2:123456789012:function:distributedMapLambda" }, "End": true } } }, "Label": "Processdata", "End": true, "ResultWriter": { "Resource": "arn:aws:states:::s3:putObject", "Parameters": { "Bucket": "mapOutputs", "Prefix": "resultData" } }, "ItemSelector": { "index.$": "$$.Map.Item.Index", "value.$": "$$.Map.Item.Value" } } } }
-
为状态机指定一个名称。为此,请选择默认状态机名称旁边的编辑图标MyStateMachine。然后,找到状态机配置,在状态机名称框中指定一个名称。
对于本教程,请输入名称
DistributedMapDemo
。 -
(可选)在状态机配置中,指定其他工作流设置,例如状态机类型及其执行角色。
在本教程中,请保留状态机配置中的所有默认选项。
-
在确认角色创建对话框中,选择确认继续。
您也可以选择查看角色设置,返回至状态机配置。
注意
如果你删除 Step Functions 创建的IAM角色,Step Functions 以后将无法重新创建它。同样,如果您修改角色(例如,通过从IAM策略的主体中删除 Step Functions),Step Functions 以后将无法恢复其原始设置。
第 7 步:运行状态机
执行是状态机的一个实例,您可以在其中运行工作流来执行任务。
-
在DistributedMapDemo页面上,选择开始执行。
-
在启动执行对话框中,执行以下操作:
-
(可选)输入自定义执行名称以覆盖生成的默认值。
非ASCII姓名和日志
Step Functions 接受状态机、执行、活动和包含非ASCII字符的标签的名称。由于此类字符不适用于亚马逊 CloudWatch,因此我们建议您仅使用ASCII字符,以便您可以跟踪中的指标 CloudWatch。
-
(可选)在 “输入” 框中,按JSON格式输入输入值以运行您的工作流程。
-
选择启动执行。
-
Step Functions 控制台会将您引导到一个以您的执行 ID 为标题的页面。该页面被称为执行详细信息页面。在此页面上,您可以随着执行的进展或者在执行完成后查看执行结果。
要查看执行结果,请在图表视图上选择各个状态,然后在步骤详细信息窗格中选择各个选项卡,分别查看每个状态的详细信息,包括输入、输出和定义。有关可在执行详细信息页面上查看的执行信息的详细信息,请参阅执行详情概述。
例如,选择
Map
状态,然后选择 Map Run,打开 Map Run 详细信息页面。在此页面上,您可以查看分布式 Map 状态的所有执行细节及其启动的子工作流执行。有关该页面的信息,请参阅查看地图运行情况。 -