RedshiftCopyActivity - AWS Data Pipeline

AWS Data Pipeline 不再向新客户提供。的现有客户 AWS Data Pipeline 可以继续照常使用该服务。了解更多

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

RedshiftCopyActivity

将数据从 DynamoDB 或 Amazon S3 复制到 Amazon Redshift。您可以将数据加载到新表中,或轻松地将数据并入现有表中。

下面概括了使用 RedshiftCopyActivity 的使用案例:

  1. 首先使用 AWS Data Pipeline 在 Amazon S3 中暂存您的数据。

  2. 用于将数据从亚马逊RDS和亚马逊转移RedshiftCopyActivity到亚马逊 Redshift。EMR

    这可让您将数据加载到 Amazon Redshift 中,您可以在此处对数据进行分析。

  3. 用于SQL查询您已加载SqlActivity到 Amazon Redshift 中的数据。

此外,借助 RedshiftCopyActivity,您可以使用 S3DataNode,因为它支持清单文件。有关更多信息,请参阅 S3 DataNode

示例

以下是该对象类型的示例。

为了确保格式转换,本示例在中使用了EMPTYASNULLIGNOREBLANKLINES特殊的转换参数commandOptions。有关信息,请参阅 Amazon Redshift 数据库开发人员指南中的数据转换参数

{ "id" : "S3ToRedshiftCopyActivity", "type" : "RedshiftCopyActivity", "input" : { "ref": "MyS3DataNode" }, "output" : { "ref": "MyRedshiftDataNode" }, "insertMode" : "KEEP_EXISTING", "schedule" : { "ref": "Hour" }, "runsOn" : { "ref": "MyEc2Resource" }, "commandOptions": ["EMPTYASNULL", "IGNOREBLANKLINES"] }

以下示例管道定义说明了一个使用 APPEND 插入模式的活动:

{ "objects": [ { "id": "CSVId1", "name": "DefaultCSV1", "type": "CSV" }, { "id": "RedshiftDatabaseId1", "databaseName": "dbname", "username": "user", "name": "DefaultRedshiftDatabase1", "*password": "password", "type": "RedshiftDatabase", "clusterId": "redshiftclusterId" }, { "id": "Default", "scheduleType": "timeseries", "failureAndRerunMode": "CASCADE", "name": "Default", "role": "DataPipelineDefaultRole", "resourceRole": "DataPipelineDefaultResourceRole" }, { "id": "RedshiftDataNodeId1", "schedule": { "ref": "ScheduleId1" }, "tableName": "orders", "name": "DefaultRedshiftDataNode1", "createTableSql": "create table StructuredLogs (requestBeginTime CHAR(30) PRIMARY KEY DISTKEY SORTKEY, requestEndTime CHAR(30), hostname CHAR(100), requestDate varchar(20));", "type": "RedshiftDataNode", "database": { "ref": "RedshiftDatabaseId1" } }, { "id": "Ec2ResourceId1", "schedule": { "ref": "ScheduleId1" }, "securityGroups": "MySecurityGroup", "name": "DefaultEc2Resource1", "role": "DataPipelineDefaultRole", "logUri": "s3://myLogs", "resourceRole": "DataPipelineDefaultResourceRole", "type": "Ec2Resource" }, { "id": "ScheduleId1", "startDateTime": "yyyy-mm-ddT00:00:00", "name": "DefaultSchedule1", "type": "Schedule", "period": "period", "endDateTime": "yyyy-mm-ddT00:00:00" }, { "id": "S3DataNodeId1", "schedule": { "ref": "ScheduleId1" }, "filePath": "s3://datapipeline-us-east-1/samples/hive-ads-samples.csv", "name": "DefaultS3DataNode1", "dataFormat": { "ref": "CSVId1" }, "type": "S3DataNode" }, { "id": "RedshiftCopyActivityId1", "input": { "ref": "S3DataNodeId1" }, "schedule": { "ref": "ScheduleId1" }, "insertMode": "APPEND", "name": "DefaultRedshiftCopyActivity1", "runsOn": { "ref": "Ec2ResourceId1" }, "type": "RedshiftCopyActivity", "output": { "ref": "RedshiftDataNodeId1" } } ] }

APPEND 操作向表中添加项,无论主键或排序键如何。例如,如果您有以下表,则可追加具有相同的 ID 和用户值的记录。

ID(PK) USER 1 aaa 2 bbb

您可以追加具有相同的 ID 和用户值的记录:

ID(PK) USER 1 aaa 2 bbb 1 aaa
注意

如果 APPEND 操作中断并重试,生成的重新运行管道可能会从开始位置追加。这可能会导致进一步复制,因此,您应了解此行为,尤其是当您有任何计算行数的逻辑时。

有关教程,请参阅 使用 AWS Data Pipeline 复制数据到 Amazon Redshift

语法

必填字段 描述 槽类型
insertMode

确定 AWS Data Pipeline 如何处理目标表中与要加载的数据中的行重叠的预先存在的数据。

有效值包括:KEEP_EXISTINGOVERWRITE_EXISTINGTRUNCATEAPPEND

KEEP_EXISTING 添加新行到表中,同时保留任何现有的行不变。

KEEP_EXISTING OVERWRITE_EXISTING 使用主键、排序键和分配键来识别哪些传入行与现有行匹配。请参阅 Amazon Redshift 数据库开发人员指南中的更新和插入新数据

TRUNCATE 先删除目标表中的所有数据,然后写入新数据。

APPEND 会将所有记录添加到 Redshift 表的结尾。APPEND 不需要主键、分配键或排序键,因此会附加可能存在重复的项。

枚举

对象调用字段 描述 槽类型
schedule

该对象在计划间隔的执行中调用。

指定对另一个对象的计划引用,以便设置该对象的依赖项执行顺序。

在大多数情况下,我们建议将计划引用放在默认管道对象上,以便所有对象继承该计划。例如,您可以通过指定 "schedule": {"ref": "DefaultSchedule"},明确地针对该对象设置计划。

如果您的管道中的主计划包含嵌套计划,则可以创建具有计划引用的父对象。

有关示例可选计划配置的更多信息,请参阅计划

引用对象,例如: "schedule":{"ref":"myScheduleId"}

所需的组 (下列选项之一是必需的) 描述 槽类型
runsOn 运行活动或命令的计算资源。例如,亚马逊EC2实例或亚马逊EMR集群。 引用对象,例如 runsOn ““: {" ref”:” myResourceId “}
workerGroup 工作线程组。这可用于路由任务。如果您提供了一个runsOn值并且workerGroup存在,则会workerGroup被忽略。 String

可选字段 描述 槽类型
attemptStatus 来自远程活动的最近报告的状态。 String
attemptTimeout 远程工作完成的超时时间。如果设置此字段,则可能会重试未在设定的开始时间内完成的远程活动。 周期
commandOptions

获取在 COPY 操作期间传递到 Amazon Redshift 数据节点的参数。有关参数的信息,请参阅COPY《亚马逊 Redshift 数据库开发者指南》。

在加载表时,COPY 会尝试将字符串隐式转换为目标列的数据类型。如果您收到错误或有其他转换需求,则除了自动发生的默认数据转换之外,您还可以指定其他转换参数。有关信息,请参阅 Amazon Redshift 数据库开发人员指南中的数据转换参数

如果数据格式与输入或输出数据节点关联,则忽略提供的参数。

由于复制操作首先使用 COPY 将数据插入暂存表,然后使用 INSERT 命令将数据从暂存表复制到目标表中,一些 COPY 参数不适用,例如 COPY 命令启用表上自动压缩的功能。如果需要压缩,请向 CREATE TABLE 语句添加列编码详细信息。

此外,在某些需要从 Amazon Redshift 集群卸载数据和在 Amazon S3 中创建文件的情况下,RedshiftCopyActivity 依赖 Amazon Redshift 的 UNLOAD 操作。

为提高复制和卸载过程中的性能,请从 UNLOAD 命令指定 PARALLEL OFF 参数。有关参数的信息,请参阅UNLOAD《亚马逊 Redshift 数据库开发者指南》。

String
dependsOn 指定与另一个可运行对象的依赖关系。 引用对象:"dependsOn":{"ref":"myActivityId"}
failureAndRerun模式 描述依赖项失败或重新运行时的使用者节点行为。 枚举
input 输入数据节点。数据源可以是 Amazon S3、DynamoDB 或 Amazon Redshift。 引用对象: "input":{"ref":"myDataNodeId"}
lateAfterTimeout 管道启动后经过的时间,在此时间内,对象必须完成。仅当计划类型未设置为 ondemand 时才会触发。 周期
maxActiveInstances 组件的并发活动实例的最大数量。重新运行不计入活动实例数中。 整数
maximumRetries 失败后的最大重试次数 整数
onFail 当前对象失败时要运行的操作。 引用对象:"onFail":{"ref":"myActionId"}
onLateAction 在尚未计划对象或对象仍未完成的情况下将触发的操作。 引用对象: "onLateAction":{"ref":"myActionId"}
onSuccess 当前对象成功时要运行的操作。 引用对象: "onSuccess":{"ref":"myActionId"}
output 输出数据节点。输出位置可以是 Amazon S3 或 Amazon Redshift。 引用对象: "output":{"ref":"myDataNodeId"}
parent 槽将继承自的当前对象的父级。 引用对象:"parent":{"ref":"myBaseObjectId"}
pipelineLogUri 用于上传管道日志的 S3URI(例如 's3: BucketName ///Key/ ')。 String
precondition (可选) 定义先决条件。在满足所有先决条件之前,数据节点不会被标记 READY “”。 引用对象:"precondition":{"ref":"myPreconditionId"}
队列

对应于 Amazon Redshift 中的 query_group 设置,这允许您根据放置在队列中的位置分配并优先处理并发活动。

Amazon Redshift 将同时连接的数量限制为 15。有关更多信息,请参阅 Amazon RDS 数据库开发者指南中的为队列分配查询

String
reportProgressTimeout

远程工作对 reportProgress 的连续调用的超时时间。

如果设置此字段,则未报告指定时段的进度的远程活动可能会被视为停滞且已重试。

周期
retryDelay 两次重试之间的超时时间。 周期
scheduleType

允许您指定是否计划管道中的对象。值包括:cronondemandtimeseries

timeseries 计划表示在每个间隔结束时计划实例。

Cron 计划表示在每个间隔开始时计划实例。

ondemand 计划让您可以在每次激活时运行一次管道。这意味着,您不需要克隆或重新创建管道以再次运行它。

要使用 ondemand 管道,请为每个后续运行调用 ActivatePipeline 操作。

如果您使用 ondemand 计划,您必须在默认对象中指定它,并且该计划必须是在管道中为对象指定的唯一 scheduleType

枚举
transformSql

用于转换输入数据的 SQL SELECT 表达式。

在名为 staging 的表上运行 transformSql 表达式。

当您从 DynamoDB 或 Amazon S3 复制数据时, AWS Data Pipeline 会创建一个名为“staging”的表,并且最初在该表中加载数据。此表中的数据用于更新目标表。

transformSql 的输出架构必须与最终目标表的架构匹配。

如果指定该transformSql选项,则会根据指定的SQL语句创建第二个临时表。然后,来自这第二个暂存表的数据更新到最终的目标表中。

String

运行时字段 描述 槽类型
@activeInstances 当前计划的有效实例对象的列表。 引用对象:"activeInstances":{"ref":"myRunnableObjectId"}
@actualEndTime 该对象的执行完成时间。 DateTime
@actualStartTime 该对象的执行开始时间。 DateTime
cancellationReason cancellationReason 如果此对象已取消,则为。 String
@cascadeFailedOn 对象在其上失败的依赖项链的描述。 引用对象: "cascadeFailedOn":{"ref":"myRunnableObjectId"}
emrStepLog EMR步骤日志仅在尝试EMR活动时可用 String
errorId errorId 如果此对象失败,则为。 String
errorMessage errorMessage 如果此对象失败,则为。 String
errorStackTrace 该对象失败时显示的错误堆栈跟踪。 String
@finishedTime 该对象完成其执行的时间。 DateTime
hadoopJobLog Hadoop 作业日志可用于尝试进行EMR基于活动的情况。 String
@healthStatus 对象的运行状况,反映进入终止状态的上个对象实例成功还是失败。 String
@healthStatusFromInstanceId 进入终止状态的上个实例对象的 ID。 String
@ T healthStatusUpdated ime 上次更新运行状况的时间。 DateTime
hostname 已执行任务尝试的客户端的主机名。 String
@lastDeactivatedTime 上次停用该对象的时间。 DateTime
@ T latestCompletedRun ime 已完成执行的最新运行的时间。 DateTime
@latestRunTime 已计划执行的最新运行的时间。 DateTime
@nextRunTime 计划下次运行的时间。 DateTime
reportProgressTime 远程活动报告进度的最近时间。 DateTime
@scheduledEndTime 对象的计划结束时间。 DateTime
@scheduledStartTime 对象的计划开始时间。 DateTime
@status 该对象的状态。 String
@version 用来创建对象的管道版本。 String
@waitingOn 该对象在其上处于等待状态的依赖项列表的描述。 引用对象: "waitingOn":{"ref":"myRunnableObjectId"}

系统字段 描述 槽类型
@error 用于描述格式不正确的对象的错误消息。 String
@pipelineId 该对象所属的管道的 ID。 String
@sphere 对象的球体。表示对象在生命周期中的位置。例如,组件对象产生实例对象,后者执行尝试对象。 String