PigActivity - AWS Data Pipeline

AWS Data Pipeline 不再向新客户提供。的现有客户 AWS Data Pipeline 可以继续照常使用该服务。了解更多

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

PigActivity

PigActivity 为中的 Pig 脚本提供原生支持, AWS Data Pipeline 无需使用ShellCommandActivityEmrActivity。此外,还 PigActivity 支持数据暂存。在将 stage 字段设置为 true 时, AWS Data Pipeline 会在 Pig 中将输入数据暂存为架构,而无需用户编写其他代码。

示例

以下示例管道说明如何使用 PigActivity。该示例管道执行以下步骤:

  • MyPigActivity1 从 Amazon S3 加载数据并运行一个 Pig 脚本,该脚本选择几列数据并将其上传到 Amazon S3。

  • MyPigActivity2 加载第一个输出,选择几列三行数据,然后将其作为第二个输出上传到 Amazon S3。

  • MyPigActivity3 加载第二个输出数据,向 Amazon 插入两行数据,仅插入名为 “第五行” 的列RDS。

  • MyPigActivity4 加载亚马逊RDS数据,选择第一行数据,然后将其上传到 Amazon S3。

{ "objects": [ { "id": "MyInputData1", "schedule": { "ref": "MyEmrResourcePeriod" }, "directoryPath": "s3://example-bucket/pigTestInput", "name": "MyInputData1", "dataFormat": { "ref": "MyInputDataType1" }, "type": "S3DataNode" }, { "id": "MyPigActivity4", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyOutputData3" }, "pipelineLogUri": "s3://example-bucket/path/", "name": "MyPigActivity4", "runsOn": { "ref": "MyEmrResource" }, "type": "PigActivity", "dependsOn": { "ref": "MyPigActivity3" }, "output": { "ref": "MyOutputData4" }, "script": "B = LIMIT ${input1} 1; ${output1} = FOREACH B GENERATE one;", "stage": "true" }, { "id": "MyPigActivity3", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyOutputData2" }, "pipelineLogUri": "s3://example-bucket/path", "name": "MyPigActivity3", "runsOn": { "ref": "MyEmrResource" }, "script": "B = LIMIT ${input1} 2; ${output1} = FOREACH B GENERATE Fifth;", "type": "PigActivity", "dependsOn": { "ref": "MyPigActivity2" }, "output": { "ref": "MyOutputData3" }, "stage": "true" }, { "id": "MyOutputData2", "schedule": { "ref": "MyEmrResourcePeriod" }, "name": "MyOutputData2", "directoryPath": "s3://example-bucket/PigActivityOutput2", "dataFormat": { "ref": "MyOutputDataType2" }, "type": "S3DataNode" }, { "id": "MyOutputData1", "schedule": { "ref": "MyEmrResourcePeriod" }, "name": "MyOutputData1", "directoryPath": "s3://example-bucket/PigActivityOutput1", "dataFormat": { "ref": "MyOutputDataType1" }, "type": "S3DataNode" }, { "id": "MyInputDataType1", "name": "MyInputDataType1", "column": [ "First STRING", "Second STRING", "Third STRING", "Fourth STRING", "Fifth STRING", "Sixth STRING", "Seventh STRING", "Eighth STRING", "Ninth STRING", "Tenth STRING" ], "inputRegEx": "^(\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+)", "type": "RegEx" }, { "id": "MyEmrResource", "region": "us-east-1", "schedule": { "ref": "MyEmrResourcePeriod" }, "keyPair": "example-keypair", "masterInstanceType": "m1.small", "enableDebugging": "true", "name": "MyEmrResource", "actionOnTaskFailure": "continue", "type": "EmrCluster" }, { "id": "MyOutputDataType4", "name": "MyOutputDataType4", "column": "one STRING", "type": "CSV" }, { "id": "MyOutputData4", "schedule": { "ref": "MyEmrResourcePeriod" }, "directoryPath": "s3://example-bucket/PigActivityOutput3", "name": "MyOutputData4", "dataFormat": { "ref": "MyOutputDataType4" }, "type": "S3DataNode" }, { "id": "MyOutputDataType1", "name": "MyOutputDataType1", "column": [ "First STRING", "Second STRING", "Third STRING", "Fourth STRING", "Fifth STRING", "Sixth STRING", "Seventh STRING", "Eighth STRING" ], "columnSeparator": "*", "type": "Custom" }, { "id": "MyOutputData3", "username": "___", "schedule": { "ref": "MyEmrResourcePeriod" }, "insertQuery": "insert into #{table} (one) values (?)", "name": "MyOutputData3", "*password": "___", "runsOn": { "ref": "MyEmrResource" }, "connectionString": "jdbc:mysql://example-database-instance:3306/example-database", "selectQuery": "select * from #{table}", "table": "example-table-name", "type": "MySqlDataNode" }, { "id": "MyOutputDataType2", "name": "MyOutputDataType2", "column": [ "Third STRING", "Fourth STRING", "Fifth STRING", "Sixth STRING", "Seventh STRING", "Eighth STRING" ], "type": "TSV" }, { "id": "MyPigActivity2", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyOutputData1" }, "pipelineLogUri": "s3://example-bucket/path", "name": "MyPigActivity2", "runsOn": { "ref": "MyEmrResource" }, "dependsOn": { "ref": "MyPigActivity1" }, "type": "PigActivity", "script": "B = LIMIT ${input1} 3; ${output1} = FOREACH B GENERATE Third, Fourth, Fifth, Sixth, Seventh, Eighth;", "output": { "ref": "MyOutputData2" }, "stage": "true" }, { "id": "MyEmrResourcePeriod", "startDateTime": "2013-05-20T00:00:00", "name": "MyEmrResourcePeriod", "period": "1 day", "type": "Schedule", "endDateTime": "2013-05-21T00:00:00" }, { "id": "MyPigActivity1", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyInputData1" }, "pipelineLogUri": "s3://example-bucket/path", "scriptUri": "s3://example-bucket/script/pigTestScipt.q", "name": "MyPigActivity1", "runsOn": { "ref": "MyEmrResource" }, "scriptVariable": [ "column1=First", "column2=Second", "three=3" ], "type": "PigActivity", "output": { "ref": "MyOutputData1" }, "stage": "true" } ] }

pigTestScript.q 的内容如下所示。

B = LIMIT ${input1} $three; ${output1} = FOREACH B GENERATE $column1, $column2, Third, Fourth, Fifth, Sixth, Seventh, Eighth;

语法

对象调用字段 描述 槽类型
schedule 该对象在计划间隔的执行中调用。用户必须指定对另一个对象的计划引用,以便设置该对象的依赖项执行顺序。用户可以通过在对象上显式设置时间表来满足此要求,例如,指定 “schedule”: {"ref”: "DefaultSchedule“}。在大多数情况下,最好将计划引用放在默认管道对象上,以便所有对象继承该计划。或者,如果管道有一个计划树 (计划位于主计划中),用户可以创建具有计划引用的父对象。有关示例可选计划配置的更多信息,请参阅 https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html 参考对象,例如,“日程安排”:{“ref”:” myScheduleId “}

所需的组 (下列选项之一是必需的) 描述 槽类型
script 要运行的 Pig 脚本。 String
scriptUri 要运行的 Pig 脚本的位置(例如 s3://scriptLocation)。 String

所需的组 (下列选项之一是必需的) 描述 槽类型
runsOn EMR PigActivity 运行它的集群。 参考对象,例如 runsOn ““: {" ref”:” myEmrCluster Id "}
workerGroup 工作线程组。这可用于路由任务。如果您提供 runsOn 值并且存在 workerGroup,则将忽略 workerGroup String

可选字段 描述 槽类型
attemptStatus 来自远程活动的最近报告的状态。 String
attemptTimeout 远程工作完成的超时时间。如果设置此字段,则可能会重试未在设定的开始时间内完成的远程活动。 周期
dependsOn 指定与另一个可运行对象的依赖关系。 参考对象,例如 dependsOn ““: {" ref”:” myActivityId “}
failureAndRerun模式 描述依赖项失败或重新运行时的使用者节点行为。 枚举
input 输入数据源。 参考对象,例如,“输入”:{"ref”:” myDataNode Id "}
lateAfterTimeout 管道启动后经过的时间,在此时间内,对象必须完成。仅当计划类型未设置为 ondemand 时才会触发。 周期
maxActiveInstances 组件的并发活动实例的最大数量。重新运行不计入活动实例数中。 整数
maximumRetries 失败后的最大重试次数。 整数
onFail 当前对象失败时要运行的操作。 参考对象,例如 onFail ““: {" ref”:” myActionId “}
onLateAction 在尚未计划对象或对象仍未完成的情况下将触发的操作。 参考对象,例如 onLateAction ““: {" ref”:” myActionId “}
onSuccess 当前对象成功时要运行的操作。 参考对象,例如 onSuccess ““: {" ref”:” myActionId “}
output 输出数据源。 参考对象,例如,“输出”:{"ref”:” myDataNode Id "}
parent 槽将继承自的当前对象的父级。 参考对象,例如,“父对象”:{"ref”:” myBaseObject Id "}
pipelineLogUri 用于上传管道日志的 Amazon S3URI(例如 's3: BucketName ///Key/ ')。 String
postActivityTaskConfig 要运行的活动后配置脚本。它由 Amazon S33 中的 shell 脚本和参数列表组成。URI 参考对象,例如,“postActivityTaskConfig”: {“ref”:” myShellScript ConfigId “}
preActivityTaskConfig 要运行的活动前配置脚本。它由 Amazon S3 中的 shell 脚本和参数列表组成。URI 参考对象,例如,“preActivityTaskConfig”: {“ref”:” myShellScript ConfigId “}
precondition (可选) 定义先决条件。在满足所有先决条件之前,数据节点不会被标记 READY “”。 参考对象,例如,“前提条件”:{“ref”:” myPreconditionId “}
reportProgressTimeout 远程工作对 reportProgress 的连续调用的超时时间。如果设置此字段,则未报告指定时段的进度的远程活动可能会被视为停滞且已重试。 周期
resizeClusterBefore正在跑步 在执行此活动前,重新调整集群的大小,以适应指定为输入或输出的 DynamoDB 数据节点。
注意

如果您的活动使用DynamoDBDataNode作为输入或输出数据节点,并且将设置resizeClusterBeforeRunningTRUE,则 AWS Data Pipeline 开始使用m3.xlarge实例类型。这将使用 m3.xlarge 覆盖您的实例类型,从而可能会增加您的月度成本。

布尔值
resizeClusterMax实例 调整大小算法可以请求的最大实例数的限制。 整数
retryDelay 两次重试之间的超时时间。 周期
scheduleType 计划类型允许您指定应在间隔的结尾还是开头计划您管道定义中的对象。时间序列风格计划表示在每次间隔的结尾计划实例,而 Cron 风格计划表示应在每次间隔的开头计划实例。按需计划让您可以在每次激活时运行一次管道。这意味着,您不需要克隆或重新创建管道以再次运行它。如果您使用按需计划,则必须在默认对象中指定该计划,并且必须是唯一为管道中的对象 scheduleType 指定的计划。要使用按需管道,您只需为后续每次运行调用该 ActivatePipeline 操作即可。值包括:cron、ondemand 和 timeseries。 枚举
scriptVariable 要传递到 Pig 脚本的参数。您可以 scriptVariable 与脚本一起使用或scriptUri。 String
stage 确定是否启用暂存功能并允许您的 Pig 脚本访问分阶段数据表,例如 $ {INPUT1} 和 $ {}。OUTPUT1 布尔值

运行时字段 描述 槽类型
@activeInstances 当前计划的有效实例对象的列表。 参考对象,例如 activeInstances ““: {" ref”:” myRunnableObject Id "}
@actualEndTime 该对象的执行完成时间。 DateTime
@actualStartTime 该对象的执行开始时间。 DateTime
cancellationReason cancellationReason 如果此对象已取消,则为。 String
@cascadeFailedOn 对象在其上失败的依赖项链的描述。 参考对象,例如 cascadeFailedOn ““: {" ref”:” myRunnableObject Id "}
emrStepLog Amazon EMR 步骤日志仅在EMR活动尝试时可用。 String
errorId errorId 如果此对象失败,则为。 String
errorMessage errorMessage 如果此对象失败,则为。 String
errorStackTrace 该对象失败时显示的错误堆栈跟踪。 String
@finishedTime 该对象完成其执行的时间。 DateTime
hadoopJobLog Hadoop 作业日志可用于尝试进行EMR基于活动的情况。 String
@healthStatus 对象的运行状况,反映进入终止状态的上个对象实例成功还是失败。 String
@healthStatusFromInstanceId 进入终止状态的上个实例对象的 ID。 String
@ T healthStatusUpdated ime 上次更新运行状况的时间。 DateTime
hostname 已执行任务尝试的客户端的主机名。 String
@lastDeactivatedTime 上次停用该对象的时间。 DateTime
@ T latestCompletedRun ime 已完成执行的最新运行的时间。 DateTime
@latestRunTime 已计划执行的最新运行的时间。 DateTime
@nextRunTime 计划下次运行的时间。 DateTime
reportProgressTime 远程活动报告进度的最近时间。 DateTime
@scheduledEndTime 对象的计划结束时间。 DateTime
@scheduledStartTime 对象的计划开始时间。 DateTime
@status 该对象的状态。 String
@version 用于创建对象的管道版本。 String
@waitingOn 该对象在其上处于等待状态的依赖项列表的描述。 参考对象,例如 waitingOn ““: {" ref”:” myRunnableObject Id "}

系统字段 描述 槽类型
@error 用于描述格式不正确的对象的错误消息。 String
@pipelineId 该对象所属的管道的 ID。 String
@sphere 对象的范围指明对象在生命周期中的位置:组件对象产生实例对象,后者执行尝试对象。 String

另请参阅