

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 对流进行重新分片
<a name="kinesis-using-sdk-java-resharding"></a>

**重要**  
您可以使用 API 对直播进行重新分片。[UpdateShardCount](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_UpdateShardCount.html)否则，您可以按照此处的说明继续执行拆分和合并。

Amazon Kinesis Data Streams 支持*重新分片*，这使您能够调整流中的分片数量以适应流中数据流的速率变化。重新分片被视为高级操作。如果您不熟悉 Kinesis Data Streams，请在熟悉 Kinesis Data Streams 的所有其他方面之后再回来阅读本主题。

这里有两种类型的重新分片操作：分片拆分和分片合并。在分片拆分中，可将一个分片拆分为两个分片。在分片合并中，可将两个分片组合成一个分片。重新分片始终是*成对* 进行的，也就是说，无法在一次操作中拆分为两个以上的分片，并且无法在一次操作中合并两个以上的分片。重新分片操作涉及的分片或分片对称为*父* 分片。从重新分片操作中生成的分片或分片对称为*子* 分片。

拆分将增加流中分片的数量，从而增加流的数据容量。由于按分片收费，因此拆分将增加流的费用。同样，合并会减少流中的分片数量，从而降低流的数据容量和成本。

重新分片一般由不同于创建者（放置）应用程序和消费端（获取）应用程序的管理应用程序执行。这样的管理应用程序根据Amazon提供的指标 CloudWatch 或从制作者和消费者那里收集的指标来监控直播的整体性能。与使用者或创建者相比，管理应用程序还需要更广泛的 IAM 权限，因为使用者和创建者通常不需要访问 APIs 用于重新分片的用户。有关 Kinesis Data Streams 的 IAM 权限的更多信息，请参阅 [使用 IAM 控制对 Amazon Kinesis Data Streams 资源的访问](controlling-access.md)。

有关重新分片的更多信息，请参阅[如何更改 Kinesis Data Streams 中打开的分片数？](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-streams-open-shards/)

**Topics**
+ [确定重新分片策略](kinesis-using-sdk-java-resharding-strategies.md)
+ [拆分分片](kinesis-using-sdk-java-resharding-split.md)
+ [合并两个分片](kinesis-using-sdk-java-resharding-merge.md)
+ [完成重新分片操作](kinesis-using-sdk-java-after-resharding.md)

# 确定重新分片策略
<a name="kinesis-using-sdk-java-resharding-strategies"></a>

在 Amazon Kinesis Data Streams 中重新分片的目的是，使流能够适应数据流的速率的变化。拆分分片将增加流的容量（和费用）。合并分片将减少流的费用（和容量）。

 一种重新分片的方式可能是拆分流中的每个分片，这将使流的容量增加一倍。但是，这提供的容量可能比您实际需要的要多，从而产生不必要的费用。

您还可使用指标确定您的*热*或*冷*分片（即，接收的数据多于预期或少于预期的分片）。之后您可选择性地拆分热分片以增加面向这些分片的哈希键的容量。同样，您可合并冷分片以更好地利用其未使用的容量。

您可以从 Kinesis Data Streams 发布的亚马逊 CloudWatch 指标中获取直播的一些性能数据。但是，您也可收集您自己的一些关于流的指标。一种方式是记录由您的数据记录的分区键生成的哈希键值。记住，您在向流添加记录时指定了分区键。

```
putRecordRequest.setPartitionKey( String.format( "myPartitionKey" ) );
```

Kinesis Data Streams [MD5](http://en.wikipedia.org/wiki/MD5)使用分区键计算哈希键。由于您为记录指定了分区键，因此您可以使用 MD5 计算该记录的哈希键值并将其记录下来。

您也可以记录分配给您的数据记录的分片。 IDs 分片 ID 是通过使用 `getShardId` 对象（由 `putRecordResults` 方法返回）和对 `putRecords` 对象（由 `putRecordResult` 方法返回）的 `putRecord` 方法提供的。

```
String shardId = putRecordResult.getShardId();
```

使用分片 IDs 和哈希键值，您可以确定哪些分片和哈希键接收的流量最多或最少。您之后可使用重新分片操作来增加或减少容量（视这些键的情况而定）。

# 拆分分片
<a name="kinesis-using-sdk-java-resharding-split"></a>

要在 Amazon Kinesis Data Streams 中拆分分片，需要指定父分片中的哈希键值应如何重新分配给子分片。当您向流添加数据记录时，将基于哈希键值将数据记录分配给分片。哈希键值是您在向流中添加数据记录时为数据记录指定的分区键的[MD5](http://en.wikipedia.org/wiki/MD5)哈希值。具有相同分区键的数据记录也具有相同的哈希键值。

指定分片的可能的哈希键值构成一组有序的连续非负整数。此可能的哈希键值范围是通过以下命令指定的：

```
shard.getHashKeyRange().getStartingHashKey();
shard.getHashKeyRange().getEndingHashKey();
```

在拆分分片时，您指定此范围内的一个值。此哈希键值和所有较高的哈希键值将分配到其中一个子分片。所有较小的哈希键值将分配到另一子分片。

以下代码演示在所有子分片之间均匀地重新分配哈希键的分片拆分操作，基本上是将父分片一分为二。这只是一种可能的父分片划分方式。例如，可以拆分分片，以便父分片中下层三分之一的键分配给一个子分片，上层三分之二的键分配给另一子分片。但是，在许多应用程序中，将分片一分为二是一种很有效的方法。

此代码假定 `myStreamName` 包含您的流名称，对象变量 `shard` 包含要拆分的分片。首先实例化一个新的 `splitShardRequest` 对象并设置流名称和分片 ID。

```
SplitShardRequest splitShardRequest = new SplitShardRequest();
splitShardRequest.setStreamName(myStreamName);
splitShardRequest.setShardToSplit(shard.getShardId());
```

确定位于分片中最低值和最高值的中间的哈希键值。这是将包含父分片中上半层哈希键的子分片的起始哈希键值。在 `setNewStartingHashKey` 方法中指定此值。您只需要指定此值。Kinesis Data Streams 会自动将低于此值的哈希键分配给拆分操作所创建的另一子分片。最后一步是对 Kinesis Data Streams 客户端调用 `splitShard` 方法。

```
BigInteger startingHashKey = new BigInteger(shard.getHashKeyRange().getStartingHashKey());
BigInteger endingHashKey   = new BigInteger(shard.getHashKeyRange().getEndingHashKey());
String newStartingHashKey  = startingHashKey.add(endingHashKey).divide(new BigInteger("2")).toString();

splitShardRequest.setNewStartingHashKey(newStartingHashKey);
client.splitShard(splitShardRequest);
```

[等待流再次变为活动状态](kinesis-using-sdk-java-after-resharding.md#kinesis-using-sdk-java-resharding-wait-until-active)中演示了此过程之后的第一步。

# 合并两个分片
<a name="kinesis-using-sdk-java-resharding-merge"></a>

 分片合并操作使用两个指定分片并将它们合并为一个分片。在合并后，单个子分片将收到两个父分片所包含的所有哈希键值的数据。

**分片相邻**  
要合并两个分片，分片必须*相邻*。如果两个分片的哈希键范围联合构成一个无间断的连续集，则认为两个分片相邻。例如，假设您有两个分片，一个分片的哈希键范围为 276...381，另一个分片的哈希键范围为 382...454。您可以将这两个分片合并为一个分片，其哈希键范围为 276...454。

另举一例，假设您有两个分片，一个分片的哈希键范围为 276...381，另一个分片的哈希键范围为 455...560。您无法合并这两个分片，因为这两个分片之间有一个或多个哈希键位于 382...454 范围的分片。

流中所有`OPEN`分片的集合（作为一个组）始终跨越整个哈希键值范围。 MD5 有关分片状态（例如 `CLOSED`）的更多信息，请参阅 [重新分片之后考虑数据路由、数据保留和分片状态](kinesis-using-sdk-java-after-resharding.md#kinesis-using-sdk-java-resharding-data-routing)。

要标识作为合并候选的分片，您应筛选出处于 `CLOSED` 状态的所有分片。状态为 `OPEN`（也就是非 `CLOSED`）的分片的结尾序列号为 `null`。您可使用以下命令测试此结束序列号：

```
if( null == shard.getSequenceNumberRange().getEndingSequenceNumber() ) 
{
  // Shard is OPEN, so it is a possible candidate to be merged.
}
```

在筛选出已关闭分片后，按每个分片支持的最高哈希键值对剩余分片进行排序。您可使用以下命令检索此值：

```
shard.getHashKeyRange().getEndingHashKey();
```

 如果两个分片在这个经过筛选和排序的列表中相邻，则可合并它们。

**合并操作的代码**  
 以下代码可合并两个分片。此代码假定 `myStreamName` 包含您的流名称并且对象变量 `shard1` 和 `shard2` 包含要合并的两个相邻分片。

为进行合并操作，首先实例化一个新的 `mergeShardsRequest` 对象。使用 `setStreamName` 方法指定流名称。然后使用 `setShardToMerge` 和 `setAdjacentShardToMerge` 方法指定要合并的两个分片。最后，对 Kinesis Data Streams 客户端调用 `mergeShards` 方法以执行此操作。

```
MergeShardsRequest mergeShardsRequest = new MergeShardsRequest();
mergeShardsRequest.setStreamName(myStreamName);
mergeShardsRequest.setShardToMerge(shard1.getShardId());
mergeShardsRequest.setAdjacentShardToMerge(shard2.getShardId());
client.mergeShards(mergeShardsRequest);
```

[等待流再次变为活动状态](kinesis-using-sdk-java-after-resharding.md#kinesis-using-sdk-java-resharding-wait-until-active)中演示了此过程之后的第一步。

# 完成重新分片操作
<a name="kinesis-using-sdk-java-after-resharding"></a>

在 Amazon Kinesis Data Streams 中执行任何类型的重新分片过程之后，在继续常规记录处理之前，需要执行其他流程并注意一些事项。以下各节介绍了这些过程。

**Topics**
+ [等待流再次变为活动状态](#kinesis-using-sdk-java-resharding-wait-until-active)
+ [重新分片之后考虑数据路由、数据保留和分片状态](#kinesis-using-sdk-java-resharding-data-routing)

## 等待流再次变为活动状态
<a name="kinesis-using-sdk-java-resharding-wait-until-active"></a>

在调用重新分片操作 `splitShard` 或 `mergeShards` 之后，您必须等待流再次变为活动状态。要使用的代码与您在[创建流](kinesis-using-sdk-java-create-stream.md)之后等待流变为活动状态时使用的代码相同。代码如下所示：

```
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
describeStreamRequest.setStreamName( myStreamName );

long startTime = System.currentTimeMillis();
long endTime = startTime + ( 10 * 60 * 1000 );
while ( System.currentTimeMillis() < endTime ) 
{
  try {
    Thread.sleep(20 * 1000);
  } 
  catch ( Exception e ) {}
  
  try {
    DescribeStreamResult describeStreamResponse = client.describeStream( describeStreamRequest );
    String streamStatus = describeStreamResponse.getStreamDescription().getStreamStatus();
    if ( streamStatus.equals( "ACTIVE" ) ) {
      break;
    }
   //
    // sleep for one second
    //
    try {
      Thread.sleep( 1000 );
    }
    catch ( Exception e ) {}
  }
  catch ( ResourceNotFoundException e ) {}
}
if ( System.currentTimeMillis() >= endTime ) 
{
  throw new RuntimeException( "Stream " + myStreamName + " never went active" );
}
```

## 重新分片之后考虑数据路由、数据保留和分片状态
<a name="kinesis-using-sdk-java-resharding-data-routing"></a>

Kinesis Data Streams 是一项实时数据流服务。您的应用程序应假定数据不断地在流中的分片内流动。当您重新分片时，流至父分片的数据记录将基于数据记录分区键映射到的哈希键值重新路由至子分片。但是，重新分片前位于父分片中的任何数据记录将仍位于这些父分片中。父分片在重新分片发生后不会消失。它们与重新分片之前包含的数据一起保留。可以使用 Kinesis Data Streams API 中的 [`getShardIterator` 和 `getRecords`](developing-consumers-with-sdk.md#kinesis-using-sdk-java-get-data) 操作或通过 Kinesis Client Library 访问父分片中的数据记录。

**注意**  
数据记录在当前保留期内添加到流中后即可访问。不论在该期间内对流中的分片进行了任何更改，都是如此。有关流的保留期的更多信息，请参阅[更改数据留存期](kinesis-extended-retention.md)。

在重新分片过程中，父分片将从 `OPEN` 状态过渡到 `CLOSED` 状态再过渡到 `EXPIRED` 状态。
+  **OPEN**：在重新分片操作之前，父分片处于 `OPEN` 状态，这意味着数据记录可添加到分片中并且可从分片进行检索。
+  **CLOSED**：在重新分片操作之后，父分片将过渡到 `CLOSED` 状态。这意味着无法再向此分片添加数据记录。原本应该已添加到此分片的数据记录现在将改为添加到子分片。但是，数据记录在有限时间内仍可从此分片进行检索。
+  **EXPIRED**：在流的保留期过期之后，父分片中的所有数据记录将会过期，不再可供访问。此时，父分片自身将过渡到 `EXPIRED` 状态。用于枚举流中的分片的 `getStreamDescription().getShards` 调用不包括返回的分片列表中的 `EXPIRED` 分片。有关流的保留期的更多信息，请参阅[更改数据留存期](kinesis-extended-retention.md)。

在进行重新分片并且流再次处于 `ACTIVE` 状态之后，您可立即开始读取子分片中的数据。但是，在重新分片后保留的父分片可能仍包含您尚未读取并且已在重新分片前添加到流中的数据。如果您在读取完父分片中的所有数据之前读取子分片中的数据，则可不按数据记录的序列号指定的顺序读取特定哈希键的数据。因此，假定数据顺序很重要，您在重新分片后应始终继续读取父分片中的数据直到读取完。并且只有在这之后才应开始读取子分片中的数据。如果 `getRecordsResult.getNextShardIterator` 返回 `null`，则这指示您已读取完父分片中的所有数据。