AWS Data Pipeline 不再向新客户提供。的现有客户 AWS Data Pipeline 可以继续照常使用该服务。了解更多
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
RedshiftCopyActivity
将数据从 DynamoDB 或 Amazon S3 复制到 Amazon Redshift。您可以将数据加载到新表中,或轻松地将数据并入现有表中。
下面概括了使用 RedshiftCopyActivity
的使用案例:
-
首先使用 AWS Data Pipeline 在 Amazon S3 中暂存您的数据。
-
用于将数据从亚马逊RDS和亚马逊转移
RedshiftCopyActivity
到亚马逊 Redshift。EMR这可让您将数据加载到 Amazon Redshift 中,您可以在此处对数据进行分析。
-
用于SQL查询您已加载SqlActivity到 Amazon Redshift 中的数据。
此外,借助 RedshiftCopyActivity
,您可以使用 S3DataNode
,因为它支持清单文件。有关更多信息,请参阅 S3 DataNode。
示例
以下是该对象类型的示例。
为了确保格式转换,本示例在中使用了EMPTYASNULL和IGNOREBLANKLINES特殊的转换参数commandOptions
。有关信息,请参阅 Amazon Redshift 数据库开发人员指南中的数据转换参数。
{ "id" : "S3ToRedshiftCopyActivity", "type" : "RedshiftCopyActivity", "input" : { "ref": "MyS3DataNode" }, "output" : { "ref": "MyRedshiftDataNode" }, "insertMode" : "KEEP_EXISTING", "schedule" : { "ref": "Hour" }, "runsOn" : { "ref": "MyEc2Resource" }, "commandOptions": ["EMPTYASNULL", "IGNOREBLANKLINES"] }
以下示例管道定义说明了一个使用 APPEND
插入模式的活动:
{ "objects": [ { "id": "CSVId1", "name": "DefaultCSV1", "type": "CSV" }, { "id": "RedshiftDatabaseId1", "databaseName": "dbname", "username": "user", "name": "DefaultRedshiftDatabase1", "*password": "password", "type": "RedshiftDatabase", "clusterId": "redshiftclusterId" }, { "id": "Default", "scheduleType": "timeseries", "failureAndRerunMode": "CASCADE", "name": "Default", "role": "DataPipelineDefaultRole", "resourceRole": "DataPipelineDefaultResourceRole" }, { "id": "RedshiftDataNodeId1", "schedule": { "ref": "ScheduleId1" }, "tableName": "orders", "name": "DefaultRedshiftDataNode1", "createTableSql": "create table StructuredLogs (requestBeginTime CHAR(30) PRIMARY KEY DISTKEY SORTKEY, requestEndTime CHAR(30), hostname CHAR(100), requestDate varchar(20));", "type": "RedshiftDataNode", "database": { "ref": "RedshiftDatabaseId1" } }, { "id": "Ec2ResourceId1", "schedule": { "ref": "ScheduleId1" }, "securityGroups": "MySecurityGroup", "name": "DefaultEc2Resource1", "role": "DataPipelineDefaultRole", "logUri": "s3://myLogs", "resourceRole": "DataPipelineDefaultResourceRole", "type": "Ec2Resource" }, { "id": "ScheduleId1", "startDateTime": "yyyy-mm-ddT00:00:00", "name": "DefaultSchedule1", "type": "Schedule", "period": "period", "endDateTime": "yyyy-mm-ddT00:00:00" }, { "id": "S3DataNodeId1", "schedule": { "ref": "ScheduleId1" }, "filePath": "s3://datapipeline-us-east-1/samples/hive-ads-samples.csv", "name": "DefaultS3DataNode1", "dataFormat": { "ref": "CSVId1" }, "type": "S3DataNode" }, { "id": "RedshiftCopyActivityId1", "input": { "ref": "S3DataNodeId1" }, "schedule": { "ref": "ScheduleId1" }, "insertMode": "APPEND", "name": "DefaultRedshiftCopyActivity1", "runsOn": { "ref": "Ec2ResourceId1" }, "type": "RedshiftCopyActivity", "output": { "ref": "RedshiftDataNodeId1" } } ] }
APPEND
操作向表中添加项,无论主键或排序键如何。例如,如果您有以下表,则可追加具有相同的 ID 和用户值的记录。
ID(PK) USER 1 aaa 2 bbb
您可以追加具有相同的 ID 和用户值的记录:
ID(PK) USER 1 aaa 2 bbb 1 aaa
注意
如果 APPEND
操作中断并重试,生成的重新运行管道可能会从开始位置追加。这可能会导致进一步复制,因此,您应了解此行为,尤其是当您有任何计算行数的逻辑时。
有关教程,请参阅 使用 AWS Data Pipeline 复制数据到 Amazon Redshift。
语法
必填字段 | 描述 | 槽类型 |
---|---|---|
insertMode |
确定 AWS Data Pipeline 如何处理目标表中与要加载的数据中的行重叠的预先存在的数据。 有效值包括:
|
枚举 |
对象调用字段 | 描述 | 槽类型 |
---|---|---|
schedule |
该对象在计划间隔的执行中调用。 指定对另一个对象的计划引用,以便设置该对象的依赖项执行顺序。 在大多数情况下,我们建议将计划引用放在默认管道对象上,以便所有对象继承该计划。例如,您可以通过指定 如果您的管道中的主计划包含嵌套计划,则可以创建具有计划引用的父对象。 有关示例可选计划配置的更多信息,请参阅计划。 |
引用对象,例如: "schedule":{"ref":"myScheduleId"} |
所需的组 (下列选项之一是必需的) | 描述 | 槽类型 |
---|---|---|
runsOn | 运行活动或命令的计算资源。例如,亚马逊EC2实例或亚马逊EMR集群。 | 引用对象,例如 runsOn ““: {" ref”:” myResourceId “} |
workerGroup | 工作线程组。这可用于路由任务。如果您提供了一个runsOn 值并且workerGroup 存在,则会workerGroup被忽略。 |
String |
可选字段 | 描述 | 槽类型 |
---|---|---|
attemptStatus | 来自远程活动的最近报告的状态。 | String |
attemptTimeout | 远程工作完成的超时时间。如果设置此字段,则可能会重试未在设定的开始时间内完成的远程活动。 | 周期 |
commandOptions |
获取在 在加载表时, 如果数据格式与输入或输出数据节点关联,则忽略提供的参数。 由于复制操作首先使用 此外,在某些需要从 Amazon Redshift 集群卸载数据和在 Amazon S3 中创建文件的情况下, 为提高复制和卸载过程中的性能,请从 |
String |
dependsOn | 指定与另一个可运行对象的依赖关系。 | 引用对象:"dependsOn":{"ref":"myActivityId"} |
failureAndRerun模式 | 描述依赖项失败或重新运行时的使用者节点行为。 | 枚举 |
input | 输入数据节点。数据源可以是 Amazon S3、DynamoDB 或 Amazon Redshift。 | 引用对象: "input":{"ref":"myDataNodeId"} |
lateAfterTimeout | 管道启动后经过的时间,在此时间内,对象必须完成。仅当计划类型未设置为 ondemand 时才会触发。 |
周期 |
maxActiveInstances | 组件的并发活动实例的最大数量。重新运行不计入活动实例数中。 | 整数 |
maximumRetries | 失败后的最大重试次数 | 整数 |
onFail | 当前对象失败时要运行的操作。 | 引用对象:"onFail":{"ref":"myActionId"} |
onLateAction | 在尚未计划对象或对象仍未完成的情况下将触发的操作。 | 引用对象: "onLateAction":{"ref":"myActionId"} |
onSuccess | 当前对象成功时要运行的操作。 | 引用对象: "onSuccess":{"ref":"myActionId"} |
output | 输出数据节点。输出位置可以是 Amazon S3 或 Amazon Redshift。 | 引用对象: "output":{"ref":"myDataNodeId"} |
parent | 槽将继承自的当前对象的父级。 | 引用对象:"parent":{"ref":"myBaseObjectId"} |
pipelineLogUri | 用于上传管道日志的 S3URI(例如 's3: BucketName ///Key/ ')。 | String |
precondition | (可选) 定义先决条件。在满足所有先决条件之前,数据节点不会被标记 READY “”。 | 引用对象:"precondition":{"ref":"myPreconditionId"} |
队列 |
对应于 Amazon Redshift 中的 Amazon Redshift 将同时连接的数量限制为 15。有关更多信息,请参阅 Amazon RDS 数据库开发者指南中的为队列分配查询。 |
String |
reportProgressTimeout |
远程工作对 如果设置此字段,则未报告指定时段的进度的远程活动可能会被视为停滞且已重试。 |
周期 |
retryDelay | 两次重试之间的超时时间。 | 周期 |
scheduleType |
允许您指定是否计划管道中的对象。值包括:
要使用 如果您使用 |
枚举 |
transformSql |
用于转换输入数据的 在名为 当您从 DynamoDB 或 Amazon S3 复制数据时, AWS Data Pipeline 会创建一个名为“staging”的表,并且最初在该表中加载数据。此表中的数据用于更新目标表。
如果指定该 |
String |
运行时字段 | 描述 | 槽类型 |
---|---|---|
@activeInstances | 当前计划的有效实例对象的列表。 | 引用对象:"activeInstances":{"ref":"myRunnableObjectId"} |
@actualEndTime | 该对象的执行完成时间。 | DateTime |
@actualStartTime | 该对象的执行开始时间。 | DateTime |
cancellationReason | cancellationReason 如果此对象已取消,则为。 | String |
@cascadeFailedOn | 对象在其上失败的依赖项链的描述。 | 引用对象: "cascadeFailedOn":{"ref":"myRunnableObjectId"} |
emrStepLog | EMR步骤日志仅在尝试EMR活动时可用 | String |
errorId | errorId 如果此对象失败,则为。 | String |
errorMessage | errorMessage 如果此对象失败,则为。 | String |
errorStackTrace | 该对象失败时显示的错误堆栈跟踪。 | String |
@finishedTime | 该对象完成其执行的时间。 | DateTime |
hadoopJobLog | Hadoop 作业日志可用于尝试进行EMR基于活动的情况。 | String |
@healthStatus | 对象的运行状况,反映进入终止状态的上个对象实例成功还是失败。 | String |
@healthStatusFromInstanceId | 进入终止状态的上个实例对象的 ID。 | String |
@ T healthStatusUpdated ime | 上次更新运行状况的时间。 | DateTime |
hostname | 已执行任务尝试的客户端的主机名。 | String |
@lastDeactivatedTime | 上次停用该对象的时间。 | DateTime |
@ T latestCompletedRun ime | 已完成执行的最新运行的时间。 | DateTime |
@latestRunTime | 已计划执行的最新运行的时间。 | DateTime |
@nextRunTime | 计划下次运行的时间。 | DateTime |
reportProgressTime | 远程活动报告进度的最近时间。 | DateTime |
@scheduledEndTime | 对象的计划结束时间。 | DateTime |
@scheduledStartTime | 对象的计划开始时间。 | DateTime |
@status | 该对象的状态。 | String |
@version | 用来创建对象的管道版本。 | String |
@waitingOn | 该对象在其上处于等待状态的依赖项列表的描述。 | 引用对象: "waitingOn":{"ref":"myRunnableObjectId"} |
系统字段 | 描述 | 槽类型 |
---|---|---|
@error | 用于描述格式不正确的对象的错误消息。 | String |
@pipelineId | 该对象所属的管道的 ID。 | String |
@sphere | 对象的球体。表示对象在生命周期中的位置。例如,组件对象产生实例对象,后者执行尝试对象。 | String |