使用 AWS CLI 的 AWS Data Pipeline 示例
以下代码示例演示了如何通过将 AWS Command Line Interface与 AWS Data Pipeline 结合使用,来执行操作和实现常见场景。
操作是大型程序的代码摘录,必须在上下文中运行。您可以通过操作了解如何调用单个服务函数,还可以通过函数相关场景的上下文查看操作。
每个示例都包含一个指向完整源代码的链接,您可以从中找到有关如何在上下文中设置和运行代码的说明。
主题
操作
以下代码示例演示了如何使用 activate-pipeline
。
- AWS CLI
-
激活管道
此示例激活指定的管道:
aws datapipeline activate-pipeline --pipeline-id
df-00627471SOVYZEXAMPLE
要在特定的日期和时间激活管道,请使用以下命令:
aws datapipeline activate-pipeline --pipeline-id
df-00627471SOVYZEXAMPLE
--start-timestamp2015-04-07T00:00:00Z
-
有关 API 详细信息,请参阅《AWS CLI 命令参考》中的 ActivatePipeline
。
-
以下代码示例演示了如何使用 add-tags
。
- AWS CLI
-
向管道添加标签
此示例向指定管道添加指定标签:
aws datapipeline add-tags --pipeline-id
df-00627471SOVYZEXAMPLE
--tagskey=environment,value=production
key=owner,value=sales
要查看标签,请使用 describe-pipelines 命令。例如,示例命令中添加的标签在 describe-pipelines 的输出中如下所示:
{ ... "tags": [ { "value": "production", "key": "environment" }, { "value": "sales", "key": "owner" } ] ... }
-
有关 API 详细信息,请参阅《AWS CLI 命令参考》中的 AddTags
。
-
以下代码示例演示了如何使用 create-pipeline
。
- AWS CLI
-
创建管道
此示例创建一个管道:
aws datapipeline create-pipeline --name
my-pipeline
--unique-idmy-pipeline-token
下面是示例输出:
{ "pipelineId": "df-00627471SOVYZEXAMPLE" }
-
有关 API 详细信息,请参阅《AWS CLI 命令参考》中的 CreatePipeline
。
-
以下代码示例演示了如何使用 deactivate-pipeline
。
- AWS CLI
-
停用管道
此示例停用指定的管道:
aws datapipeline deactivate-pipeline --pipeline-id
df-00627471SOVYZEXAMPLE
要仅在所有正在运行的活动完成后停用管道,请使用以下命令:
aws datapipeline deactivate-pipeline --pipeline-id
df-00627471SOVYZEXAMPLE
--no-cancel-active-
有关 API 详细信息,请参阅《AWS CLI 命令参考》中的 DeactivatePipeline
。
-
以下代码示例演示了如何使用 delete-pipeline
。
- AWS CLI
-
删除管道
此示例删除指定的管道:
aws datapipeline delete-pipeline --pipeline-id
df-00627471SOVYZEXAMPLE
-
有关 API 详细信息,请参阅《AWS CLI 命令参考》中的 DeletePipeline
。
-
以下代码示例演示了如何使用 describe-pipelines
。
- AWS CLI
-
描述您的管道
此示例描述指定的管道:
aws datapipeline describe-pipelines --pipeline-ids
df-00627471SOVYZEXAMPLE
下面是示例输出:
{ "pipelineDescriptionList": [ { "fields": [ { "stringValue": "PENDING", "key": "@pipelineState" }, { "stringValue": "my-pipeline", "key": "name" }, { "stringValue": "2015-04-07T16:05:58", "key": "@creationTime" }, { "stringValue": "df-00627471SOVYZEXAMPLE", "key": "@id" }, { "stringValue": "123456789012", "key": "pipelineCreator" }, { "stringValue": "PIPELINE", "key": "@sphere" }, { "stringValue": "123456789012", "key": "@userId" }, { "stringValue": "123456789012", "key": "@accountId" }, { "stringValue": "my-pipeline-token", "key": "uniqueId" } ], "pipelineId": "df-00627471SOVYZEXAMPLE", "name": "my-pipeline", "tags": [] } ] }
-
有关 API 详细信息,请参阅《AWS CLI 命令参考》中的 DescribePipelines
。
-
以下代码示例演示了如何使用 get-pipeline-definition
。
- AWS CLI
-
获取管道定义
此示例获取指定管道的管道定义:
aws datapipeline get-pipeline-definition --pipeline-id
df-00627471SOVYZEXAMPLE
下面是示例输出:
{ "parameters": [ { "type": "AWS::S3::ObjectKey", "id": "myS3OutputLoc", "description": "S3 output folder" }, { "default": "s3://us-east-1.elasticmapreduce.samples/pig-apache-logs/data", "type": "AWS::S3::ObjectKey", "id": "myS3InputLoc", "description": "S3 input folder" }, { "default": "grep -rc \"GET\" ${INPUT1_STAGING_DIR}/* > ${OUTPUT1_STAGING_DIR}/output.txt", "type": "String", "id": "myShellCmd", "description": "Shell command to run" } ], "objects": [ { "type": "Ec2Resource", "terminateAfter": "20 Minutes", "instanceType": "t1.micro", "id": "EC2ResourceObj", "name": "EC2ResourceObj" }, { "name": "Default", "failureAndRerunMode": "CASCADE", "resourceRole": "DataPipelineDefaultResourceRole", "schedule": { "ref": "DefaultSchedule" }, "role": "DataPipelineDefaultRole", "scheduleType": "cron", "id": "Default" }, { "directoryPath": "#{myS3OutputLoc}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')}", "type": "S3DataNode", "id": "S3OutputLocation", "name": "S3OutputLocation" }, { "directoryPath": "#{myS3InputLoc}", "type": "S3DataNode", "id": "S3InputLocation", "name": "S3InputLocation" }, { "startAt": "FIRST_ACTIVATION_DATE_TIME", "name": "Every 15 minutes", "period": "15 minutes", "occurrences": "4", "type": "Schedule", "id": "DefaultSchedule" }, { "name": "ShellCommandActivityObj", "command": "#{myShellCmd}", "output": { "ref": "S3OutputLocation" }, "input": { "ref": "S3InputLocation" }, "stage": "true", "type": "ShellCommandActivity", "id": "ShellCommandActivityObj", "runsOn": { "ref": "EC2ResourceObj" } } ], "values": { "myS3OutputLoc": "s3://my-s3-bucket/", "myS3InputLoc": "s3://us-east-1.elasticmapreduce.samples/pig-apache-logs/data", "myShellCmd": "grep -rc \"GET\" ${INPUT1_STAGING_DIR}/* > ${OUTPUT1_STAGING_DIR}/output.txt" } }
-
有关 API 详细信息,请参阅《AWS CLI 命令参考》中的 GetPipelineDefinition
。
-
以下代码示例演示了如何使用 list-pipelines
。
- AWS CLI
-
列出您的管道
此示例列出您的管道:
aws datapipeline list-pipelines
下面是示例输出:
{ "pipelineIdList": [ { "id": "df-00627471SOVYZEXAMPLE", "name": "my-pipeline" }, { "id": "df-09028963KNVMREXAMPLE", "name": "ImportDDB" }, { "id": "df-0870198233ZYVEXAMPLE", "name": "CrossRegionDDB" }, { "id": "df-00189603TB4MZEXAMPLE", "name": "CopyRedshift" } ] }
-
有关 API 详细信息,请参阅《AWS CLI 命令参考》中的 ListPipelines
。
-
以下代码示例演示了如何使用 list-runs
。
- AWS CLI
-
示例 1:列出管道的运行情况
以下
list-runs
示例列出指定管道的运行情况。aws datapipeline list-runs --pipeline-id
df-00627471SOVYZEXAMPLE
输出:
Name Scheduled Start Status ID Started Ended ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 1. EC2ResourceObj 2015-04-12T17:33:02 CREATING @EC2ResourceObj_2015-04-12T17:33:02 2015-04-12T17:33:10 2. S3InputLocation 2015-04-12T17:33:02 FINISHED @S3InputLocation_2015-04-12T17:33:02 2015-04-12T17:33:09 2015-04-12T17:33:09 3. S3OutputLocation 2015-04-12T17:33:02 WAITING_ON_DEPENDENCIES @S3OutputLocation_2015-04-12T17:33:02 2015-04-12T17:33:09 4. ShellCommandActivityObj 2015-04-12T17:33:02 WAITING_FOR_RUNNER @ShellCommandActivityObj_2015-04-12T17:33:02 2015-04-12T17:33:09
示例 2:列出在指定日期之间的管道运行情况
以下
list-runs
示例使用--start-interval
来指定要包含在输出中的日期。aws datapipeline list-runs --pipeline-id
df-01434553B58A2SHZUKO5
--start-interval2017-10-07T00:00:00,2017-10-08T00:00:00
-
有关 API 详细信息,请参阅《AWS CLI 命令参考》中的 ListRuns
。
-
以下代码示例演示了如何使用 put-pipeline-definition
。
- AWS CLI
-
上传管道定义
此示例将指定的管道定义上传到指定的管道:
aws datapipeline put-pipeline-definition --pipeline-id
df-00627471SOVYZEXAMPLE
--pipeline-definitionfile://my-pipeline-definition.json
下面是示例输出:
{ "validationErrors": [], "errored": false, "validationWarnings": [] }
-
有关 API 详细信息,请参阅《AWS CLI 命令参考》中的 PutPipelineDefinition
。
-
以下代码示例演示了如何使用 remove-tags
。
- AWS CLI
-
从管道中移除标签
此示例从指定管道中移除指定标签:
aws datapipeline remove-tags --pipeline-id
df-00627471SOVYZEXAMPLE
--tag-keysenvironment
-
有关 API 详细信息,请参阅《AWS CLI 命令参考》中的 RemoveTags
。
-