

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 通过以下方式培养消费者 适用于 Java 的 AWS SDK
<a name="develop-consumers-sdk"></a>

 你可以使用 Amazon Kinesis Dat APIs a Streams 开发自定义消费者。本节介绍如何将 Kinesis Data APIs Streams 与配合 适用于 Java 的 AWS SDK使用。

**重要**  
有关开发具有共享吞吐量的自定义 Kinesis Data Streams 消费端的方法，建议使用 Kinesis Client Library（KCL）。KCL 通过处理许多与分布式计算相关的复杂任务，帮助您使用和处理 Kinesis 数据流中的数据。有关更多信息，请参阅 [使用 Java 通过 KCL 开发消费端](develop-kcl-consumers-java.md)。

**Topics**
+ [使用开发吞吐量共享的消费者 适用于 Java 的 AWS SDK](developing-consumers-with-sdk.md)
+ [通过以下方式培养更具吸引力的粉丝消费者 适用于 Java 的 AWS SDK](building-enhanced-consumers-api.md)
+ [使用 AWS Glue 架构注册表与数据交互](building-enhanced-consumers-glue-schema-registry.md)

# 使用开发吞吐量共享的消费者 适用于 Java 的 AWS SDK
<a name="developing-consumers-with-sdk"></a>

开发自定义 Kinesis Data Streams 使用者的方法之一是将 Amazon Kinesis Data APIs Streams 与. 适用于 Java 的 AWS SDK本节介绍如何将 Kinesis Data APIs Streams 与配合 适用于 Java 的 AWS SDK使用。你可以使用其他不同的编程语言调用 Kinesis Data APIs Streams。有关所有可用内容的更多信息 AWS SDKs，请参阅[开始使用 Amazon Web Services 进行开发](https://aws.amazon.com/developers/getting-started/)。

本部分的 Java 示例代码演示如何执行基本的 Kinesis Data Streams API 操作，并按照操作类型从逻辑上进行划分。这些示例并非可直接用于生产的代码。它们不会检查所有可能的异常，或者不会考虑到所有可能的安全或性能问题。

**Topics**
+ [从流中获取数据](#kinesis-using-sdk-java-get-data)
+ [使用分片迭代器](#kinesis-using-sdk-java-get-data-shard-iterators)
+ [使用 GetRecords](#kinesis-using-sdk-java-get-data-getrecords)
+ [适应重新分片](#kinesis-using-sdk-java-get-data-reshard)

## 从流中获取数据
<a name="kinesis-using-sdk-java-get-data"></a>

Kinesis Data APIs Streams 包括`getShardIterator``getRecords`和方法，您可以调用这些方法从数据流中检索记录。这是拉取模型，您的代码可以直接从数据流的分片中抽取数据记录。

**重要**  
我们建议您使用由 KCL 提供的记录处理器支持功能，以从数据流中检索记录。这是推送模型，您可以通过实现代码来处理数据。KCL 将从数据流中获取数据记录并将数据记录传送给您的应用程序代码。此外，KCL 还提供失效转移、恢复和负载均衡功能。有关更多信息，请参阅 [Developing Custom Consumers with Shared Throughput Using KCL](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html)。

但是，在某些情况下，你可能更喜欢使用 Kinesis Dat APIs a Streams。例如，在实施自定义工具以监控或调试数据流时。

**重要**  
Kinesis Data Streams 支持更改数据流的数据记录保留期。有关更多信息，请参阅 [更改数据留存期](kinesis-extended-retention.md)。

## 使用分片迭代器
<a name="kinesis-using-sdk-java-get-data-shard-iterators"></a>

可从流中按分片检索记录。对于每个分片以及您从分片中检索的每批记录，您必须获取*分片迭代器*。可在 `getRecordsRequest` 对象中使用分片迭代器来指定要从中检索记录的分片。与分片迭代器关联的类型决定了应在分片中检索记录的起点（有关更多信息，请参阅此部分中后面的内容）。您必须先检索分片，然后才能使用分片迭代器。有关更多信息，请参阅 [列出分片](kinesis-using-sdk-java-list-shards.md)。

使用 `getShardIterator` 方法获取初始分片迭代器。使用 `getNextShardIterator` 对象（由 `getRecordsResult` 方法返回）的 `getRecords` 方法为其他记录批次获取分片迭代器。分片迭代器的有效时间为 5 分钟。如果使用有效期内的分片迭代器，则将获得一个新的迭代器。每个分片迭代器在 5 分钟内一直有效，即使使用过也是如此。

要获取初始分片迭代器，请实例化 `GetShardIteratorRequest` 并将其传递给 `getShardIterator` 方法。要配置请求，请指定流和分片 ID。有关如何在您的 AWS 账户中获取直播的信息，请参阅[列出流](kinesis-using-sdk-java-list-streams.md)。有关如何获取流中分片的信息，请参阅 [列出分片](kinesis-using-sdk-java-list-shards.md)。

```
String shardIterator;
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
getShardIteratorRequest.setStreamName(myStreamName);
getShardIteratorRequest.setShardId(shard.getShardId());
getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON");

GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest);
shardIterator = getShardIteratorResult.getShardIterator();
```

此示例代码在获取初始分片迭代器时将 `TRIM_HORIZON` 指定为迭代器类型。此迭代器类型意味着记录应从添加到分片的第一个记录而不是从最近添加的记录（也称为*顶端*）开始返回。以下是可能的迭代器类型：
+ `AT_SEQUENCE_NUMBER`
+ `AFTER_SEQUENCE_NUMBER`
+ `AT_TIMESTAMP`
+ `TRIM_HORIZON`
+ `LATEST`

有关更多信息，请参阅 [ShardIteratorType](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#Kinesis-GetShardIterator-request-ShardIteratorType)。

部分迭代器类型除了需要指定类型之外，还需要指定序列号；例如：

```
getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER");
getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);
```

使用 `getRecords` 获取记录之后，可通过调用记录的 `getSequenceNumber` 方法来获取记录的序列号。

```
record.getSequenceNumber()
```

此外，将记录添加到数据流的代码可通过对 `getSequenceNumber` 的结果调用 `putRecord` 获取已添加记录的序列号。

```
lastSequenceNumber = putRecordResult.getSequenceNumber();
```

您可使用序列号确保记录的顺序严格递增。有关更多信息，请参阅 [PutRecord 示例](developing-producers-with-sdk.md#kinesis-using-sdk-java-putrecord-example)中的代码示例。

## 使用 GetRecords
<a name="kinesis-using-sdk-java-get-data-getrecords"></a>

获取分片迭代器之后，请实例化 `GetRecordsRequest` 对象。使用 `setShardIterator` 方法为请求指定迭代器。

(可选) 您还可使用 `setLimit` 方法设置要检索的记录的数量。`getRecords` 返回的记录数量始终等于或小于此限制。如果您未指定此限制，`getRecords` 将返回已检索记录的 10MB。以下示例代码将此限制设置为 25 个记录。

如果未返回任何记录，则意味着此分片中当前没有分片迭代器引用的序列号对应的可用数据记录。在这种情况下，您的应用程序应等待流的数据来源所需的时间。然后尝试使用对 `getRecords` 的上一调用返回的分片迭代器再次从分片获取数据。

将 `getRecordsRequest` 传递给 `getRecords` 方法并捕获返回的值作为 `getRecordsResult` 对象。要获取数据记录，请对 `getRecords` 对象调用 `getRecordsResult` 方法。

```
GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(25);

GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest);
List<Record> records = getRecordsResult.getRecords();
```

要准备对 `getRecords` 的另一次调用，请通过 `getRecordsResult` 获取下一分片迭代器。

```
shardIterator = getRecordsResult.getNextShardIterator();
```

为获得最佳效果，请在对 `getRecords` 的各次调用之间停止至少 1 秒（1000 毫秒）以免超出 `getRecords` 频率限制。

```
try {
  Thread.sleep(1000);
}
catch (InterruptedException e) {}
```

通常，您应循环调用 `getRecords`，甚至当您在测试方案中检索单一记录时也是如此。对 `getRecords` 的单一调用可能返回空的记录列表，即使分片包含更多具有之后的序列号的记录也是如此。出现此情况时，将返回 `NextShardIterator`，同时空记录列表将引用分片中之后的序列号，并且后续的 `getRecords` 调用最终将返回记录。以下示例演示循环的使用。

**示例：getRecords**  
以下代码示例反映了此节中的 `getRecords` 顶端，包括循环发出调用。

```
// Continuously read data records from a shard
List<Record> records;
    
while (true) {
   
  // Create a new getRecordsRequest with an existing shardIterator 
  // Set the maximum records to return to 25
  
  GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
  getRecordsRequest.setShardIterator(shardIterator);
  getRecordsRequest.setLimit(25); 

  GetRecordsResult result = client.getRecords(getRecordsRequest);
  
  // Put the result into record list. The result can be empty.
  records = result.getRecords();
  
  try {
    Thread.sleep(1000);
  } 
  catch (InterruptedException exception) {
    throw new RuntimeException(exception);
  }
  
  shardIterator = result.getNextShardIterator();
}
```

如果您使用 Kinesis Client Library，则可能在返回数据之前发出多次调用。此行为是设计使然，不代表 KCL 或您的数据存在问题。

## 适应重新分片
<a name="kinesis-using-sdk-java-get-data-reshard"></a>

 如果 `getRecordsResult.getNextShardIterator` 返回 `null`，则表示发生了涉及此分片的分片拆分或合并。此分片现在处于 `CLOSED` 状态，并且您已从其中读取了所有可用的数据记录。

 在这种情况下，您可以使用 `getRecordsResult.childShards` 来了解正在处理的分片中由拆分或合并创建的新子分片。有关更多信息，请参阅 [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html)。

 在拆分中，两个新分片的 `parentShardId` 都与您之前处理的分片的分片 ID 相同。这两个分片的 `adjacentParentShardId` 值为 `null`。

 在合并中，合并创建的一个新分片的 `parentShardId` 等于父分片之一的分片 ID，并且 `adjacentParentShardId` 等于另一父分片的分片 ID。您的应用程序已读取这些分片之一中的所有数据。这是 `getRecordsResult.getNextShardIterator` 返回 `null` 的分片。如果数据顺序对于您的应用程序很重要，则应确保它在读取合并创建的子分片中的任何新数据之前，还读取另一父分片中的所有数据。

 如果您使用多个处理器从流检索数据（假定一个分片一个处理器），并且出现分片拆分或合并时，您应增加或减少处理器数量以适应分片数量的变化。

 有关重新分片的更多信息，包括有关分片状态（如 `CLOSED`）的讨论，请参阅 [对流进行重新分片](kinesis-using-sdk-java-resharding.md)。

# 通过以下方式培养更具吸引力的粉丝消费者 适用于 Java 的 AWS SDK
<a name="building-enhanced-consumers-api"></a>

*增强型扇出*是一种 Amazon Kinesis Data Streams 功能，支持消费端从数据流中接收记录，其中每分片每秒专用吞吐量高达 2MB 数据。使用增强型扇出功能的消费端不必与接收流中数据的其他消费端争夺。有关更多信息，请参阅 [开发具有专用吞吐量的增强扇出型消费端](enhanced-consumers.md)。

可以使用 API 操作构建在 Kinesis Data Streams 中使用增强型扇出功能的消费端。

**使用 Kinesis Data Streams API 注册采用增强型扇出功能的消费端**

1. 致电[RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html)将您的应用程序注册为使用增强型扇出功能的使用者。Kinesis Data Streams 为消费端生成一个 Amazon 资源名称（ARN）并在响应中返回此名称。

1. 要开始监听特定分片，请在调用中将使用者 ARN 传递给。[SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)然后，Kinesis Data Streams 开始通过 HTTP/2 连接以[SubscribeToShardEvent](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShardEvent.html)类型的事件形式将记录从该分片推送给你。此连接将保持打开状态长达 5 分钟。如果要在调用返回的记录正常或异常[SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)完成后继续从分片接收记录，请[SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)再次调用。`future`
**注意**  
到达当前分片的末尾时，`SubscribeToShard` API 还会返回当前分片的子分片列表。

1. 要取消注册使用增强型扇出功能的消费者，请致电。[DeregisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeregisterStreamConsumer.html)

以下代码是一个示例，演示如何为消费端订阅分片、定期续订订阅以及处理事件。

```
    import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
    import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
     
    import java.util.concurrent.CompletableFuture;
     
    /**
     * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java
     * for complete code and more examples.
     */
    public class SubscribeToShardSimpleImpl {
     
        private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737";
        private static final String SHARD_ID = "shardId-000000000000";
     
        public static void main(String[] args) {
     
            KinesisAsyncClient client = KinesisAsyncClient.create();
     
            SubscribeToShardRequest request = SubscribeToShardRequest.builder()
                    .consumerARN(CONSUMER_ARN)
                    .shardId(SHARD_ID)
                    .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build();
     
            // Call SubscribeToShard iteratively to renew the subscription periodically.
            while(true) {
                // Wait for the CompletableFuture to complete normally or exceptionally.
                callSubscribeToShardWithVisitor(client, request).join();
            }
     
            // Close the connection before exiting.
            // client.close();
        }
     
     
        /**
         * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface.
         */
        private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) {
            SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() {
                @Override
                public void visit(SubscribeToShardEvent event) {
                    System.out.println("Received subscribe to shard event " + event);
                }
            };
            SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
                    .builder()
                    .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
                    .subscriber(visitor)
                    .build();
            return client.subscribeToShard(request, responseHandler);
        }
    }
```

 如果 `event.ContinuationSequenceNumber` 返回 `null`，则表示发生了涉及此分片的分片拆分或合并。此分片现在处于 `CLOSED` 状态，并且您已从其中读取了所有可用的数据记录。在这种情况下，按照上文示例所述，您可以使用 `event.childShards` 来了解正在处理的分片中由拆分或合并创建的新子分片。有关更多信息，请参阅 [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html)。

# 使用 AWS Glue 架构注册表与数据交互
<a name="building-enhanced-consumers-glue-schema-registry"></a>

您可以将 Kinesis 数据流与 AWS Glue 架构注册表集成。 AWS Glue 架构注册表能帮助您集中发现、控制和演变架构，同时确保注册架构持续验证生成的数据。架构定义了数据记录的结构和格式。架构是用于可靠数据发布、使用或存储的版本化规范。 AWS Glue Schema Registry 使您能够改善流媒体应用程序中的 end-to-end数据质量和数据治理。有关更多信息，请参阅 [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)。设置此集成的方法之一是通过 Java SDK 中 AWS 提供的 `GetRecords` Kinesis Data Streams API。

有关如何使用 Kinesis 数据流设置 Kinesis 数据流与架构注册表集成的详细说明，请参阅用`GetRecords`[例：将 Amazon Kinesis 数据 APIs流与 Glue 架构注册表集成中的 “使用 Kin APIs esis 数据流与数据交互” 部分](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds)。 AWS 