AWS Data Pipeline 不再向新客户提供。的现有客户 AWS Data Pipeline 可以继续照常使用该服务。了解更多
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
CopyActivity
将数据从一个位置复制到另一个位置。 CopyActivity
支持S3 DataNode和SqlDataNode作为输入和输出,通常执行复制操作 record-by-record。不过,当满足以下条件时,CopyActivity
可提供高性能的 Amazon S3 到 Amazon S3 复制:
-
输入和输出均为 S3 DataNodes
-
对于输入和输出,
dataFormat
字段是相同的
如果您提供压缩的数据文件作为输入,并且不使用 S3 数据节点上的 compression
字段指示这一点,则 CopyActivity
可能会失败。在此情况下,CopyActivity
无法正确检测记录结束字符,并且操作会失败。此外,还CopyActivity
支持从一个目录复制到另一个目录以及将文件复制到一个目录,但是在 record-by-record 将目录复制到文件时会发生复制。最后,CopyActivity
不支持复制多部分 Amazon S3 文件。
CopyActivity
其CSV支持有特定的局限性。当您使用 S3 DataNode 作为输入时CopyActivity
,只能对 Amazon S3 的输入和输出字段使用CSV数据文件格式的 Unix/Linux 变体。Unix/Linux 变体要求:
-
分隔符必须是“,”(逗号) 字符。
-
记录未用引号引起来。
-
默认转义字符的ASCII值为 92(反斜杠)。
-
记录结尾标识符的ASCII值为 10(或 “\ n”)。
基于 Windows 的系统通常使用不同的 end-of-record 字符序列:回车和换行在一起(ASCII值 13 和ASCII值 10)。您必须使用其他机制来适应此差异 (例如,使用预先复制脚本修改输入数据) 以确保 CopyActivity
能够正确检测到记录结尾;否则,CopyActivity
将反复失败。
使用CopyActivity
从 Postgre SQL RDS 对象导出为TSV数据格式时,默认NULL字符为\ n。
示例
以下是该对象类型的示例。该对象引用您将在同一管道定义文件中定义的其他三个对象。CopyPeriod
为 Schedule
对象,InputData
和 OutputData
为数据节点对象。
{ "id" : "S3ToS3Copy", "type" : "CopyActivity", "schedule" : { "ref" : "CopyPeriod" }, "input" : { "ref" : "InputData" }, "output" : { "ref" : "OutputData" }, "runsOn" : { "ref" : "MyEc2Resource" } }
语法
对象调用字段 | 描述 | 槽类型 |
---|---|---|
schedule | 该对象在计划间隔的执行中调用。用户必须指定对另一个对象的计划引用,以便设置该对象的依赖项执行顺序。用户可以通过在对象上显式设置时间表来满足此要求,例如,指定 “schedule”: {"ref”: "DefaultSchedule“}。在大多数情况下,最好将计划引用放在默认管道对象上,以便所有对象继承该计划。或者,如果管道有一个计划树 (计划位于主计划中),用户可以创建具有计划引用的父对象。有关示例可选计划配置的更多信息,请参阅 https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html。 | 参考对象,例如 “日程安排”:{“ref”:” myScheduleId “} |
所需的组 (下列选项之一是必需的) | 描述 | 槽类型 |
---|---|---|
runsOn | 运行活动或命令的计算资源。例如,亚马逊EC2实例或亚马逊EMR集群。 | 引用对象,例如 runsOn ““: {" ref”:” myResourceId “} |
workerGroup | 工作线程组。这可用于路由任务。如果您提供了一个 runsOn 值并且 workerGroup 存在,则会 workerGroup 被忽略。 | String |
可选字段 | 描述 | 槽类型 |
---|---|---|
attemptStatus | 来自远程活动的最近报告的状态。 | String |
attemptTimeout | 远程工作完成的超时时间。如果设置此字段,则可能会重试未在设定的开始时间内完成的远程活动。 | 周期 |
dependsOn | 指定与另一个可运行对象的依赖关系。 | 引用对象,例如 dependsOn ““: {" ref”:” myActivityId “} |
failureAndRerun模式 | 描述依赖项失败或重新运行时的使用者节点行为。 | 枚举 |
input | 输入数据源。 | 参考对象,例如 “输入”:{"ref”:” myDataNode Id "} |
lateAfterTimeout | 管道启动后经过的时间,在此时间内,对象必须完成。仅当计划类型未设置为 ondemand 时才会触发。 |
周期 |
maxActiveInstances | 组件的并发活动实例的最大数量。重新运行不计入活动实例数中。 | 整数 |
maximumRetries | 失败后的最大重试次数 | 整数 |
onFail | 当前对象失败时要运行的操作。 | 引用对象,例如 onFail ““: {" ref”:” myActionId “} |
onLateAction | 在尚未计划对象或对象仍未完成的情况下将触发的操作。 | 引用对象,例如 onLateAction ““: {" ref”:” myActionId “} |
onSuccess | 当前对象成功时要运行的操作。 | 引用对象,例如 onSuccess ““: {" ref”:” myActionId “} |
output | 输出数据源。 | 参考对象,例如 “输出”:{"ref”:” myDataNode Id "} |
parent | 槽将继承自的当前对象的父级。 | 引用对象,例如 “父对象”:{"ref”:” myBaseObject Id "} |
pipelineLogUri | 用于上传管道日志的 S3URI(例如 's3: BucketName ///Key/ ')。 | String |
precondition | (可选) 定义先决条件。在满足所有先决条件之前,数据节点不会被标记 READY “”。 | 参考对象,例如 “前提条件”:{“ref”:” myPreconditionId “} |
reportProgressTimeout | 远程办公连续调用超时reportProgress。如果设置此字段,则未报告指定时段的进度的远程活动可能会被视为停滞且已重试。 | 周期 |
retryDelay | 两次重试之间的超时时间。 | 周期 |
scheduleType | 计划类型允许您指定应在间隔的结尾还是开头计划您管道定义中的对象。时间序列风格计划表示在每次间隔的结尾计划实例,而 Cron 风格计划表示应在每次间隔的开头计划实例。按需计划让您可以在每次激活时运行一次管道。这意味着,您不需要克隆或重新创建管道以再次运行它。如果您使用按需计划,则必须在默认对象中指定该计划,并且必须是唯一为管道中的对象 scheduleType 指定的计划。要使用按需管道,您只需为后续每次运行调用该 ActivatePipeline 操作即可。值包括:cron、ondemand 和 timeseries。 | 枚举 |
运行时字段 | 描述 | 槽类型 |
---|---|---|
@activeInstances | 当前计划的有效实例对象的列表。 | 引用对象,例如 activeInstances ““: {" ref”:” myRunnableObject Id "} |
@actualEndTime | 该对象的执行完成时间。 | DateTime |
@actualStartTime | 该对象的执行开始时间。 | DateTime |
cancellationReason | cancellationReason 如果此对象已取消,则为。 | String |
@cascadeFailedOn | 对象在其上失败的依赖项链的描述。 | 引用对象,例如 cascadeFailedOn ““: {" ref”:” myRunnableObject Id "} |
emrStepLog | EMR步骤日志仅在尝试EMR活动时可用 | String |
errorId | errorId 如果此对象失败,则为。 | String |
errorMessage | errorMessage 如果此对象失败,则为。 | String |
errorStackTrace | 该对象失败时显示的错误堆栈跟踪。 | String |
@finishedTime | 该对象完成其执行的时间。 | DateTime |
hadoopJobLog | Hadoop 作业日志可用于尝试进行EMR基于活动的情况。 | String |
@healthStatus | 对象的运行状况,反映进入终止状态的上个对象实例成功还是失败。 | String |
@healthStatusFromInstanceId | 进入终止状态的上个实例对象的 ID。 | String |
@ T healthStatusUpdated ime | 上次更新运行状况的时间。 | DateTime |
hostname | 已执行任务尝试的客户端的主机名。 | String |
@lastDeactivatedTime | 上次停用该对象的时间。 | DateTime |
@ T latestCompletedRun ime | 已完成执行的最新运行的时间。 | DateTime |
@latestRunTime | 已计划执行的最新运行的时间。 | DateTime |
@nextRunTime | 计划下次运行的时间。 | DateTime |
reportProgressTime | 远程活动报告进度的最近时间。 | DateTime |
@scheduledEndTime | 对象的计划结束时间。 | DateTime |
@scheduledStartTime | 对象的计划开始时间。 | DateTime |
@status | 该对象的状态。 | String |
@version | 用来创建对象的管道版本。 | String |
@waitingOn | 该对象在其上处于等待状态的依赖项列表的描述。 | 引用对象,例如 waitingOn ““: {" ref”:” myRunnableObject Id "} |
系统字段 | 描述 | 槽类型 |
---|---|---|
@error | 用于描述格式不正确的对象的错误消息 | String |
@pipelineId | 该对象所属的管道的 ID | String |
@sphere | 对象的范围指明对象在生命周期中的位置:组件对象产生实例对象,后者执行尝试对象 | String |