

AWS Data Pipeline 不再向新客户提供。的现有客户 AWS Data Pipeline 可以继续照常使用该服务。[了解详情](https://aws.amazon.com/blogs/big-data/migrate-workloads-from-aws-data-pipeline/)

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 将数据和表与管道活动一起暂存
<a name="dp-concepts-staging"></a>

AWS Data Pipeline 可以在管道中暂存输入和输出数据，以便更轻松地使用某些活动，例如`ShellCommandActivity`和`HiveActivity`。

数据暂存让您能够将数据从输入数据节点复制到执行活动的资源，从资源到输出数据节点与此类似。

通过在活动的 shell 命令或 Hive 脚本中使用特殊变量，可使用 Amazon EMR 或 Amazon EC2 资源上的暂存数据。

表暂存类似于数据暂存，具体而言，不同之处在于暂存的数据采用数据库表的形式。

AWS Data Pipeline 支持以下暂存场景：
+ 使用 `ShellCommandActivity` 的数据暂存
+ 使用 Hive 的表暂存和支持暂存的数据节点
+ 使用 Hive 的表暂存和不支持暂存的数据节点

**注意**  
仅当活动上的 `stage` 字段设置为 `true` 时暂存才生效，例如 `ShellCommandActivity`。有关更多信息，请参阅 [ShellCommandActivity](dp-object-shellcommandactivity.md)。

此外，数据节点和活动可以通过四种方式关联：

在资源上本地暂存数据  
输入数据自动复制到资源本地文件系统。输出数据自动从资源本地文件系统复制到输出数据节点。例如，当您使用 staging = true 配置 `ShellCommandActivity` 输入和输出时，输入数据作为 INPUT*x*\$1STAGING\$1DIR 可用，输出数据作为 OUTPUT*x*\$1STAGING\$1DIR 可用，其中 *x* 是输入或输出的编号。

暂存活动的输入和输出定义  
输入数据格式 (列名和表名) 自动复制到活动的资源中。例如，当您使用 staging = true 配置 `HiveActivity` 时。在输入 `S3DataNode` 上指定的数据格式用于从 Hive 表暂存表定义。

暂存未启用  
输入和输出对象及其字段可用于活动，但数据本身不行。例如，`EmrActivity` 默认情况下或在您使用 staging = false 配置其他活动时。在此配置中，活动可以使用 AWS Data Pipeline 表达式语法对数据字段进行引用，并且只有在满足依赖关系时才会发生这种情况。这仅用作依赖关系检查。活动中的代码负责将数据从输入复制到运行活动的资源。

对象之间的依赖关系  
两个对象之间存在依赖关系，这会导致类似于未启用暂存的情况。这导致数据节点或活动用作执行另一个活动的先决条件。

## 使用进行数据暂存 ShellCommandActivity
<a name="dp-concepts-datastaging"></a>

考虑使用`ShellCommandActivity`带`S3DataNode`对象的场景作为数据输入和输出。 AWS Data Pipeline 使用环境变量自动暂存数据节点，让 shell 命令可以访问它们，`${OUTPUT1_STAGING_DIR}`就像它们是本地文件文件夹一样`${INPUT1_STAGING_DIR}`，如以下示例所示。名为 `INPUT1_STAGING_DIR` 和 `OUTPUT1_STAGING_DIR` 的变量的数字部分根据您的活动引用的数据节点数递增。

**注意**  
 只有在您的输入和输出为 `S3DataNode` 对象时，此场景才按所述工作。此外，只有当 `directoryPath` 设置在输出 `S3DataNode` 对象上时，才允许输出数据暂存。

```
{
  "id": "AggregateFiles",
  "type": "ShellCommandActivity",
  "stage": "true",
  "command": "cat ${INPUT1_STAGING_DIR}/part* > ${OUTPUT1_STAGING_DIR}/aggregated.csv",
  "input": {
    "ref": "MyInputData"
  },
  "output": {
    "ref": "MyOutputData"
  }
},
{
  "id": "MyInputData",
  "type": "S3DataNode",
  "schedule": {
    "ref": "MySchedule"
  },
  "filePath": "s3://my_bucket/source/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}/items"
  }
},                    
{
  "id": "MyOutputData",
  "type": "S3DataNode",
  "schedule": {
    "ref": "MySchedule"
  },
  "directoryPath": "s3://my_bucket/destination/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}"
  }
},
...
```

## 使用 Hive 的表暂存和支持暂存的数据节点
<a name="dp-concepts-tablestaging"></a>

考虑使用`HiveActivity`带`S3DataNode`对象的场景作为数据输入和输出。 AWS Data Pipeline 使用变量自动暂存数据节点，使 Hive 脚本可以访问它们，就像它们是 Hive 表`${output1}`一样`${input1}`，如以下示例所示。`HiveActivity`名为 `input` 和 `output` 的变量的数字部分根据您的活动引用的数据节点数递增。

**注意**  
 只有在您的输入和输出为 `S3DataNode` 或 `MySqlDataNode` 对象时，此场景才按所述方式工作。`DynamoDBDataNode` 不支持表暂存。

```
{
  "id": "MyHiveActivity",
  "type": "HiveActivity",
  "schedule": {
    "ref": "MySchedule"
  },
  "runsOn": {
    "ref": "MyEmrResource"
  },
  "input": {
    "ref": "MyInputData"
  },
  "output": {
    "ref": "MyOutputData"
  },
  "hiveScript": "INSERT OVERWRITE TABLE ${output1} select * from ${input1};"
},
{
  "id": "MyInputData",
  "type": "S3DataNode",
  "schedule": {
    "ref": "MySchedule"
  },
  "directoryPath": "s3://test-hive/input"
  }
},                    
{
  "id": "MyOutputData",
  "type": "S3DataNode",
  "schedule": {
    "ref": "MySchedule"
  },
  "directoryPath": "s3://test-hive/output"
  }
},
...
```

## 使用 Hive 的表暂存和不支持暂存的数据节点
<a name="dp-concepts-nostaging"></a>

请考虑使用 `HiveActivity` 和 `DynamoDBDataNode` 作为数据输入并将 `S3DataNode` 对象作为输出的场景。没有数据暂存可用于 `DynamoDBDataNode`，因此您必须先手动在 hive 脚本中创建表，使用变量名 `#{input.tableName}` 引用 DynamoDB 表。如果 DynamoDB 表是输出，类似术语也适用，除非您使用变量 `#{output.tableName}`。在本示例中，暂存可用于输出 `S3DataNode` 对象，因此您可以将输出数据节点作为 `${output1}` 引用。

**注意**  
在此示例中，表名变量具有 \$1（哈希）字符前缀，因为 AWS Data Pipeline 使用表达式来访问`tableName`或`directoryPath`。有关表达式求值工作原理的更多信息 AWS Data Pipeline，请参阅[表达式计算](dp-pipeline-expressions.md#dp-datatype-functions)。

```
{
  "id": "MyHiveActivity",
  "type": "HiveActivity",
  "schedule": {
    "ref": "MySchedule"
  },
  "runsOn": {
    "ref": "MyEmrResource"
  },
  "input": {
    "ref": "MyDynamoData"
  },
  "output": {
    "ref": "MyS3Data"
  },
  "hiveScript": "-- Map DynamoDB Table
SET dynamodb.endpoint=dynamodb.us-east-1.amazonaws.com;
SET dynamodb.throughput.read.percent = 0.5;
CREATE EXTERNAL TABLE dynamodb_table (item map<string,string>)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "#{input.tableName}"); 
INSERT OVERWRITE TABLE ${output1} SELECT * FROM dynamodb_table;"
},
{
  "id": "MyDynamoData",
  "type": "DynamoDBDataNode",
  "schedule": {
    "ref": "MySchedule"
  },
  "tableName": "MyDDBTable"
},                 
{
  "id": "MyS3Data",
  "type": "S3DataNode",
  "schedule": {
    "ref": "MySchedule"
  },
  "directoryPath": "s3://test-hive/output"
  }
},
...
```