AWS Data Pipeline 不再向新客户提供。的现有客户 AWS Data Pipeline 可以继续照常使用该服务。了解更多
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用命令行复制 MySQL 数据
您可以创建管道,将数据从 MySQL 表复制到 Amazon S3 存储桶中的文件。
先决条件
在开始本教程之前,您必须完成以下步骤:
-
安装和配置命令行界面 (CLI)。有关更多信息,请参阅正在访问 AWS Data Pipeline。
-
确保名为 DataPipelineDefaultRole 和 DataPipelineDefaultResourceRole 的 IAM 角色存在。AWS Data Pipeline 控制台会自动为您创建这些角色。如果您一次也没有使用过 AWS Data Pipeline 控制台,则必须手动创建这些角色。有关更多信息,请参阅适用于 AWS Data Pipeline 的 IAM 角色。
-
设置 Amazon S3 存储桶和 Amazon RDS 实例。有关更多信息,请参阅开始前的准备工作。
以 JSON 格式定义管道
此示例场景显示如何使用 JSON 管道定义和 AWS Data Pipeline CLI 按指定的时间间隔,将数据(行)从 MySQL 数据库中的表复制到 Amazon S3 存储桶中的 CSV(逗号分隔值)文件。
这是完整管道定义 JSON 文件,其每个部分后跟说明。
注意
我们建议您使用文本编辑器,这可帮助您验证 JSON 格式文件的语法,并使用 .json 文件扩展名来命名文件。
{ "objects": [ { "id": "ScheduleId113", "startDateTime": "2013-08-26T00:00:00", "name": "My Copy Schedule", "type": "Schedule", "period": "1 Days" }, { "id": "CopyActivityId112", "input": { "ref": "MySqlDataNodeId115" }, "schedule": { "ref": "ScheduleId113" }, "name": "My Copy", "runsOn": { "ref": "Ec2ResourceId116" }, "onSuccess": { "ref": "ActionId1" }, "onFail": { "ref": "SnsAlarmId117" }, "output": { "ref": "S3DataNodeId114" }, "type": "CopyActivity" }, { "id": "S3DataNodeId114", "schedule": { "ref": "ScheduleId113" }, "filePath": "s3://
example-bucket
/rds-output
/output
.csv", "name": "My S3 Data", "type": "S3DataNode" }, { "id": "MySqlDataNodeId115", "username": "my-username
", "schedule": { "ref": "ScheduleId113" }, "name": "My RDS Data", "*password": "my-password
", "table": "table-name
", "connectionString": "jdbc:mysql://your-sql-instance-name
.id
.region-name
.rds.amazonaws.com:3306/database-name
", "selectQuery": "select * from #{table}", "type": "SqlDataNode" }, { "id": "Ec2ResourceId116", "schedule": { "ref": "ScheduleId113" }, "name": "My EC2 Resource", "role": "DataPipelineDefaultRole", "type": "Ec2Resource", "resourceRole": "DataPipelineDefaultResourceRole" }, { "message": "This is a success message.", "id": "ActionId1", "subject": "RDS to S3 copy succeeded!", "name": "My Success Alarm", "role": "DataPipelineDefaultRole", "topicArn": "arn:aws:sns:us-east-1:123456789012:example-topic
", "type": "SnsAlarm" }, { "id": "Default", "scheduleType": "timeseries", "failureAndRerunMode": "CASCADE", "name": "Default", "role": "DataPipelineDefaultRole", "resourceRole": "DataPipelineDefaultResourceRole" }, { "message": "There was a problem executing #{node.name} at for period #{node.@scheduledStartTime} to #{node.@scheduledEndTime}", "id": "SnsAlarmId117", "subject": "RDS to S3 copy failed", "name": "My Failure Alarm", "role": "DataPipelineDefaultRole", "topicArn": "arn:aws:sns:us-east-1:123456789012:example-topic
", "type": "SnsAlarm" } ] }
MySQL 数据节点
输入 MySqlDataNode 管道组件定义输入数据的位置;在本例中是 Amazon RDS 实例。输入 MySqlDataNode 组件由以下字段定义:
{ "id": "MySqlDataNodeId115", "username": "
my-username
", "schedule": { "ref": "ScheduleId113" }, "name": "My RDS Data", "*password": "my-password
", "table": "table-name
", "connectionString": "jdbc:mysql://your-sql-instance-name
.id
.region-name
.rds.amazonaws.com:3306/database-name
", "selectQuery": "select * from #{table}", "type": "SqlDataNode" },
- Id
用户定义名称,这是仅供您参考的标签。
- Username
数据库账户的用户名,该账户具有足够的权限从数据库表检索数据。使用您用户的名称替换
my-username
。- 计划
对我们在 JSON 文件前面行中创建的计划组件的引用。
- 名称
用户定义名称,这是仅供您参考的标签。
- *密码
数据库账户密码带有星号前缀,指示 AWS Data Pipeline 必须加密密码值。使用您的用户的正确密码替换
my-password
。密码字段前面带有星号特殊字符。有关更多信息,请参阅特殊字符。- 表
包含要复制的数据的数据库表的名称。使用您的数据库表名称替换
table-name
。- connectionString
连接到数据库的 CopyActivity 对象的 JDBC 连接字符串。
- selectQuery
有效的 SQL SELECT 查询,用于指定要从数据库表复制的数据。请注意,
#{table}
是重新使用在 JSON 文件前面行中“table”变量提供的表名的表达式。- 类型
SqlDataNode 类型,在本例中是使用 MySQL 的 Amazon RDS 实例。
注意
MySqlDataNode 类型已被弃用。尽管您仍然可以使用 MySqlDataNode,我们建议您使用 SqlDataNode。
Amazon S3 数据节点
接下来,S3Output 管道组件定义输出文件的位置;在这种情况下是 Amazon S3 存储桶位置中的 CSV 文件。输出 S3DataNode 组件由以下字段定义:
{ "id": "S3DataNodeId114", "schedule": { "ref": "ScheduleId113" }, "filePath": "s3://
example-bucket
/rds-output
/output
.csv", "name": "My S3 Data", "type": "S3DataNode" },
- Id
用户定义 ID,这是仅供您参考的标签。
- 计划
对我们在 JSON 文件前面行中创建的计划组件的引用。
- filePath
与数据节点关联的数据的路径,在本示例中是一个 CSV 输出文件。
- 名称
用户定义名称,这是仅供您参考的标签。
- 类型
管道对象类型,为 S3DataNode,与 Amazon S3 存储桶中数据所在位置匹配。
资源
这是执行复制操作的计算资源的定义。在本示例中,AWS Data Pipeline 应该自动创建 EC2 实例来执行复制任务,并在任务完成后终止资源。此处定义的字段控制完成工作的 EC2 实例的创建操作及其功能。EC2Resource 由以下字段定义:
{ "id": "Ec2ResourceId116", "schedule": { "ref": "ScheduleId113" }, "name": "My EC2 Resource", "role": "DataPipelineDefaultRole", "type": "Ec2Resource", "resourceRole": "DataPipelineDefaultResourceRole" },
- Id
用户定义 ID,这是仅供您参考的标签。
- 计划
根据它来创建此计算资源的计划。
- 名称
用户定义名称,这是仅供您参考的标签。
- 角色
访问资源的账户的 IAM 角色,例如访问 Amazon S3 存储桶检索数据。
- 类型
执行工作的计算资源的类型;在这种情况下为 EC2 实例。还有其他资源类型可用,如 EmrCluster 类型。
- resourceRole
创建资源的账户的 IAM 角色,如代表您创建和配置 EC2 实例。Role 和 ResourceRole 可以是相同角色,但在安全配置中可分别提供更细粒度。
活动
JSON 文件的最后一个部分是活动的定义,表示要执行的工作。在本例中,我们使用 CopyActivity 组件将数据从 Amazon S3 存储桶中的文件复制到另一个文件。CopyActivity 组件由以下字段定义:
{ "id": "CopyActivityId112", "input": { "ref": "MySqlDataNodeId115" }, "schedule": { "ref": "ScheduleId113" }, "name": "My Copy", "runsOn": { "ref": "Ec2ResourceId116" }, "onSuccess": { "ref": "ActionId1" }, "onFail": { "ref": "SnsAlarmId117" }, "output": { "ref": "S3DataNodeId114" }, "type": "CopyActivity" },
- Id
用户定义 ID,这是仅供您参考的标签
- 输入
待复制 MySQL 数据的位置
- 计划
运行此活动的计划
- 名称
用户定义名称,这是仅供您参考的标签
- runsOn
执行此活动定义的工作的计算资源。在本示例中,我们提供了对之前定义的 EC2 实例的引用。使用
runsOn
字段会让 AWS Data Pipeline 为您创建 EC2 实例。runsOn
字段指示资源存在于 AWS 基础设施中,而 workerGroup 值指示您要使用自己的本地资源执行工作。- onSuccess
活动成功完成时发送的 SnsAlarm
- onFail
活动失败时发送的 SnsAlarm
- 输出
CSV 输出文件的 Amazon S3 位置
- 类型
要执行的活动类型。
上传并激活管道定义
您必须上传您的管道定义并激活您的管道。在以下示例命令中,将 pipeline_name
替换为管道的标签,将 pipeline_file
替换为管道定义 .json
文件的完全限定路径。
AWS CLI
要创建管道定义并激活管道,请使用以下 create-pipeline 命令。记下您的管道 ID,因为您将在大多数 CLI 命令中使用这个值。
aws datapipeline create-pipeline --name
{ "pipelineId": "df-00627471SOVYZEXAMPLE" }pipeline_name
--unique-idtoken
使用以下 put-pipeline-definition 命令更新管道定义。
aws datapipeline put-pipeline-definition --pipeline-id df-00627471SOVYZEXAMPLE --pipeline-definition file://MyEmrPipelineDefinition.json
如果您的管道成功验证,则 validationErrors
字段为空。您应该查看所有警告。
要激活管道,请使用以下 activate-pipeline 命令。
aws datapipeline activate-pipeline --pipeline-id df-00627471SOVYZEXAMPLE
您可以使用以下 list-pipelines 命令来验证您的管道是否出现在管道列表中。
aws datapipeline list-pipelines