

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 在 Step Functions 中使用分布式 Map 复制大规模 CSV 数据
<a name="tutorial-map-distributed"></a>

本教程将帮助您开始在分布式模式下使用 `Map` 状态。设置为**分布式**的 `Map` 状态被称为*分布式 Map 状态*。您可以在工作流中使用*分布式 Map 状态*来迭代大规模 Amazon S3 数据来源。`Map` 状态将每次迭代作为子工作流执行来运行，从而实现高并发数。有关分布式模式的更多信息，请参阅[分布式模式下的 Map 状态](state-map-distributed.md)。

在本教程中，您将使用*分布式地图状态*对 Amazon S3 存储桶中的 CSV 文件进行迭代。然后，您将其内容以及子工作流执行的 ARN 返回到另一个 Amazon S3 存储桶中。首先，在 Workflow Studio 中创建一个工作流原型。接下来，将 [`Map` 状态的处理模式](state-map.md#concepts-map-process-modes)设置为“分布式”，将 CSV 文件指定为数据集，然后将其位置提供给该 `Map` 状态。您还可以为子工作流执行指定工作流类型，*分布式 Map 状态*以**快速**方式启动。

除了这些设置外，您还可以为本教程中使用的示例工作流指定其他配置，例如并发子工作流执行的最大数量和导出 `Map` 结果的位置。

## 先决条件
<a name="use-dist-map-prereqs"></a>
+ 将 CSV 文件上传到 Amazon S3 存储桶。您必须在 CSV 文件中定义标题行。有关对 CSV 文件的大小限制以及如何指定标题行的信息，请参阅 [Amazon S3 存储桶中的 CSV 文件](input-output-itemreader.md#itemsource-example-csv-data)。
+ 创建另一个 Amazon S3 存储桶，并在其中创建的一个文件夹，以便将 `Map` 状态结果导出到该存储桶中。

**账户和区域的相关要求**  
Amazon S3 存储桶必须与状态机位于同一 AWS 账户和 AWS 区域中。  
请注意，尽管您的状态机也许能够访问同一 AWS 区域中不同 AWS 账户内存储桶中的文件，但 Step Functions 仅支持列出与状态机位于同一 AWS 账户**和同一 AWS 区域的 Amazon S3 存储桶中的对象。

## 第 1 步：创建工作流原型
<a name="use-dist-map-create-workflow"></a>

在此步骤中，您将使用 Workflow Studio 为工作流创建原型。Workflow Studio 是一款可视化工作流设计器，可在 Step Functions 控制台中使用。您可以分别从**流**和**操作**选项卡中选择所需的状态和 API 操作。您将使用 Workflow Studio 的拖放特征来创建工作流原型。

1. 打开 [Step Functions 控制台](https://console.aws.amazon.com/states/home)，从菜单中选择**状态机**，然后选择**创建状态机**。

1. 选择**从空白创建**。

1. 为状态机命名，然后选择**继续**，在 Workflow Studio 中编辑状态机。

1. 从**流**选项卡中，将 **Map** 状态拖放到标有**将第一个状态拖至此处**的空白状态处。

1. 在**配置**选项卡下，在**状态名称**中输入 **Process data**。

1. 从**操作**选项卡中，将 **AWS Lambda 调用** API 操作拖放到**处理数据**状态中。

1. 将 **AWS Lambda 调用**状态重命名为 **Process CSV data**。

## 第 2 步：配置 Map 状态的必填字段
<a name="use-dist-map-config-fields"></a>

在此步骤中，您将配置*分布式 Map 状态*的以下必填字段：
+ [ItemReader](input-output-itemreader.md) – 指定 `Map` 状态可读取输入的数据集及其位置。
+ [ItemProcessor](state-map-distributed.md#distitemprocessor) – 指定以下值：
  + `ProcessorConfig` – 将 `Mode` 和 `ExecutionType` 分别设置为 `DISTRIBUTED` 和 `EXPRESS`。这将为*分布式 Map 状态*启动的子工作流执行设置 `Map` 状态的处理模式和工作流类型。
  + `StartAt` – Map 工作流中的第一个状态。
  + `States` – 定义 Map 工作流程，这是在每个子工作流执行中要重复的一组步骤。
+ [ResultWriter](input-output-resultwriter.md) – 指定 Step Functions 写入*分布式 Map 状态*结果的 Amazon S3 位置。
**重要**  
确保您用于导出 Map Run 结果的 Amazon S3 存储桶与状态机位于同一个 AWS 账户和 AWS 区域下。否则，您的状态机执行将因 `States.ResultWriterFailed` 错误而失败。

**要配置必填句字段，请执行以下操作：**

1. 选择 **Process data** 状态，然后在**配置**选项卡中执行以下操作：

   1. 对于**处理模式**，选择**分布式**。

   1. 对于**项目来源**，选择 **Amazon S3**，然后从 **S3 项目来源**下拉列表中选择 **S3 中的 CSV 文件**。

   1. 执行以下操作来指定 CSV 文件的 Amazon S3 位置：

      1. 对于 **S3 对象**，请从下拉列表中选择**输入存储桶和密钥**。

      1. 对于**存储桶**，输入包含 CSV 文件的 Amazon S3 存储桶的名称。例如 **amzn-s3-demo-source-bucket**。

      1. 对于**密钥**，输入保存 CSV 文件的 Amazon S3 对象的名称。您还必须在此字段中指定 CSV 文件的名称。例如 **csvDataset/ratings.csv**。

   1. 对于 CSV 文件，还必须指定列标题的位置。为此，请选择**其他配置**，如果 CSV 文件的第一行是标题，对于 **CSV 标题位置**，请保留**第一行**的默认选择。否则，请选择**给定**以在状态机定义中指定标题。有关更多信息，请参阅 `ReaderConfig`。

   1. 对于**子执行类型**，请选择**快速**。

1. 在**导出位置**中，要将 Map Run 结果导出到特定的 Amazon S3 位置，请选择**将 Map 状态的输出导出到 Amazon S3**。

1. 执行以下操作：

   1. 对于 **S3 存储桶**，请从下拉列表中选择**输入存储桶名称和前缀**。

   1. 对于**存储桶**，输入要将结果导出到的 Amazon S3 存储桶的名称。例如 **mapOutputs**。

   1. 对于**前缀**，输入要将结果保存到的文件夹名称。例如 **resultData**。

## 第 3 步：配置其他选项
<a name="use-dist-map-config-misc-fields"></a>

除了*分布式 Map 状态* 所需的设置外，您还可以指定其他选项。这些选项可以包括子工作流并发执行的最大数量和导出 `Map` 状态结果的位置。

1. 选择 **Process data** 状态。然后，在**项目来源**中，选择**其他配置**。

1. 执行以下操作：

   1. 选择**使用 ItemSelector 修改项目**，为每个子工作流执行指定自定义 JSON 输入。

   1. 输入以下 JSON 输入：

      ```
      {
        "index.$": "$$.Map.Item.Index",
        "value.$": "$$.Map.Item.Value"
      }
      ```

      有关如何创建自定义输入的信息，请参阅 `ItemSelector（Map）`。

1. 在**运行时设置**中，对于**并发限制**，指定*分布式 Map 状态* 可以启动的并发子工作流执行数量。例如，输入 **100**。

1. 在浏览器中打开一个新窗口或选项卡，完成您将在此工作流中使用的 Lambda 函数的配置，如[第 4 步：配置 Lambda 函数](#use-dist-map-config-resource)中所述。

## 第 4 步：配置 Lambda 函数
<a name="use-dist-map-config-resource"></a>

**重要**  
确保您的 Lambda 函数与状态机处于同一 AWS 区域。

1. 打开 [Lambda 控制台](https://console.aws.amazon.com/lambda/home)，然后选择**创建函数**。

1. 在**创建函数**页面上，选择**从头开始创作**。

1. 在**基本信息**部分中，配置您的 Lambda 函数：

   1. 对于**函数名称**，请输入 **distributedMapLambda**。

   1. 对于 **Runtime (运行时)**，选择 **Node.js**。

   1. 保留所有默认选项，然后选择**创建函数**。

   1. 创建 Lambda 函数后，复制页面右上角显示的函数 Amazon 资源名称 (ARN)。您需要在工作流原型中提供此信息。以下是示例 ARN：

      ```
      arn:aws:lambda:us-east-2:123456789012:function:distributedMapLambda
      ```

1. 复制以下 Lambda 函数代码，并将其粘贴到 **distributedMapLambda** 页面的**代码源**部分。

   ```
   exports.handler = async function(event, context) {
       console.log("Received Input:\n", event);
   
       return {
           'statusCode' : 200,
           'inputReceived' : event //returns the input that it received
       }
   };
   ```

1. 选择**部署**。函数部署后，选择**测试**，查看您的 Lambda 函数的输出。

## 第 5 步：更新工作流原型
<a name="use-dist-map-update-workflow"></a>

在 Step Functions 控制台中，您将更新工作流，添加 Lambda 函数的 ARN。

1. 返回到创建工作流原型的选项卡或窗口。

1. 选择**处理 CSV 数据**步骤，然后在**配置**选项卡中执行以下操作：

   1. 对于**集成类型**，请选择**已优化**。

   1. 对于**函数名称**，输入 Lambda 函数名称。从出现的下拉列表中选择函数，或者选择**输入函数名称**并提供 Lambda 函数 ARN。

## 第 6 步：查看自动生成的 Amazon States Language 定义并保存工作流
<a name="use-dist-map-review-asl"></a>

当您将状态从**操作**和**流**选项卡拖放到画布上时，Workflow Studio 会自动实时撰写工作流的 [Amazon States Language](concepts-amazon-states-language.md) 定义。您可以根据需要编辑此定义。

1. （可选）在 [检查器面板](workflow-studio.md#workflow-studio-components-formdefinition) 面板上选择**定义**，然后查看状态机定义。
**提示**  
您也可以在 Workflow Studio 的[代码编辑器](workflow-studio.md#wfs-interface-code-editor)中查看 ASL 的定义。在代码编辑器中，还可以编辑工作流的 ASL 定义。

   以下示例代码显示了为您的工作流自动生成的 Amazon States Language 定义。

   ```
   {
     "Comment": "Using Map state in Distributed mode",
     "StartAt": "Process data",
     "States": {
       "Process data": {
         "Type": "Map",
         "MaxConcurrency": 100,
         "ItemReader": {
           "ReaderConfig": {
             "InputType": "CSV",
             "CSVHeaderLocation": "FIRST_ROW"
           },
           "Resource": "arn:aws:states:::s3:getObject",
           "Parameters": {
             "Bucket": "amzn-s3-demo-source-bucket",
             "Key": "csvDataset/ratings.csv"
           }
         },
         "ItemProcessor": {
           "ProcessorConfig": {
             "Mode": "DISTRIBUTED",
             "ExecutionType": "EXPRESS"
           },
           "StartAt": "Process CSV data",
           "States": {
             "Process CSV data": {
               "Type": "Task",
               "Resource": "arn:aws:states:::lambda:invoke",
               "OutputPath": "$.Payload",
               "Parameters": {
                 "Payload.$": "$",
                 "FunctionName": "arn:aws:lambda:us-east-2:account-id:function:distributedMapLambda"
               },
               "End": true
             }
           }
         },
         "Label": "Processdata",
         "End": true,
         "ResultWriter": {
           "Resource": "arn:aws:states:::s3:putObject",
           "Parameters": {
             "Bucket": "mapOutputs",
             "Prefix": "resultData"
           }
         },
         "ItemSelector": {
           "index.$": "$$.Map.Item.Index",
           "value.$": "$$.Map.Item.Value"
         }
       }
     }
   }
   ```

1. 为状态机指定一个名称。要执行此操作，请选择默认状态机名称 **MyStateMachine** 旁边的编辑图标。然后，找到**状态机配置**，在**状态机名称**框中指定一个名称。

   对于本教程，请输入名称 **DistributedMapDemo**。

1. （可选）在**状态机配置**中，指定其他工作流设置，例如状态机类型及其执行角色。

   在本教程中，请保留**状态机配置**中的所有默认选项。

1. 在**确认角色创建**对话框中，选择**确认**继续。

   您也可以选择**查看角色设置**，返回至**状态机配置**。
**注意**  
如果您删除了 Step Functions 创建的 IAM 角色，Step Functions 在以后无法重新创建该角色。同样，如果您修改了该角色（例如，通过在 IAM 策略中从主体中删除 Step Functions），Step Functions 在以后也无法还原其原始设置。

## 第 7 步：运行状态机
<a name="use-dist-map-sm-run"></a>

*执行*是状态机的一个实例，您可以在其中运行工作流来执行任务。

1. 在 **DistributedMapDemo** 页面上，选择**启动执行**。

1. 在**启动执行**对话框中，执行以下操作：

   1. （可选）输入自定义执行名称，以便覆盖生成的默认执行名称。
**非 ASCII 名称和日志记录**  
Step Functions 对于状态机、执行、活动和标签接受包含非 ASCII 字符的名称。由于此类字符会阻止 Amazon CloudWatch 记录数据，因此我们建议仅使用 ASCII 字符，这样您就可以跟踪 Step Functions 指标。

   1. （可选）在**输入**框中，以 JSON 格式输入输入值以便运行工作流。

   1. 选择**启动执行**。

   1. Step Functions 控制台会将您引导到一个以您的执行 ID 为标题的页面，即*执行详细信息*页面。您可以随着工作流的进展以及在工作流完成后查看执行结果。

      要查看执行结果，请在**图表视图**上选择各个状态，然后在[步骤详细信息](concepts-view-execution-details.md#exec-details-intf-step-details)窗格中选择各个选项卡，分别查看每个状态的详细信息，包括输入、输出和定义。有关可在*执行详细信息*页面上查看的执行信息的详细信息，请参阅[执行详细信息概览](concepts-view-execution-details.md#exec-details-interface-overview)。

   例如，选择 `Map` 状态，然后选择 **Map Run**，打开 *Map Run 详细信息*页面。在此页面上，您可以查看*分布式 Map 状态*的所有执行细节及其启动的子工作流执行。有关该页面的信息，请参阅[查看 Map Run](concepts-examine-map-run.md)。