

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 Amazon Kinesis Data Streams API 开发制作人 适用于 Java 的 AWS SDK
<a name="developing-producers-with-sdk"></a>

你可以使用 Amazon Kinesis Data Streams API 和适用于 Java 的软件开发工具包 AWS 来开发制作者。如果是首次使用 Kinesis Data Streams，请先熟悉 [什么是 Amazon Kinesis Data Streams？](introduction.md) 和 [使用 AWS CLI 来执行 Amazon Kinesis Data Streams 操作](getting-started.md) 中介绍的概念和术语。

这些示例讨论 [Kinesis Data Streams API](https://docs.aws.amazon.com/kinesis/latest/APIReference/) 并使用[适用于 Java 的AWS 开发工具包](https://aws.amazon.com/sdk-for-java/)向流中添加（放置）数据。但是，在大多数使用案例中，您应会更喜欢使用 Kinesis Data Streams KPL 库。有关更多信息，请参阅 [使用 Amazon Kinesis Producer Library（KPL）开发产生器](developing-producers-with-kpl.md)。

本章中的 Java 示例代码演示如何执行基本的 Kinesis Data Streams API 操作，并按照操作类型从逻辑上进行划分。这些示例并非可直接用于生产的代码，因为它们不会检查所有可能的异常，或者不会考虑到所有可能的安全或性能问题。此外，您可使用其他编程语言调用 [Kinesis Data Streams API](https://docs.aws.amazon.com/kinesis/latest/APIReference/)。有关所有可用内容的更多信息 AWS SDKs，请参阅[开始使用 Amazon Web Services 进行开发](https://aws.amazon.com/developers/getting-started/)。

每个任务都有先决条件；例如，您在创建流之后才能向流中添加数据，而创建流需要先创建一个客户端。有关更多信息，请参阅 [创建和管理 Kinesis 数据流](working-with-streams.md)。

**Topics**
+ [向流添加数据](#kinesis-using-sdk-java-add-data-to-stream)
+ [使用 AWS Glue 架构注册表与数据交互](kinesis-integration-glue-schema-registry.md)

## 向流添加数据
<a name="kinesis-using-sdk-java-add-data-to-stream"></a>

在创建流之后，您可以记录的形式向其中添加数据。记录是一种数据结构，其中包含要处理的数据（采用数据 Blob 形式）。在将数据存储到记录中之后，Kinesis Data Streams 不会以任何形式检查、解释或更改数据。每个记录还有一个关联的序列号和分区键。

Kinesis Data Streams API 中有两种不同操作可向流添加数据：[https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) 和 [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html)。`PutRecords` 操作将按 HTTP 请求向您的流发送多个记录，并且单个 `PutRecord` 操作一次可向您的流发送多个记录（每个记录需要单独的 HTTP 请求）。对于大多数应用程序，您应会更喜欢使用 `PutRecords`，因为这将使每个数据创建者实现更高的吞吐量。有关每种操作的更多信息，请参阅以下各小节。

**Topics**
+ [使用添加多条记录 PutRecords](#kinesis-using-sdk-java-putrecords)
+ [使用添加单条记录 PutRecord](#kinesis-using-sdk-java-putrecord)

请始终记住，在您的源应用程序使用 Kinesis Data Streams API 向流添加数据时，很可能存在一个或多个同时处理离开流的数据的消费端应用程序。有关消费端如何使用 Kinesis Data Streams API 获取数据的信息，请参阅 [从流中获取数据](developing-consumers-with-sdk.md#kinesis-using-sdk-java-get-data)。

**重要**  
[更改数据留存期](kinesis-extended-retention.md)

### 使用添加多条记录 PutRecords
<a name="kinesis-using-sdk-java-putrecords"></a>

[https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) 操作可在一个请求中向 Kinesis Data Streams 发送多条记录。向 Kinesis 数据流发送数据时，创建器可以使用 `PutRecords` 实现更高的吞吐量。每个 `PutRecords` 请求最多可以支持 500 条记录。请求中的每一个记录最大可以为 1 MB，整个请求的上限为 5 MB，包括分区键。与下面描述的单个 `PutRecord` 操作一样，`PutRecords` 将使用序列号和分区键。但是，`PutRecord` 参数 `SequenceNumberForOrdering` 未包含在 `PutRecords` 调用中。`PutRecords` 操作将尝试按请求的自然顺序处理所有记录。

每个数据记录都有一个唯一的序列号。此序列号在您调用 `client.putRecords` 向流添加数据记录之后由 Kinesis Data Streams 分配。同一分区键的序列号通常会随时间变化增加；`PutRecords` 请求之间的时间段越长，序列号变得越大。

**注意**  
序列号不能用作相同流中的数据集的索引。为了在逻辑上分隔数据集，请使用分区键或者为每个数据集创建单独的流。

`PutRecords` 请求可包含具有不同分区键的记录。请求的应用范围是一个流；每个请求可包含分区键和记录的任何组合，直到达到请求限制。使用许多不同的分区键对具有许多不同分片的流进行的请求一般快于使用少量分区键对少量分片进行的请求。分区键的数量应远大于分片的数量以减少延迟并最大程度提高吞吐量。

#### PutRecords 示例
<a name="kinesis-using-sdk-java-putrecords-example"></a>

以下代码创建 100 个使用连续分区键的数据记录并将其放入名为 `DataStream` 的流中。

```
        AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
        
        clientBuilder.setRegion(regionName);
        clientBuilder.setCredentials(credentialsProvider);
        clientBuilder.setClientConfiguration(config);
        
        AmazonKinesis kinesisClient = clientBuilder.build();
 
        PutRecordsRequest putRecordsRequest  = new PutRecordsRequest();
        putRecordsRequest.setStreamName(streamName);
        List <PutRecordsRequestEntry> putRecordsRequestEntryList  = new ArrayList<>(); 
        for (int i = 0; i < 100; i++) {
            PutRecordsRequestEntry putRecordsRequestEntry  = new PutRecordsRequestEntry();
            putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes()));
            putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i));
            putRecordsRequestEntryList.add(putRecordsRequestEntry); 
        }

        putRecordsRequest.setRecords(putRecordsRequestEntryList);
        PutRecordsResult putRecordsResult  = kinesisClient.putRecords(putRecordsRequest);
        System.out.println("Put Result" + putRecordsResult);
```

`PutRecords` 响应包含响应 `Records` 的数组。响应数组中的每个记录按自然顺序（从请求和响应的顶部到底部）直接与请求数组中的一个记录关联。响应 `Records` 数组包含的记录数量始终与请求数组相同。

#### 使用时处理故障 PutRecords
<a name="kinesis-using-sdk-java-putrecords-handling-failures"></a>

默认情况下，请求内的单个记录的失败不会中止对 `PutRecords` 请求中后续记录的处理。这意味着，响应 `Records` 数组包含处理成功和不成功的记录。您必须删除处理不成功的记录并在后续调用中包括它们。

成功的记录包括 `SequenceNumber` 和 `ShardID` 值，而不成功的记录包含 `ErrorCode` 和 `ErrorMessage` 值。`ErrorCode` 参数反映了错误类型，可能为下列值之一：`ProvisionedThroughputExceededException` 或 `InternalFailure`。`ErrorMessage` 提供有关 `ProvisionedThroughputExceededException` 异常的更多详细信息，包括账户 ID、流名称和已阻止的记录的分片 ID。以下示例在 `PutRecords` 请求中有三个记录。第二个记录失败并反映在响应中。

**Example PutRecords 请求语法**  

```
{
    "Records": [
        {
    	"Data": "XzxkYXRhPl8w",
	    "PartitionKey": "partitionKey1"
        },
        {
    	"Data": "AbceddeRFfg12asd",
	    "PartitionKey": "partitionKey1"	
        },
        {
    	"Data": "KFpcd98*7nd1",
	    "PartitionKey": "partitionKey3"
        }
    ],
    "StreamName": "myStream"
}
```

**Example PutRecords 响应语法**  

```
{
    "FailedRecordCount”: 1,
    "Records": [
        {
	    "SequenceNumber": "21269319989900637946712965403778482371",
	    "ShardId": "shardId-000000000001"

        },
        {
	    “ErrorCode":”ProvisionedThroughputExceededException”,
	    “ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111."

        },
        {
	    "SequenceNumber": "21269319989999637946712965403778482985",
	    "ShardId": "shardId-000000000002"
        }
    ]
}
```

处理不成功的请求可包含在后续 `PutRecords` 请求中。首先，查看 `putRecordsResult` 中的 `FailedRecordCount` 参数以确认请求中是否存在失败的记录。如果存在，则应将具有 `putRecordsEntry`（不是 `ErrorCode`）的每个 `null` 添加到后续请求中。有关此类处理程序的示例，请参阅以下代码。

**Example PutRecords 失败处理程序**  

```
PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
putRecordsRequest.setStreamName(myStreamName);
List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
for (int j = 0; j < 100; j++) {
    PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
    putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes()));
    putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j));
    putRecordsRequestEntryList.add(putRecordsRequestEntry);
}

putRecordsRequest.setRecords(putRecordsRequestEntryList);
PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest);

while (putRecordsResult.getFailedRecordCount() > 0) {
    final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>();
    final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords();
    for (int i = 0; i < putRecordsResultEntryList.size(); i++) {
        final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i);
        final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i);
        if (putRecordsResultEntry.getErrorCode() != null) {
            failedRecordsList.add(putRecordRequestEntry);
        }
    }
    putRecordsRequestEntryList = failedRecordsList;
    putRecordsRequest.setRecords(putRecordsRequestEntryList);
    putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest);
}
```

### 使用添加单条记录 PutRecord
<a name="kinesis-using-sdk-java-putrecord"></a>

对 [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html) 的每次调用对一个记录起作用。应首选 `PutRecords` 中描述的 [使用添加多条记录 PutRecords](#kinesis-using-sdk-java-putrecords) 操作，除非您的应用程序明确需要每一请求始终发送一条记录，或因某种其他原因无法使用 `PutRecords`。

每个数据记录都有一个唯一的序列号。此序列号在您调用 `client.putRecord` 向流添加数据记录之后由 Kinesis Data Streams 分配。同一分区键的序列号通常会随时间变化增加；`PutRecord` 请求之间的时间段越长，序列号变得越大。

 在快速连续进行放置时，不保证返回的序列号会递增，因为放置操作的发生对于 Kinesis Data Streams 实际上是同步进行的。要确保相同分区键的序列号严格递增，请使用 `SequenceNumberForOrdering` 参数，如 [PutRecord 示例](#kinesis-using-sdk-java-putrecord-example) 代码示例中所示。

 无论您是否使用 `SequenceNumberForOrdering`，Kinesis Data Streams 通过 `GetRecords` 调用接收的记录都将按序列号严格进行排序。

**注意**  
序列号不能用作相同流中的数据集的索引。为了在逻辑上分隔数据集，请使用分区键或者为每个数据集创建单独的流。

分区键用于对流中的数据进行分组。数据记录将基于其分区键分配给流中的分片。具体来说，Kinesis Data Streams 使用分区键作为将分区键（和关联数据）映射到特定分片的哈希函数的输入。

 作为此哈希机制的结果，具有相同分区键的所有数据记录将映射到流中的同一分片。但是，如果分区键的数量超出分片的数量，则一些分片必定会包含具有不同分区键的记录。从设计的角度看，要确保您的所有分片得到充分利用，分片的数量（由 `setShardCount` 的 `CreateStreamRequest` 方法指定）应远少于唯一分区键的数量，并且流至单一分区键的数据量应远少于分片容量。

#### PutRecord 示例
<a name="kinesis-using-sdk-java-putrecord-example"></a>

以下代码创建跨两个分区键分配的 10 条数据记录，并将它们放入名为 `myStreamName` 的流中。

```
for (int j = 0; j < 10; j++) 
{
  PutRecordRequest putRecordRequest = new PutRecordRequest();
  putRecordRequest.setStreamName( myStreamName );
  putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() ));
  putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 ));  
  putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord );
  PutRecordResult putRecordResult = client.putRecord( putRecordRequest );
  sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber();
}
```

上一个代码示例使用 `setSequenceNumberForOrdering` 来确保每个分区键内的顺序严格递增。要有效使用此参数，请将当前记录（记录 *n*）的 `SequenceNumberForOrdering` 设置为前一条记录（记录 *n-1*）的序列号。要获取已添加到流的记录的序列号，请对 `putRecord` 的结果调用 `getSequenceNumber`。

`SequenceNumberForOrdering` 参数可确保相同分区键的序列号严格递增。`SequenceNumberForOrdering` 不提供跨多个分区键的记录排序。

# 使用 AWS Glue 架构注册表与数据交互
<a name="kinesis-integration-glue-schema-registry"></a>

您可以将 Kinesis 数据流与 AWS Glue 架构注册表集成。 AWS Glue 架构注册表能帮助您集中发现、控制和演变架构，同时确保注册架构持续验证生成的数据。架构定义了数据记录的结构和格式。架构是用于可靠数据发布、使用或存储的版本化规范。 AWS Glue Schema Registr end-to-end y 允许您改善流媒体应用程序中的数据质量和数据治理。有关更多信息，请参阅 [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)。设置此集成的方法之一是通过 Java SDK 中 AWS 提供的`PutRecords`和 `PutRecord` Kinesis Data APIs Streams。

有关如何使用 PutRecords PutRecord 和 Kinesis Data Streams 设置 Kinesis Data Streams 与架构注册表集成的详细说明，请参阅用[例：将 Amazon Kinesis APIs 数据流与 Glue 架构注册表集成中的 “使用 Kinesis 数据流与数据交互” 部分](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds)。 APIs AWS 