AWS Data Pipeline ya no está disponible para nuevos clientes. Clientes actuales de AWS Data Pipeline pueden seguir utilizando el servicio con normalidad. Más información
Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Copia de datos de MySQL mediante la línea de comandos
Cree una canalización para copiar datos de una tabla de MySQL en un archivo en un bucket de Amazon S3.
Requisitos previos
Debe seguir estos pasos antes de comenzar:
-
Instale y configure la interfaz de línea de comandos (CLI). Para obtener más información, consulte Acceder AWS Data Pipeline.
-
Asegúrese de que existan los roles de IAM denominados DataPipelineDefaultRole y DataPipelineDefaultResourceRole La consola AWS Data Pipeline crea estos roles automáticamente. Si no ha utilizado la consola AWS Data Pipeline al menos una vez, debe crear estos roles manualmente. Para obtener más información, consulte Roles de IAM para AWS Data Pipeline.
-
Configure un bucket de Amazon S3 y una instancia de Amazon RDS. Para obtener más información, consulte Antes de empezar.
Definir una canalización en formato JSON
Este escenario de ejemplo muestra cómo utilizar definiciones de la canalización de JSON y la CLI de AWS Data Pipeline para copiar datos (filas) de una tabla en una base de datos MySQL en un archivo CSV (valores separados por comas) en un bucket de Amazon S3 en un intervalo de tiempo especificado.
Este es el archivo JSON de definición de la canalización completo seguido de una explicación de cada una de sus secciones.
nota
Le recomendamos que use un editor de texto que pueda ayudarle a comprobar la sintaxis de los archivos con formato JSON y que asigne un nombre al archivo con la extensión .json.
{ "objects": [ { "id": "ScheduleId113", "startDateTime": "2013-08-26T00:00:00", "name": "My Copy Schedule", "type": "Schedule", "period": "1 Days" }, { "id": "CopyActivityId112", "input": { "ref": "MySqlDataNodeId115" }, "schedule": { "ref": "ScheduleId113" }, "name": "My Copy", "runsOn": { "ref": "Ec2ResourceId116" }, "onSuccess": { "ref": "ActionId1" }, "onFail": { "ref": "SnsAlarmId117" }, "output": { "ref": "S3DataNodeId114" }, "type": "CopyActivity" }, { "id": "S3DataNodeId114", "schedule": { "ref": "ScheduleId113" }, "filePath": "s3://
example-bucket
/rds-output
/output
.csv", "name": "My S3 Data", "type": "S3DataNode" }, { "id": "MySqlDataNodeId115", "username": "my-username
", "schedule": { "ref": "ScheduleId113" }, "name": "My RDS Data", "*password": "my-password
", "table": "table-name
", "connectionString": "jdbc:mysql://your-sql-instance-name
.id
.region-name
.rds.amazonaws.com:3306/database-name
", "selectQuery": "select * from #{table}", "type": "SqlDataNode" }, { "id": "Ec2ResourceId116", "schedule": { "ref": "ScheduleId113" }, "name": "My EC2 Resource", "role": "DataPipelineDefaultRole", "type": "Ec2Resource", "resourceRole": "DataPipelineDefaultResourceRole" }, { "message": "This is a success message.", "id": "ActionId1", "subject": "RDS to S3 copy succeeded!", "name": "My Success Alarm", "role": "DataPipelineDefaultRole", "topicArn": "arn:aws:sns:us-east-1:123456789012:example-topic
", "type": "SnsAlarm" }, { "id": "Default", "scheduleType": "timeseries", "failureAndRerunMode": "CASCADE", "name": "Default", "role": "DataPipelineDefaultRole", "resourceRole": "DataPipelineDefaultResourceRole" }, { "message": "There was a problem executing #{node.name} at for period #{node.@scheduledStartTime} to #{node.@scheduledEndTime}", "id": "SnsAlarmId117", "subject": "RDS to S3 copy failed", "name": "My Failure Alarm", "role": "DataPipelineDefaultRole", "topicArn": "arn:aws:sns:us-east-1:123456789012:example-topic
", "type": "SnsAlarm" } ] }
Nodo de datos MySQL
El componente de entrada MySqlDataNode de la canalización define una ubicación para los datos de entrada; en este caso, una instancia de Amazon RDS. El componente MySqlDataNode de la entrada se define por medio de los siguientes campos:
{ "id": "MySqlDataNodeId115", "username": "
my-username
", "schedule": { "ref": "ScheduleId113" }, "name": "My RDS Data", "*password": "my-password
", "table": "table-name
", "connectionString": "jdbc:mysql://your-sql-instance-name
.id
.region-name
.rds.amazonaws.com:3306/database-name
", "selectQuery": "select * from #{table}", "type": "SqlDataNode" },
- Id
El nombre definido por el usuario, que es una etiqueta solo con fines de referencia.
- Nombre de usuario
El nombre de usuario de la cuenta de la base de datos que tiene permisos suficientes para recuperar los datos de la tabla de base de datos. Sustituya
my-username
por el nombre de su cuenta de usuario.- Programación
Una referencia al componente de programación que creamos en las líneas anteriores del archivo JSON.
- Nombre
El nombre definido por el usuario, que es una etiqueta solo con fines de referencia.
- *Password
La contraseña de la cuenta de la base de datos con el asterisco como prefijo para indicar que AWS Data Pipeline debe cifrar el valor de la contraseña. Sustituya
my-password
por la contraseña correcta para su cuenta de usuario. El campo de la contraseña está precedido por el carácter especial del asterisco. Para obtener más información, consulte Caracteres especiales.- Tabla
El nombre de la tabla de base datos que contiene los datos que se van a copiar. Sustituya
table-name
por el nombre de la tabla de base de datos.- connectionString
La cadena de conexión JDBC para el objeto CopyActivity que se va a conectar a la base de datos.
- selectQuery
Una consulta SQL SELECT válida que especifica qué datos se van a copiar de la tabla de base de datos. Tenga en cuenta que
#{table}
es una expresión que reutiliza el nombre de la tabla proporcionado por la variable "table" en las líneas anteriores del archivo JSON.- Tipo
El tipo SqlDataNode, que es una instancia de Amazon RDS que utiliza MySQL en este ejemplo.
nota
El tipo MySqlDataNode está obsoleto. Aunque puede seguir usando MySqlDataNode, le recomendamos que utilice SqlDataNode.
Nodo de datos Amazon S3
A continuación, el componente de canalización S3Output define una ubicación para el archivo de salida; en este caso, un archivo CSV en una ubicación de bucket de Amazon S3. El componente S3DataNode de salida se define por los siguientes campos:
{ "id": "S3DataNodeId114", "schedule": { "ref": "ScheduleId113" }, "filePath": "s3://
example-bucket
/rds-output
/output
.csv", "name": "My S3 Data", "type": "S3DataNode" },
- Id
El ID definido por el usuario, que es una etiqueta solo con fines de referencia.
- Programación
Una referencia al componente de programación que creamos en las líneas anteriores del archivo JSON.
- filePath
La ruta a los datos asociados al nodo de datos, que es un archivo de salida CSV en este ejemplo.
- Nombre
El nombre definido por el usuario, que es una etiqueta solo con fines de referencia.
- Tipo
El tipo de objeto de canalización, que es S3DataNode para que coincida con la ubicación donde residen los datos, en un bucket de Amazon S3.
Recurso
Esta es una definición del recurso informático que realiza la operación de copia. En este ejemplo, AWS Data Pipeline debe crear automáticamente una instancia EC2 para realizar la tarea de copia y terminar el recurso tras completarse la tarea. Los campos definidos aquí controlan la creación y función de la instancia EC2 que realiza el trabajo. El componente Resource se define por los siguientes campos:
{ "id": "Ec2ResourceId116", "schedule": { "ref": "ScheduleId113" }, "name": "My EC2 Resource", "role": "DataPipelineDefaultRole", "type": "Ec2Resource", "resourceRole": "DataPipelineDefaultResourceRole" },
- Id
El ID definido por el usuario, que es una etiqueta solo con fines de referencia.
- Programación
El programa en el que desea crear este recurso informático.
- Nombre
El nombre definido por el usuario, que es una etiqueta solo con fines de referencia.
- Rol
El rol de IAM de la cuenta que tiene acceso a recursos, como el acceso a un bucket de Amazon S3 para recuperar datos.
- Tipo
El tipo de recurso informático para realizar el trabajo; en este caso, una instancia EC2. Hay otros tipos de recursos disponibles, como un tipo EmrCluster.
- resourceRole
El rol de IAM de la cuenta que crea recursos, como la creación y configuración de una instancia EC2 en su nombre. resourceRole
Actividad
La última sección del archivo JSON es la definición de la actividad que representa el trabajo que se realizará. En este caso, utilizamos un componente CopyActivity para copiar datos de un archivo en un bucket de Amazon S3 en otro archivo. El componente CopyActivity se define por los siguientes campos:
{ "id": "CopyActivityId112", "input": { "ref": "MySqlDataNodeId115" }, "schedule": { "ref": "ScheduleId113" }, "name": "My Copy", "runsOn": { "ref": "Ec2ResourceId116" }, "onSuccess": { "ref": "ActionId1" }, "onFail": { "ref": "SnsAlarmId117" }, "output": { "ref": "S3DataNodeId114" }, "type": "CopyActivity" },
- Id
El ID definido por el usuario, que es una etiqueta solo con fines de referencia.
- Input
La ubicación de los datos de MySQL que se van a copiar.
- Programación
La programación en la que ejecutar esta actividad.
- Nombre
El nombre definido por el usuario, que es una etiqueta solo con fines de referencia.
- runsOn
El recurso informático que realiza el trabajo que define esta actividad. En este ejemplo, proporcionamos una referencia a la instancia de EC2 definidos anteriormente. El uso del campo
runsOn
conlleva que AWS Data Pipeline cree la instancia EC2 automáticamente. El camporunsOn
indica que el recurso existe en la infraestructura de AWS, mientras que el valor de workerGroup indica que desea usar sus propios recursos locales para realizar el trabajo.- onSuccess
La SnsAlarm que se va a enviar si la actividad se realiza de forma correcta.
- onFail
La SnsAlarm que se va a enviar si la actividad no se realiza de forma correcta.
- Output
La ubicación en Amazon S3 del archivo de salida CSV.
- Tipo
El tipo de actividad que se va a realizar.
Cargar y activar la definición de canalización
Debe cargar la definición de su canalización y activarla. En los siguientes comandos de ejemplo, reemplace pipeline_name
por una etiqueta para su canalización y pipeline_file
por la ruta completa para el archivo de definición de canalización..json
AWS CLI
Para crear su definición de canalización y activarla, use el siguiente comando create-pipeline. Anote el ID de su canalización, ya que utilizará este valor con la mayoría de los comandos de la CLI.
aws datapipeline create-pipeline --name
{ "pipelineId": "df-00627471SOVYZEXAMPLE" }pipeline_name
--unique-idtoken
Para cargar su definición de la canalización, utilice el comando siguiente: put-pipeline-definition.
aws datapipeline put-pipeline-definition --pipeline-id df-00627471SOVYZEXAMPLE --pipeline-definition file://MyEmrPipelineDefinition.json
Si la canalización se valida correctamente, el campo validationErrors
estará vacío. Debe revisar todas las advertencias.
Para activar la canalización, utilice el siguiente comando activate-pipeline:
aws datapipeline activate-pipeline --pipeline-id df-00627471SOVYZEXAMPLE
Puede comprobar que su canalización aparece en la lista de canalizaciones mediante el siguiente comando list-pipelines.
aws datapipeline list-pipelines