

AWS Data Pipeline 不再向新客户提供。的现有客户 AWS Data Pipeline 可以继续照常使用该服务。[了解详情](https://aws.amazon.com/blogs/big-data/migrate-workloads-from-aws-data-pipeline/)

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用命令行复制 MySQL 数据
<a name="dp-copymysql-cli"></a>

您可以创建管道，将数据从 MySQL 表复制到 Amazon S3 存储桶中的文件。

**先决条件**

在开始本教程之前，您必须完成以下步骤：

1. 安装和配置命令行界面（CLI）。有关更多信息，请参阅 [正在访问 AWS Data Pipeline](what-is-datapipeline.md#accessing-datapipeline)。

1. 确保 IAM 角色已命名**DataPipelineDefaultRole**且**DataPipelineDefaultResourceRole**存在。 AWS Data Pipeline 控制台会自动为您创建这些角色。如果您至少没有使用过 AWS Data Pipeline 控制台，则必须手动创建这些角色。有关更多信息，请参阅 [适用的 IAM 角色 AWS Data Pipeline](dp-iam-roles.md)。

1. 设置 Amazon S3 存储桶和 Amazon RDS 实例。有关更多信息，请参阅 [开始前的准备工作](dp-copydata-mysql-prereq.md)。

**Topics**
+ [以 JSON 格式定义管道](#dp-copymysql-define-pipeline-cli)
+ [上传并激活管道定义](#dp-copymysql-json-upload-cli)

## 以 JSON 格式定义管道
<a name="dp-copymysql-define-pipeline-cli"></a>

此示例场景显示如何使用 JSON 管道定义和 AWS Data Pipeline CLI 按指定的时间间隔，将数据（行）从 MySQL 数据库中的表复制到 Amazon S3 存储桶中的 CSV（逗号分隔值）文件。

这是完整管道定义 JSON 文件，其每个部分后跟说明。

**注意**  
 我们建议您使用文本编辑器，这可帮助您验证 JSON 格式文件的语法，并使用 .json 文件扩展名来命名文件。

```
{
  "objects": [
    {
      "id": "ScheduleId113",
      "startDateTime": "2013-08-26T00:00:00",
      "name": "My Copy Schedule",
      "type": "Schedule",
      "period": "1 Days"
    },
    {
      "id": "CopyActivityId112",
      "input": {
        "ref": "MySqlDataNodeId115"
      },
      "schedule": {
        "ref": "ScheduleId113"
      },
      "name": "My Copy",
      "runsOn": {
        "ref": "Ec2ResourceId116"
      },
      "onSuccess": {
        "ref": "ActionId1"
      },
      "onFail": {
        "ref": "SnsAlarmId117"
      },
      "output": {
        "ref": "S3DataNodeId114"
      },
      "type": "CopyActivity"
    },
    {
      "id": "S3DataNodeId114",
      "schedule": {
        "ref": "ScheduleId113"
      },
      "filePath": "s3://amzn-s3-demo-bucket/rds-output/output.csv",
      "name": "My S3 Data",
      "type": "S3DataNode"
    },
    {
      "id": "MySqlDataNodeId115",
      "username": "my-username",
      "schedule": {
        "ref": "ScheduleId113"
      },
      "name": "My RDS Data",
      "*password": "my-password",
      "table": "table-name",
      "connectionString": "jdbc:mysql://your-sql-instance-name.id.region-name.rds.amazonaws.com:3306/database-name",
      "selectQuery": "select * from #{table}",
      "type": "SqlDataNode"
    },
    {
      "id": "Ec2ResourceId116",
      "schedule": {
        "ref": "ScheduleId113"
      },
      "name": "My EC2 Resource",
      "role": "DataPipelineDefaultRole",
      "type": "Ec2Resource",
      "resourceRole": "DataPipelineDefaultResourceRole"
    },
    {
      "message": "This is a success message.",
      "id": "ActionId1",
      "subject": "RDS to S3 copy succeeded!",
      "name": "My Success Alarm",
      "role": "DataPipelineDefaultRole",
      "topicArn": "arn:aws:sns:us-east-1:123456789012:example-topic",
      "type": "SnsAlarm"
    },
    {
      "id": "Default",
      "scheduleType": "timeseries",
      "failureAndRerunMode": "CASCADE",
      "name": "Default",
      "role": "DataPipelineDefaultRole",
      "resourceRole": "DataPipelineDefaultResourceRole"
    },
    {
      "message": "There was a problem executing #{node.name} at for period #{node.@scheduledStartTime} to #{node.@scheduledEndTime}",
      "id": "SnsAlarmId117",
      "subject": "RDS to S3 copy failed",
      "name": "My Failure Alarm",
      "role": "DataPipelineDefaultRole",
      "topicArn": "arn:aws:sns:us-east-1:123456789012:example-topic",
      "type": "SnsAlarm"
    }
  ]
}
```

### MySQL 数据节点
<a name="dp-copymysql-rds-node-cli"></a>

输入 MySqlDataNode 管道组件定义了输入数据的位置；在本例中为 Amazon RDS 实例。输入 MySqlDataNode 组件由以下字段定义：

```
{
  "id": "MySqlDataNodeId115",
  "username": "my-username",
  "schedule": {
    "ref": "ScheduleId113"
  },
  "name": "My RDS Data",
  "*password": "my-password",
  "table": "table-name",
  "connectionString": "jdbc:mysql://your-sql-instance-name.id.region-name.rds.amazonaws.com:3306/database-name",
  "selectQuery": "select * from #{table}",
  "type": "SqlDataNode"
},
```

Id  
用户定义名称，这是仅供您参考的标签。

用户名  
数据库账户的用户名，该账户具有足够的权限从数据库表检索数据。*my-username*用您的用户名替换。

Schedule  
对我们在 JSON 文件前面行中创建的计划组件的引用。

Name  
用户定义名称，这是仅供您参考的标签。

\$1密码  
带有星号前缀的数据库帐户的密码，表示 AWS Data Pipeline 必须对密码值进行加密。*my-password*替换为用户的正确密码。密码字段前面带有星号特殊字符。有关更多信息，请参阅 [特殊字符](dp-pipeline-characters.md)。

表  
包含要复制的数据的数据库表的名称。*table-name*替换为数据库表的名称。

connectionString  
用于连接数据库的 CopyActivity 对象的 JDBC 连接字符串。

selectQuery  
有效的 SQL SELECT 查询，用于指定要从数据库表复制的数据。请注意，`#{table}` 是重新使用在 JSON 文件前面行中“table”变量提供的表名的表达式。

Type  
 SqlDataNode 类型，在本示例中为使用 MySQL 的 Amazon RDS 实例。  
该 MySqlDataNode 类型已被弃用。虽然您仍然可以使用 MySqlDataNode，但我们建议使用 SqlDataNode。

### Amazon S3 数据节点
<a name="dp-copymysql-json-s3-node-cli"></a>

接下来，S3Output 管道组件定义输出文件的位置；在这种情况下是 Amazon S3 存储桶位置中的 CSV 文件。输出 S3 DataNode 组件由以下字段定义：

```
{
  "id": "S3DataNodeId114",
  "schedule": {
    "ref": "ScheduleId113"
  },
  "filePath": "s3://amzn-s3-demo-bucket/rds-output/output.csv",
  "name": "My S3 Data",
  "type": "S3DataNode"
},
```

Id  
用户定义 ID，这是仅供您参考的标签。

Schedule  
对我们在 JSON 文件前面行中创建的计划组件的引用。

filePath  
与数据节点关联的数据的路径，在本示例中是一个 CSV 输出文件。

Name  
用户定义名称，这是仅供您参考的标签。

Type  
A DataNode mazon S3 存储桶中的管道对象类型，即与 Amazon S3 存储桶中的数据所在位置相匹配的 S3。

### 资源
<a name="dp-copymysql-json-resource-cli"></a>

这是执行复制操作的计算资源的定义。在此示例中， AWS Data Pipeline 应自动创建一个 EC2 实例来执行复制任务，并在任务完成后终止资源。此处定义的字段控制执行工作的 EC2 实例的创建和功能。 EC2资源由以下字段定义：

```
{
  "id": "Ec2ResourceId116",
  "schedule": {
    "ref": "ScheduleId113"
  },
  "name": "My EC2 Resource",
  "role": "DataPipelineDefaultRole",
  "type": "Ec2Resource",
  "resourceRole": "DataPipelineDefaultResourceRole"
},
```

Id  
用户定义 ID，这是仅供您参考的标签。

Schedule  
根据它来创建此计算资源的计划。

Name  
用户定义名称，这是仅供您参考的标签。

 角色  
访问资源的账户的 IAM 角色，例如访问 Amazon S3 存储桶检索数据。

Type  
用于执行工作的计算资源的类型；在本例中为 EC2 实例。还有其他资源类型可用，例如 EmrCluster 类型。

resourceRole  
创建资源的账户的 IAM 角色，例如代表您创建和配置 EC2实例。角色和 ResourceRole 可以是相同的角色，但单独在安全配置中提供更精细的粒度。

### Activity
<a name="dp-copymysql-json-activity-cli"></a>

JSON 文件的最后一个部分是活动的定义，表示要执行的工作。在本例中，我们使用 CopyActivity 组件将数据从 Amazon S3 存储桶中的文件复制到另一个文件。该 CopyActivity 组件由以下字段定义：

```
{
  "id": "CopyActivityId112",
  "input": {
    "ref": "MySqlDataNodeId115"
  },
  "schedule": {
    "ref": "ScheduleId113"
  },
  "name": "My Copy",
  "runsOn": {
    "ref": "Ec2ResourceId116"
  },
  "onSuccess": {
    "ref": "ActionId1"
  },
  "onFail": {
    "ref": "SnsAlarmId117"
  },
  "output": {
    "ref": "S3DataNodeId114"
  },
  "type": "CopyActivity"
},
```

Id  
用户定义 ID，这是仅供您参考的标签

Input  
待复制 MySQL 数据的位置

Schedule  
运行此活动的计划

Name  
用户定义名称，这是仅供您参考的标签

runsOn  
执行此活动定义的工作的计算资源。在此示例中，我们提供了对先前定义的 EC2 实例的引用。使用该`runsOn`字段 AWS Data Pipeline 可以为您创建 EC2实例。`runsOn` 字段指示资源存在于 AWS 基础设施中，而 workerGroup 值指示您要使用自己的本地资源执行工作。

onSuccess  
活动成功完成时发送的 [SnsAlarm](dp-object-snsalarm.md)

onFail  
活动失败时发送的 [SnsAlarm](dp-object-snsalarm.md)

Output  
CSV 输出文件的 Amazon S3 位置

Type  
要执行的活动类型。

## 上传并激活管道定义
<a name="dp-copymysql-json-upload-cli"></a>

您必须上传您的管道定义并激活您的管道。在以下示例命令中，*pipeline\$1name*替换为管道的标签和*pipeline\$1file*管道定义`.json`文件的完全限定路径。

**AWS CLI**

要创建管道定义并激活管道，请使用以下 [create-pipeline](https://docs.aws.amazon.com/cli/latest/reference/datapipeline/create-pipeline.html) 命令。记下您的管道 ID，因为您将在大多数 CLI 命令中使用这个值。

```
aws datapipeline create-pipeline --name pipeline_name --unique-id token
{
    "pipelineId": "df-00627471SOVYZEXAMPLE"
}
```

要上传您的管道定义，请使用以下[put-pipeline-definition](https://docs.aws.amazon.com/cli/latest/reference/datapipeline/put-pipeline-definition.html)命令。

```
aws datapipeline put-pipeline-definition --pipeline-id df-00627471SOVYZEXAMPLE --pipeline-definition file://MyEmrPipelineDefinition.json
```

如果您的管道成功验证，则 `validationErrors` 字段为空。您应该查看所有警告。

要激活管道，请使用以下 [activate-pipeline](https://docs.aws.amazon.com/cli/latest/reference/datapipeline/activate-pipeline.html) 命令。

```
aws datapipeline activate-pipeline --pipeline-id df-00627471SOVYZEXAMPLE
```

您可以使用以下 [list-pipelines](https://docs.aws.amazon.com/cli/latest/reference/datapipeline/list-pipelines.html) 命令来验证您的管道是否出现在管道列表中。

```
aws datapipeline list-pipelines
```