

AWS Data Pipeline 不再向新客户提供。的现有客户 AWS Data Pipeline 可以继续照常使用该服务。[了解详情](https://aws.amazon.com/blogs/big-data/migrate-workloads-from-aws-data-pipeline/)

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# RedshiftCopyActivity
<a name="dp-object-redshiftcopyactivity"></a>

将数据从 DynamoDB 或 Amazon S3 复制到 Amazon Redshift。您可以将数据加载到新表中，或轻松地将数据并入现有表中。

下面概括了使用 `RedshiftCopyActivity` 的使用案例：

1. 首先使用 AWS Data Pipeline 在 Amazon S3 中暂存您的数据。

1. 使用 `RedshiftCopyActivity` 将数据从 Amazon RDS 和 Amazon EMR 移动到 Amazon Redshift。

   这可让您将数据加载到 Amazon Redshift 中，您可以在此处对数据进行分析。

1. 使用 [SqlActivity](dp-object-sqlactivity.md) 可以对已加载到 Amazon Redshift 中的数据执行 SQL 查询。

 此外，借助 `RedshiftCopyActivity`，您可以使用 `S3DataNode`，因为它支持清单文件。有关更多信息，请参阅 [S3 DataNode](dp-object-s3datanode.md)。

## 示例
<a name="redshiftcopyactivity-example"></a>

以下是该对象类型的示例。

为了确保格式转换，本示例在 `commandOptions` 中使用 [EMPTYASNULL](https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-data-conversion.html#copy-emptyasnull) 和 [IGNOREBLANKLINES](https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-data-conversion.html#copy-ignoreblanklines) 特殊转换参数。有关信息，请参阅 *Amazon Redshift 数据库开发人员指南*中的[数据转换参数](https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-data-conversion.html)。

```
{
  "id" : "S3ToRedshiftCopyActivity",
  "type" : "RedshiftCopyActivity",
  "input" : { "ref": "MyS3DataNode" },
  "output" : { "ref": "MyRedshiftDataNode" },
  "insertMode" : "KEEP_EXISTING",
  "schedule" : { "ref": "Hour" },
  "runsOn" : { "ref": "MyEc2Resource" },
  "commandOptions": ["EMPTYASNULL", "IGNOREBLANKLINES"]
}
```

以下示例管道定义说明了一个使用 `APPEND` 插入模式的活动：

```
{
  "objects": [
    {
      "id": "CSVId1",
      "name": "DefaultCSV1",
      "type": "CSV"
    },
    {
      "id": "RedshiftDatabaseId1",
      "databaseName": "dbname",
      "username": "user",
      "name": "DefaultRedshiftDatabase1",
      "*password": "password",
      "type": "RedshiftDatabase",
      "clusterId": "redshiftclusterId"
    },
    {
      "id": "Default",
      "scheduleType": "timeseries",
      "failureAndRerunMode": "CASCADE",
      "name": "Default",
      "role": "DataPipelineDefaultRole",
      "resourceRole": "DataPipelineDefaultResourceRole"
    },
    {
      "id": "RedshiftDataNodeId1",
      "schedule": {
        "ref": "ScheduleId1"
      },
      "tableName": "orders",
      "name": "DefaultRedshiftDataNode1",
      "createTableSql": "create table StructuredLogs (requestBeginTime CHAR(30) PRIMARY KEY DISTKEY SORTKEY, requestEndTime CHAR(30), hostname CHAR(100), requestDate varchar(20));",
      "type": "RedshiftDataNode",
      "database": {
        "ref": "RedshiftDatabaseId1"
      }
    },
    {
      "id": "Ec2ResourceId1",
      "schedule": {
        "ref": "ScheduleId1"
      },
      "securityGroups": "MySecurityGroup",
      "name": "DefaultEc2Resource1",
      "role": "DataPipelineDefaultRole",
      "logUri": "s3://myLogs",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "type": "Ec2Resource"
    },
    {
      "id": "ScheduleId1",
      "startDateTime": "yyyy-mm-ddT00:00:00",
      "name": "DefaultSchedule1",
      "type": "Schedule",
      "period": "period",
      "endDateTime": "yyyy-mm-ddT00:00:00"
    },
    {
      "id": "S3DataNodeId1",
      "schedule": {
        "ref": "ScheduleId1"
      },
      "filePath": "s3://datapipeline-us-east-1/samples/hive-ads-samples.csv",
      "name": "DefaultS3DataNode1",
      "dataFormat": {
        "ref": "CSVId1"
      },
      "type": "S3DataNode"
    },
    {
      "id": "RedshiftCopyActivityId1",
      "input": {
        "ref": "S3DataNodeId1"
      },
      "schedule": {
        "ref": "ScheduleId1"
      },
      "insertMode": "APPEND",
      "name": "DefaultRedshiftCopyActivity1",
      "runsOn": {
        "ref": "Ec2ResourceId1"
      },
      "type": "RedshiftCopyActivity",
      "output": {
        "ref": "RedshiftDataNodeId1"
      }
    }
  ]
}
```

`APPEND` 操作向表中添加项，无论主键或排序键如何。例如，如果您有以下表，则可追加具有相同的 ID 和用户值的记录。

```
ID(PK)     USER
1          aaa
2          bbb
```

您可以追加具有相同的 ID 和用户值的记录：

```
ID(PK)     USER
1          aaa
2          bbb
1          aaa
```

**注意**  
如果 `APPEND` 操作中断并重试，生成的重新运行管道可能会从开始位置追加。这可能会导致进一步复制，因此，您应了解此行为，尤其是当您有任何计算行数的逻辑时。

有关教程，请参阅 [使用将数据复制到亚马逊 Redshift AWS Data Pipeline](dp-copydata-redshift.md)。

## 语法
<a name="redshiftcopyactivity-syntax"></a>


****  

| 必填字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| insertMode |   确定 AWS Data Pipeline 如何处理目标表中与要加载的数据中的行重叠的预先存在的数据。 有效值包括：`KEEP_EXISTING`、`OVERWRITE_EXISTING`、`TRUNCATE` 和 `APPEND`。 `KEEP_EXISTING` 添加新行到表中，同时保留任何现有的行不变。 `KEEP_EXISTING` 和 ` OVERWRITE_EXISTING` 使用主键、排序键和分配键来识别哪些传入行与现有行匹配。请参阅 Amazon Redshift *数据库开发人员指南*中的[更新和插入新数据](https://docs.aws.amazon.com/redshift/latest/dg/t_updating-inserting-using-staging-tables-.html)。 `TRUNCATE` 先删除目标表中的所有数据，然后写入新数据。  `APPEND` 会将所有记录添加到 Redshift 表的结尾。`APPEND` 不需要主键、分配键或排序键，因此会附加可能存在重复的项。  | 枚举 | 

 


****  

| 对象调用字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| 计划 |  该对象在计划间隔的执行中调用。 指定对另一个对象的计划引用，以便设置该对象的依赖项执行顺序。 在大多数情况下，我们建议将计划引用放在默认管道对象上，以便所有对象继承该计划。例如，您可以通过指定 `"schedule": {"ref": "DefaultSchedule"}`，明确地针对该对象设置计划。 如果您的管道中的主计划包含嵌套计划，则可以创建具有计划引用的父对象。 有关示例可选计划配置的更多信息，请参阅[计划](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html)。  | 引用对象，例如： "schedule":\$1"ref":"myScheduleId"\$1 | 

 


****  

| 所需的组 (下列选项之一是必需的) | 说明 | 槽位类型 | 
| --- | --- | --- | 
| runsOn | 运行活动或命令的计算资源。例如，Amazon EC2 实例或 Amazon EMR 集群。 | 参考对象，例如 “runson”：\$1“ref”:” myResourceId “\$1 | 
| workerGroup | 工作线程组。这可用于路由任务。如果您提供 runsOn 值并且存在 workerGroup，则将忽略 workerGroup。 | 字符串 | 

 


****  

| 可选字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| attemptStatus | 来自远程活动的最近报告的状态。 | 字符串 | 
| attemptTimeout | 远程工作完成的超时时间。如果设置此字段，则可能会重试未在设定的开始时间内完成的远程活动。 | 周期 | 
| commandOptions |  获取在 `COPY` 操作期间传递到 Amazon Redshift 数据节点的参数。有关更多信息，请参阅 Amazon Redshift *数据库开发人员指南*中的 [COPY](https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html)。 在加载表时，`COPY` 会尝试将字符串隐式转换为目标列的数据类型。如果您收到错误或有其他转换需求，则除了自动发生的默认数据转换之外，您还可以指定其他转换参数。有关信息，请参阅 Amazon Redshift *数据库开发人员指南*中的[数据转换参数](https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-data-conversion.html)。 如果数据格式与输入或输出数据节点关联，则忽略提供的参数。 由于复制操作首先使用 `COPY` 将数据插入暂存表，然后使用 `INSERT` 命令将数据从暂存表复制到目标表中，一些 `COPY` 参数不适用，例如 `COPY` 命令启用表上自动压缩的功能。如果需要压缩，请向 `CREATE TABLE` 语句添加列编码详细信息。 此外，在某些需要从 Amazon Redshift 集群卸载数据和在 Amazon S3 中创建文件的情况下，`RedshiftCopyActivity` 依赖 Amazon Redshift 的 `UNLOAD` 操作。 为提高复制和卸载过程中的性能，请从 `UNLOAD` 命令指定 `PARALLEL OFF` 参数。有关更多信息，请参阅 Amazon Redshift *数据库开发人员指南*中的 [UNLOAD](https://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD.html)。  | 字符串 | 
| dependsOn | 指定与另一个可运行对象的依赖关系。 | 引用对象："dependsOn":\$1"ref":"myActivityId"\$1 | 
| failureAndRerun模式 | 描述依赖项失败或重新运行时的使用者节点行为。 | 枚举 | 
| input | 输入数据节点。数据源可以是 Amazon S3、DynamoDB 或 Amazon Redshift。 | 引用对象： "input":\$1"ref":"myDataNodeId"\$1 | 
| lateAfterTimeout | 管道启动后经过的时间，在此时间内，对象必须完成。仅当计划类型未设置为 ondemand 时才会触发。 | 周期 | 
| maxActiveInstances | 组件的并发活动实例的最大数量。重新运行不计入活动实例数中。 | 整数 | 
| maximumRetries | 失败后的最大重试次数 | 整数 | 
| onFail | 当前对象失败时要运行的操作。 | 引用对象："onFail":\$1"ref":"myActionId"\$1 | 
| onLateAction | 在尚未计划对象或对象仍未完成的情况下将触发的操作。 | 引用对象： "onLateAction":\$1"ref":"myActionId"\$1 | 
| onSuccess | 当前对象成功时要运行的操作。 | 引用对象： "onSuccess":\$1"ref":"myActionId"\$1 | 
| output | 输出数据节点。输出位置可以是 Amazon S3 或 Amazon Redshift。 | 引用对象： "output":\$1"ref":"myDataNodeId"\$1 | 
| parent | 槽将继承自的当前对象的父级。 | 引用对象："parent":\$1"ref":"myBaseObjectId"\$1 | 
| pipelineLogUri | 用于上传管道日志的 S3 URI（例如 's3: BucketName ///Key/ '）。 | 字符串 | 
| precondition | (可选) 定义先决条件。在满足所有先决条件之前，数据节点不会标记为“READY”。 | 引用对象："precondition":\$1"ref":"myPreconditionId"\$1 | 
| 队列 |  对应于 Amazon Redshift 中的 `query_group ` 设置，这允许您根据放置在队列中的位置分配并优先处理并发活动。 Amazon Redshift 将同时连接的数量限制为 15。有关更多信息，请参阅 Amazon RDS *数据库开发人员指南*中的[向队列分配查询](https://docs.aws.amazon.com/AmazonRDS/latest/DeveloperGuide/cm-c-executing-queries.html)。  | 字符串 | 
| reportProgressTimeout |  远程工作对 `reportProgress` 的连续调用的超时时间。 如果设置此字段，则未报告指定时段的进度的远程活动可能会被视为停滞且已重试。  | 周期 | 
| retryDelay | 两次重试之间的超时时间。 | 周期 | 
| scheduleType |  允许您指定是否计划管道中的对象。值包括：`cron`、`ondemand` 和 `timeseries`。 `timeseries` 计划表示在每个间隔结束时计划实例。 `Cron` 计划表示在每个间隔开始时计划实例。 `ondemand` 计划让您可以在每次激活时运行一次管道。这意味着，您不需要克隆或重新创建管道以再次运行它。  要使用 `ondemand` 管道，请为每个后续运行调用 `ActivatePipeline` 操作。 如果您使用 `ondemand` 计划，您必须在默认对象中指定它，并且该计划必须是在管道中为对象指定的唯一 `scheduleType`。  | 枚举 | 
| transformSql |  用于转换输入数据的 `SQL SELECT` 表达式。 在名为 `staging` 的表上运行 `transformSql` 表达式。 当您从 DynamoDB 或 Amazon S3 复制数据时， AWS Data Pipeline 会创建一个名为“staging”的表，并且最初在该表中加载数据。此表中的数据用于更新目标表。 `transformSql` 的输出架构必须与最终目标表的架构匹配。 如果您指定了 `transformSql` 选项，则会从指定的 SQL 语句创建第二个暂存表。然后，来自这第二个暂存表的数据更新到最终的目标表中。  | 字符串 | 

 


****  

| 运行时字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| @activeInstances | 当前计划的有效实例对象的列表。 | 引用对象："activeInstances":\$1"ref":"myRunnableObjectId"\$1 | 
| @actualEndTime | 该对象的执行完成时间。 | DateTime | 
| @actualStartTime | 该对象的执行开始时间。 | DateTime | 
| cancellationReason | 该对象被取消时显示的 cancellationReason。 | 字符串 | 
| @cascadeFailedOn | 对象在其上失败的依赖项链的描述。 | 引用对象： "cascadeFailedOn":\$1"ref":"myRunnableObjectId"\$1 | 
| emrStepLog | 仅在尝试 EMR 活动时可用的 EMR 步骤日志 | 字符串 | 
| errorId | 该对象失败时显示的 errorId。 | 字符串 | 
| errorMessage | 该对象失败时显示的 errorMessage。 | 字符串 | 
| errorStackTrace | 该对象失败时显示的错误堆栈跟踪。 | 字符串 | 
| @finishedTime | 该对象完成其执行的时间。 | DateTime | 
| hadoopJobLog | 在尝试基于 EMR 的活动时可用的 Hadoop 任务日志。 | 字符串 | 
| @healthStatus | 对象的运行状况，反映进入终止状态的上个对象实例成功还是失败。 | 字符串 | 
| @healthStatusFromInstanceId | 进入终止状态的上个实例对象的 ID。 | 字符串 | 
| @ T healthStatusUpdated ime | 上次更新运行状况的时间。 | DateTime | 
| hostname | 已执行任务尝试的客户端的主机名。 | 字符串 | 
| @lastDeactivatedTime | 上次停用该对象的时间。 | DateTime | 
| @ T latestCompletedRun ime | 已完成执行的最新运行的时间。 | DateTime | 
| @latestRunTime | 已计划执行的最新运行的时间。 | DateTime | 
| @nextRunTime | 计划下次运行的时间。 | DateTime | 
| reportProgressTime | 远程活动报告进度的最近时间。 | DateTime | 
| @scheduledEndTime | 对象的计划结束时间。 | DateTime | 
| @scheduledStartTime | 对象的计划开始时间。 | DateTime | 
| @status | 该对象的状态。 | 字符串 | 
| @version | 用来创建对象的管道版本。 | 字符串 | 
| @waitingOn | 该对象在其上处于等待状态的依赖项列表的描述。 | 引用对象： "waitingOn":\$1"ref":"myRunnableObjectId"\$1 | 

 


****  

| 系统字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| @error | 用于描述格式不正确的对象的错误消息。 | 字符串 | 
| @pipelineId | 该对象所属的管道的 ID。 | 字符串 | 
| @sphere | 对象的球体。表示对象在生命周期中的位置。例如，组件对象产生实例对象，后者执行尝试对象。 | 字符串 | 