

AWS Data Pipeline 不再向新客户提供。的现有客户 AWS Data Pipeline 可以继续照常使用该服务。[了解详情](https://aws.amazon.com/blogs/big-data/migrate-workloads-from-aws-data-pipeline/)

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用命令行复制数据到 Amazon Redshift
<a name="dp-copydata-redshift-cli"></a>

本教程演示如何将数据从 Amazon S3 复制到 Amazon Redshift。您将在 Amazon Redshift 中创建一个新表，然后使用 AWS Data Pipeline 公共的 Amazon S3 存储桶向该表传输数据，该存储桶包含 CSV 格式的示例输入数据。日志保存到您拥有的 Amazon S3 存储桶。

Amazon S3 是一项 Web 服务，可让您在云中存储数据。有关更多信息，请参阅 [Amazon Simple Storage Service 用户指南](https://docs.aws.amazon.com/AmazonS3/latest/userguide/)。Amazon Redshift 是云中的数据仓库服务。有关更多信息，请参阅 [Amazon Redshift 管理指南](https://docs.aws.amazon.com/redshift/latest/mgmt/)。

**先决条件**

在开始本教程之前，您必须完成以下步骤：

1. 安装和配置命令行界面（CLI）。有关更多信息，请参阅 [正在访问 AWS Data Pipeline](what-is-datapipeline.md#accessing-datapipeline)。

1. 确保 IAM 角色已命名**DataPipelineDefaultRole**且**DataPipelineDefaultResourceRole**存在。 AWS Data Pipeline 控制台会自动为您创建这些角色。如果您至少没有使用过 AWS Data Pipeline 控制台，则必须手动创建这些角色。有关更多信息，请参阅 [适用的 IAM 角色 AWS Data Pipeline](dp-iam-roles.md)。

1. 在 Amazon Redshift 中设置 `COPY` 命令，因为当您在 AWS Data Pipeline中执行复制操作时，需要让这些相同的选项生效。有关信息，请参阅[开始之前：配置 COPY 选项并加载数据](dp-learn-copy-redshift.md)。

1. 设置 Amazon Redshift 数据库。有关更多信息，请参阅 [设置管道，创建安全组，并创建 Amazon Redshift 集群](dp-copydata-redshift-prereq.md)。

**Topics**
+ [以 JSON 格式定义管道](dp-copydata-redshift-define-pipeline-cli.md)
+ [上传并激活管道定义](dp-copydata-redshift-upload-cli.md)

# 以 JSON 格式定义管道
<a name="dp-copydata-redshift-define-pipeline-cli"></a>

此示例情景说明如何将数据从 Amazon S3 存储桶复制到 Amazon Redshift。

这是完整管道定义 JSON 文件，其每个部分后跟说明。我们建议您使用文本编辑器，这可帮助您验证 JSON 格式文件的语法，并使用 `.json` 文件扩展名来命名文件。

```
{
  "objects": [
    {
      "id": "CSVId1",
      "name": "DefaultCSV1",
      "type": "CSV"
    },
    {
      "id": "RedshiftDatabaseId1",
      "databaseName": "dbname",
      "username": "user",
      "name": "DefaultRedshiftDatabase1",
      "*password": "password",
      "type": "RedshiftDatabase",
      "clusterId": "redshiftclusterId"
    },
    {
      "id": "Default",
      "scheduleType": "timeseries",
      "failureAndRerunMode": "CASCADE",
      "name": "Default",
      "role": "DataPipelineDefaultRole",
      "resourceRole": "DataPipelineDefaultResourceRole"
    },
    {
      "id": "RedshiftDataNodeId1",
      "schedule": {
        "ref": "ScheduleId1"
      },
      "tableName": "orders",
      "name": "DefaultRedshiftDataNode1",
      "createTableSql": "create table StructuredLogs (requestBeginTime CHAR(30) PRIMARY KEY DISTKEY SORTKEY, requestEndTime CHAR(30), hostname CHAR(100), requestDate varchar(20));",
      "type": "RedshiftDataNode",
      "database": {
        "ref": "RedshiftDatabaseId1"
      }
    },
    {
      "id": "Ec2ResourceId1",
      "schedule": {
        "ref": "ScheduleId1"
      },
      "securityGroups": "MySecurityGroup",
      "name": "DefaultEc2Resource1",
      "role": "DataPipelineDefaultRole",
      "logUri": "s3://myLogs",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "type": "Ec2Resource"
    },
    {
      "id": "ScheduleId1",
      "startDateTime": "yyyy-mm-ddT00:00:00",
      "name": "DefaultSchedule1",
      "type": "Schedule",
      "period": "period",
      "endDateTime": "yyyy-mm-ddT00:00:00"
    },
    {
      "id": "S3DataNodeId1",
      "schedule": {
        "ref": "ScheduleId1"
      },
      "filePath": "s3://datapipeline-us-east-1/samples/hive-ads-samples.csv",
      "name": "DefaultS3DataNode1",
      "dataFormat": {
        "ref": "CSVId1"
      },
      "type": "S3DataNode"
    },
    {
      "id": "RedshiftCopyActivityId1",
      "input": {
        "ref": "S3DataNodeId1"
      },
      "schedule": {
        "ref": "ScheduleId1"
      },
      "insertMode": "KEEP_EXISTING",
      "name": "DefaultRedshiftCopyActivity1",
      "runsOn": {
        "ref": "Ec2ResourceId1"
      },
      "type": "RedshiftCopyActivity",
      "output": {
        "ref": "RedshiftDataNodeId1"
      }
    }
  ]
}
```

有关这些对象的更多信息，请参阅以下文档。

**Topics**
+ [数据节点](dp-copydata-redshift-node-cli.md)
+ [资源](dp-copydata-redshift-resource-cli.md)
+ [Activity](dp-copydata-redshift-activity-cli.md)

# 数据节点
<a name="dp-copydata-redshift-node-cli"></a>

此示例使用输入数据节点、输出数据节点和数据库。

**输入数据节点**  
输入 `S3DataNode` 管道组件定义了 Amazon S3 中的输入数据位置和输入数据的数据格式。有关更多信息，请参阅 [S3 DataNode](dp-object-s3datanode.md)。

此输入组件由以下字段定义：

```
{
  "id": "S3DataNodeId1",
  "schedule": {
    "ref": "ScheduleId1"
  },
  "filePath": "s3://datapipeline-us-east-1/samples/hive-ads-samples.csv",
  "name": "DefaultS3DataNode1",
  "dataFormat": {
    "ref": "CSVId1"
  },
  "type": "S3DataNode"
},
```

`id`  
用户定义 ID，这是仅供您参考的标签。

`schedule`  
对计划组件的引用。

`filePath`  
与数据节点关联的数据的路径，在本示例中是一个 CSV 输入文件。

`name`  
用户定义名称，这是仅供您参考的标签。

`dataFormat`  
对活动要处理的数据格式的引用。

**输出数据节点**  
输出 `RedshiftDataNode` 管道组件定义了输出数据的位置；在本例中是 Amazon Redshift 数据库中的表。有关更多信息，请参阅 [RedshiftDataNode](dp-object-redshiftdatanode.md)。此输出组件由以下字段定义：

```
{
  "id": "RedshiftDataNodeId1",
  "schedule": {
    "ref": "ScheduleId1"
  },
  "tableName": "orders",
  "name": "DefaultRedshiftDataNode1",
  "createTableSql": "create table StructuredLogs (requestBeginTime CHAR(30) PRIMARY KEY DISTKEY SORTKEY, requestEndTime CHAR(30), hostname CHAR(100), requestDate varchar(20));",
  "type": "RedshiftDataNode",
  "database": {
    "ref": "RedshiftDatabaseId1"
  }
},
```

`id`  
用户定义 ID，这是仅供您参考的标签。

`schedule`  
对计划组件的引用。

`tableName`  
Amazon Redshift 表的名称。

`name`  
用户定义名称，这是仅供您参考的标签。

`createTableSql`  
一个用于在数据库中创建表的 SQL 表达式。

`database`  
对 Amazon Redshift 数据库的引用。

**数据库**  
`RedshiftDatabase` 组件由以下字段定义。有关更多信息，请参阅 [RedshiftDatabase](dp-object-redshiftdatabase.md)。

```
{
  "id": "RedshiftDatabaseId1",
  "databaseName": "dbname",
  "username": "user",
  "name": "DefaultRedshiftDatabase1",
  "*password": "password",
  "type": "RedshiftDatabase",
  "clusterId": "redshiftclusterId"
},
```

`id`  
用户定义 ID，这是仅供您参考的标签。

`databaseName`  
逻辑数据库的名称。

`username`  
连接到数据库的用户名。

`name`  
用户定义名称，这是仅供您参考的标签。

`password`  
连接到数据库的密码。

`clusterId`  
Redshift 集群的 ID。

# 资源
<a name="dp-copydata-redshift-resource-cli"></a>

这是执行复制操作的计算资源的定义。在此示例中， AWS Data Pipeline 应自动创建一个 EC2 实例来执行复制任务，并在任务完成后终止该实例。此处定义的字段控制完成任务的实例的创建和功能。有关更多信息，请参阅 [Ec2Resource](dp-object-ec2resource.md)。

`Ec2Resource` 由以下字段定义：

```
{
  "id": "Ec2ResourceId1",
  "schedule": {
    "ref": "ScheduleId1"
  },
  "securityGroups": "MySecurityGroup",
  "name": "DefaultEc2Resource1",
  "role": "DataPipelineDefaultRole",
  "logUri": "s3://myLogs",
  "resourceRole": "DataPipelineDefaultResourceRole",
  "type": "Ec2Resource"
},
```

`id`  
用户定义 ID，这是仅供您参考的标签。

`schedule`  
根据它来创建此计算资源的计划。

`securityGroups`  
要用于资源池中实例的安全组。

`name`  
用户定义名称，这是仅供您参考的标签。

`role`  
访问资源的账户的 IAM 角色，例如访问 Amazon S3 存储桶检索数据。

`logUri`  
从 `Ec2Resource` 备份任务运行程序日志的 Amazon S3 目标路径。

`resourceRole`  
创建资源的账户的 IAM 角色，如代表您创建和配置 EC2 实例。角色和 ResourceRole 可以是相同的角色，但单独在安全配置中提供更精细的粒度。

# Activity
<a name="dp-copydata-redshift-activity-cli"></a>

JSON 文件的最后一个部分是活动的定义，表示要执行的工作。在本例中，我们使用 `RedshiftCopyActivity` 组件将数据从 Amazon S3 复制到 Amazon Redshift。有关更多信息，请参阅 [RedshiftCopyActivity](dp-object-redshiftcopyactivity.md)。

`RedshiftCopyActivity` 组件由以下字段定义：

```
{
  "id": "RedshiftCopyActivityId1",
  "input": {
    "ref": "S3DataNodeId1"
  },
  "schedule": {
    "ref": "ScheduleId1"
  },
  "insertMode": "KEEP_EXISTING",
  "name": "DefaultRedshiftCopyActivity1",
  "runsOn": {
    "ref": "Ec2ResourceId1"
  },
  "type": "RedshiftCopyActivity",
  "output": {
    "ref": "RedshiftDataNodeId1"
  }
},
```

`id`  
用户定义 ID，这是仅供您参考的标签。

`input`  
对 Amazon S3 源文件的引用。

`schedule`  
运行此活动的计划。

`insertMode`  
插入类型 (`KEEP_EXISTING`、`OVERWRITE_EXISTING` 或 `TRUNCATE`)。

`name`  
用户定义名称，这是仅供您参考的标签。

`runsOn`  
执行此活动定义的工作的计算资源。

`output`  
对 Amazon Redshift 目标表的引用。

# 上传并激活管道定义
<a name="dp-copydata-redshift-upload-cli"></a>

您必须上传您的管道定义并激活您的管道。在以下示例命令中，*pipeline\$1name*替换为管道的标签和*pipeline\$1file*管道定义`.json`文件的完全限定路径。

**AWS CLI**

要创建管道定义并激活管道，请使用以下 [create-pipeline](https://docs.aws.amazon.com/cli/latest/reference/datapipeline/create-pipeline.html) 命令。记下您的管道 ID，因为您将在大多数 CLI 命令中使用这个值。

```
aws datapipeline create-pipeline --name pipeline_name --unique-id token
{
    "pipelineId": "df-00627471SOVYZEXAMPLE"
}
```

要上传您的管道定义，请使用以下[put-pipeline-definition](https://docs.aws.amazon.com/cli/latest/reference/datapipeline/put-pipeline-definition.html)命令。

```
aws datapipeline put-pipeline-definition --pipeline-id df-00627471SOVYZEXAMPLE --pipeline-definition file://MyEmrPipelineDefinition.json
```

如果您的管道成功验证，则 `validationErrors` 字段为空。您应该查看所有警告。

要激活管道，请使用以下 [activate-pipeline](https://docs.aws.amazon.com/cli/latest/reference/datapipeline/activate-pipeline.html) 命令。

```
aws datapipeline activate-pipeline --pipeline-id df-00627471SOVYZEXAMPLE
```

您可以使用以下 [list-pipelines](https://docs.aws.amazon.com/cli/latest/reference/datapipeline/list-pipelines.html) 命令来验证您的管道是否出现在管道列表中。

```
aws datapipeline list-pipelines
```