AWS Data Pipeline ya no está disponible para nuevos clientes. Los clientes actuales de AWS Data Pipeline pueden seguir utilizando el servicio con normalidad. Más información
Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
PigActivity
PigActivity proporciona soporte nativo para los scripts de Pig AWS Data Pipeline sin la necesidad de usar ShellCommandActivity
oEmrActivity
. Además, PigActivity admite la puesta en escena de datos. Cuando el campo de uso transitorio se establece en true, AWS Data Pipeline almacena de modo transitorio los datos de entrada como un esquema en Pig sin código adicional del usuario.
Ejemplo
En la siguiente canalización de ejemplo se muestra cómo utilizar PigActivity
. En la canalización de ejemplo se ejecutan los siguientes pasos:
-
MyPigActivity1 carga datos de Amazon S3 y ejecuta un script Pig que selecciona algunas columnas de datos y las carga en Amazon S3.
-
MyPigActivity2 carga la primera salida, selecciona algunas columnas y tres filas de datos y la carga en Amazon S3 como segunda salida.
-
MyPigActivity3 carga los segundos datos de salida, inserta dos filas de datos y solo la columna denominada «quinta» en AmazonRDS.
-
MyPigActivity4 carga RDS los datos de Amazon, selecciona la primera fila de datos y los carga en Amazon S3.
{ "objects": [ { "id": "MyInputData1", "schedule": { "ref": "MyEmrResourcePeriod" }, "directoryPath": "s3://
example-bucket
/pigTestInput", "name": "MyInputData1", "dataFormat": { "ref": "MyInputDataType1" }, "type": "S3DataNode" }, { "id": "MyPigActivity4", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyOutputData3" }, "pipelineLogUri": "s3://example-bucket/path
/", "name": "MyPigActivity4", "runsOn": { "ref": "MyEmrResource" }, "type": "PigActivity", "dependsOn": { "ref": "MyPigActivity3" }, "output": { "ref": "MyOutputData4" }, "script": "B = LIMIT ${input1} 1; ${output1} = FOREACH B GENERATE one;", "stage": "true" }, { "id": "MyPigActivity3", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyOutputData2" }, "pipelineLogUri": "s3://example-bucket/path
", "name": "MyPigActivity3", "runsOn": { "ref": "MyEmrResource" }, "script": "B = LIMIT ${input1} 2; ${output1} = FOREACH B GENERATE Fifth;", "type": "PigActivity", "dependsOn": { "ref": "MyPigActivity2" }, "output": { "ref": "MyOutputData3" }, "stage": "true" }, { "id": "MyOutputData2", "schedule": { "ref": "MyEmrResourcePeriod" }, "name": "MyOutputData2", "directoryPath": "s3://example-bucket
/PigActivityOutput2", "dataFormat": { "ref": "MyOutputDataType2" }, "type": "S3DataNode" }, { "id": "MyOutputData1", "schedule": { "ref": "MyEmrResourcePeriod" }, "name": "MyOutputData1", "directoryPath": "s3://example-bucket
/PigActivityOutput1", "dataFormat": { "ref": "MyOutputDataType1" }, "type": "S3DataNode" }, { "id": "MyInputDataType1", "name": "MyInputDataType1", "column": [ "First STRING", "Second STRING", "Third STRING", "Fourth STRING", "Fifth STRING", "Sixth STRING", "Seventh STRING", "Eighth STRING", "Ninth STRING", "Tenth STRING" ], "inputRegEx": "^(\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+)", "type": "RegEx" }, { "id": "MyEmrResource", "region": "us-east-1", "schedule": { "ref": "MyEmrResourcePeriod" }, "keyPair": "example-keypair
", "masterInstanceType": "m1.small", "enableDebugging": "true", "name": "MyEmrResource", "actionOnTaskFailure": "continue", "type": "EmrCluster" }, { "id": "MyOutputDataType4", "name": "MyOutputDataType4", "column": "one STRING", "type": "CSV" }, { "id": "MyOutputData4", "schedule": { "ref": "MyEmrResourcePeriod" }, "directoryPath": "s3://example-bucket
/PigActivityOutput3", "name": "MyOutputData4", "dataFormat": { "ref": "MyOutputDataType4" }, "type": "S3DataNode" }, { "id": "MyOutputDataType1", "name": "MyOutputDataType1", "column": [ "First STRING", "Second STRING", "Third STRING", "Fourth STRING", "Fifth STRING", "Sixth STRING", "Seventh STRING", "Eighth STRING" ], "columnSeparator": "*", "type": "Custom" }, { "id": "MyOutputData3", "username": "___", "schedule": { "ref": "MyEmrResourcePeriod" }, "insertQuery": "insert into #{table} (one) values (?)", "name": "MyOutputData3", "*password": "___", "runsOn": { "ref": "MyEmrResource" }, "connectionString": "jdbc:mysql://example-database-instance
:3306/example-database
", "selectQuery": "select * from #{table}", "table": "example-table-name
", "type": "MySqlDataNode" }, { "id": "MyOutputDataType2", "name": "MyOutputDataType2", "column": [ "Third STRING", "Fourth STRING", "Fifth STRING", "Sixth STRING", "Seventh STRING", "Eighth STRING" ], "type": "TSV" }, { "id": "MyPigActivity2", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyOutputData1" }, "pipelineLogUri": "s3://example-bucket/path
", "name": "MyPigActivity2", "runsOn": { "ref": "MyEmrResource" }, "dependsOn": { "ref": "MyPigActivity1" }, "type": "PigActivity", "script": "B = LIMIT ${input1} 3; ${output1} = FOREACH B GENERATE Third, Fourth, Fifth, Sixth, Seventh, Eighth;", "output": { "ref": "MyOutputData2" }, "stage": "true" }, { "id": "MyEmrResourcePeriod", "startDateTime": "2013-05-20T00:00:00", "name": "MyEmrResourcePeriod", "period": "1 day", "type": "Schedule", "endDateTime": "2013-05-21T00:00:00" }, { "id": "MyPigActivity1", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyInputData1" }, "pipelineLogUri": "s3://example-bucket/path
", "scriptUri": "s3://example-bucket
/script/pigTestScipt.q", "name": "MyPigActivity1", "runsOn": { "ref": "MyEmrResource" }, "scriptVariable": [ "column1=First", "column2=Second", "three=3" ], "type": "PigActivity", "output": { "ref": "MyOutputData1" }, "stage": "true" } ] }
El contenido de pigTestScript.q
es el siguiente.
B = LIMIT ${input1} $three; ${output1} = FOREACH B GENERATE $column1, $column2, Third, Fourth, Fifth, Sixth, Seventh, Eighth;
Sintaxis
Campos de invocación de objetos | Descripción | Tipo de slot |
---|---|---|
schedule | Este objeto se invoca dentro de la ejecución de un intervalo de programación. Los usuarios deben especificar una referencia de programación a otro objeto para establecer el orden de ejecución de dependencia para este objeto. Los usuarios pueden cumplir este requisito estableciendo explícitamente una programación en el objeto, por ejemplo, especificando «schedule»: {"ref»: "DefaultSchedule«}. En la mayoría de los casos, es mejor poner la referencia de programación en el objeto de la canalización predeterminado de modo que todos los objetos hereden ese programa. O bien, si la canalización tiene un árbol de programas (programas dentro del programa maestro), los usuarios pueden crear un objeto principal que tenga una referencia de programación. Para obtener más información acerca de las configuraciones de programación opcionales de ejemplo, consulte https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html | Objeto de referencia, por ejemplo, «schedule»: {"ref»:» myScheduleId «} |
Grupo obligatorio (se requiere uno de los siguientes) | Descripción | Tipo de slot |
---|---|---|
script | El script de Pig que se ejecutará. | Cadena |
scriptUri | La ubicación del script de Pig que se va a ejecutar (por ejemplo, s3://scriptLocation). | Cadena |
Grupo obligatorio (se requiere uno de los siguientes) | Descripción | Tipo de slot |
---|---|---|
runsOn | EMRClúster en el que PigActivity se ejecuta. | Objeto de referencia, por ejemplo, "runsOn«: {" ref»:» myEmrCluster Id "} |
workerGroup | El grupo de procesos de trabajo. Este se usa para dirigir tareas. Si proporciona un valor runsOn y existe workerGroup , workerGroup se ignora. |
Cadena |
Campos opcionales | Descripción | Tipo de slot |
---|---|---|
attemptStatus | El estado más reciente notificado por la actividad remota. | Cadena |
attemptTimeout | El tiempo de espera para que se complete el trabajo remoto. Si se establece, se puede reintentar una actividad remota que no se complete dentro del tiempo de inicio establecido. | Período |
dependsOn | Especifica la dependencia de otro objeto ejecutable. | Objeto de referencia, por ejemplo, "dependsOn«: {" ref»:» myActivityId «} |
failureAndRerunModo | failureAndRerunMode. | Enumeración |
input | El origen de datos de entrada. | Objeto de referencia, por ejemplo, «input»: {"ref»:» myDataNode Id "} |
lateAfterTimeout | El tiempo transcurrido desde el inicio de la canalización dentro del cual el objeto debe completarse. Solo se activa cuando el tipo de programación no está establecido en ondemand . |
Período |
maxActiveInstances | El número máximo de instancias activas simultáneas de un componente. Las nuevas ejecuciones no cuentan para el número de instancias activas. | Entero |
maximumRetries | El número máximo de intentos en caso de error. | Entero |
onFail | Acción que se debe ejecutar cuando el objeto actual produzca un error. | Objeto de referencia, por ejemplo, "onFail«: {" ref»:» myActionId «} |
onLateAction | Acciones que deben iniciarse si un objeto todavía no se ha programado o no se ha completado. | Objeto de referencia, por ejemplo, "onLateAction«: {" ref»:» myActionId «} |
onSuccess | Acción que se debe ejecutar cuando el objeto actual se complete correctamente. | Objeto de referencia, por ejemplo, "onSuccess«: {" ref»:» myActionId «} |
salida | El origen de datos de salida. | Objeto de referencia, por ejemplo, «output»: {"ref»:» myDataNode Id "} |
parent | Elemento principal del objeto actual del que se heredarán los slots. | Objeto de referencia, por ejemplo, «parent»: {"ref»:» myBaseObject Id "} |
pipelineLogUri | Amazon S3 URI (como 's3://BucketName/Key/ ') para cargar los registros de la canalización. | Cadena |
postActivityTaskConfig | Script de configuración después de la actividad que se va a ejecutar. Consiste en un script URI de shell de Amazon S33 y una lista de argumentos. | Objeto de referencia, por ejemplo, "postActivityTaskConfig»: {"ref»:» myShellScript ConfigId «} |
preActivityTaskConfig | Script de configuración antes de la actividad que se va a ejecutar. Consiste en un script URI de shell de Amazon S3 y una lista de argumentos. | Objeto de referencia, por ejemplo, "preActivityTaskConfig»: {"ref»:» myShellScript ConfigId «} |
precondition | Opcionalmente, defina una condición previa. Un nodo de datos no se marca como "READY" hasta que se hayan cumplido todas las condiciones previas. | Objeto de referencia, por ejemplo, «condición previa»: {"ref»:» myPreconditionId «} |
reportProgressTimeout | El tiempo de espera para llamadas sucesivas del trabajo remoto a reportProgress . Si se establece, las actividades remotas que no informen de su progreso durante el período especificado pueden considerarse estancadas y, en consecuencia, reintentarse. |
Período |
resizeClusterBeforeEn ejecución | Cambiar el tamaño del clúster antes de realizar esta actividad para adaptarse a los nodos de datos de DynamoDB especificados como entradas o salidas.notaSi tu actividad usa un |
Booleano |
resizeClusterMaxInstancias | Un límite del número máximo de instancias que el algoritmo de cambio de tamaño puede solicitar. | Entero |
retryDelay | Duración del tiempo de espera entre dos reintentos. | Período |
scheduleType | El tipo de programa le permite especificar si los objetos de la definición de la canalización deben programarse al principio del intervalo o al final de este. La programación de estilo de serie temporal significa que las instancias se programan al final de cada intervalo y la programación de estilo cron significa que las instancias se programan al principio de cada intervalo. Un programa bajo demanda le permite ejecutar una canalización una vez por activación. Esto significa que no tiene que clonar o recrear la canalización para ejecutarla de nuevo. Si utiliza una programación bajo demanda, debe especificarse en el objeto predeterminado y debe ser la única scheduleType especificada para los objetos en proceso. Para utilizar canalizaciones bajo demanda, basta con llamar a la ActivatePipeline operación para cada ejecución posterior. Los valores son: cron, ondemand y timeseries. | Enumeración |
scriptVariable | Los argumentos que se pasan al script de Pig. Se puede utilizar scriptVariable con un script oscriptUri. | Cadena |
stage | Determina si la puesta en escena está habilitada y permite que tu script de Pig tenga acceso a las tablas de datos escalonados, como $ {INPUT1} y $ {}. OUTPUT1 | Booleano |
Campos de tiempo de ejecución | Descripción | Tipo de slot |
---|---|---|
@activeInstances | Lista de los objetos de instancias activas programados actualmente. | Objeto de referencia, por ejemplo, "activeInstances«: {" ref»:» myRunnableObject Id "} |
@actualEndTime | La hora a la que finalizó la ejecución de este objeto. | DateTime |
@actualStartTime | La hora a la que comenzó la ejecución de este objeto. | DateTime |
cancellationReason | El cancellationReason si este objeto se ha cancelado. | Cadena |
@cascadeFailedOn | Descripción de la cadena de dependencia en la que ha fallado el objeto. | Objeto de referencia, por ejemplo, "cascadeFailedOn«: {" ref»:» myRunnableObject Id "} |
emrStepLog | Los registros de EMR pasos de Amazon solo están disponibles en los intentos de EMR actividad. | Cadena |
errorId | El errorId si este objeto falló. | Cadena |
errorMessage | El errorMessage si este objeto falló. | Cadena |
errorStackTrace | El seguimiento de la pila de error si este objeto ha fallado. | Cadena |
@finishedTime | La hora a la que este objeto finalizó su ejecución. | DateTime |
hadoopJobLog | Los registros de trabajos de Hadoop están disponibles cuando se intenta realizar actividades EMR basadas. | Cadena |
@healthStatus | El estado de salud del objeto que refleja el éxito o el fracaso de la última instancia de objeto que alcanzó un estado terminado. | Cadena |
@healthStatusFromInstanceId | ID del último objeto de instancia que alcanzó un estado terminado. | Cadena |
@ Hora healthStatusUpdated | Hora a la que el estado de salud se actualizó la última vez. | DateTime |
hostname | El nombre de host del cliente que recogió el intento de tarea. | Cadena |
@lastDeactivatedTime | La hora a la que este objeto se desactivó la última vez. | DateTime |
@ latestCompletedRun Hora | Hora de la última ejecución para la que se completó la ejecución. | DateTime |
@latestRunTime | Hora de la última ejecución para la que se programó la ejecución. | DateTime |
@nextRunTime | Hora de ejecución que se va a programar a continuación. | DateTime |
reportProgressTime | La hora más reciente a la que la actividad remota notificó algún progreso. | DateTime |
@scheduledEndTime | Hora de finalización programada para el objeto. | DateTime |
@scheduledStartTime | Hora de comienzo programada para el objeto. | DateTime |
@status | El estado de este objeto. | Cadena |
@version | Versión de la canalización con la que se creó el objeto. | Cadena |
@waitingOn | Descripción de la lista de dependencias de la que este objeto está a la espera. | Objeto de referencia, por ejemplo, "waitingOn«: {" ref»:» myRunnableObject Id "} |
Campos del sistema | Descripción | Tipo de slot |
---|---|---|
@error | Error al describir el objeto mal estructurado. | Cadena |
@pipelineId | ID de la canalización a la que pertenece este objeto. | Cadena |
@sphere | La esfera de un objeto denota su lugar en el ciclo de vida: los objetos de componente dan lugar a objetos de instancia que ejecutan objetos de intento. | Cadena |