与 AWS SDK或GetRecords一起使用 CLI - AWS SDK代码示例

AWS 文档 AWS SDK示例 GitHub 存储库中还有更多SDK示例

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

与 AWS SDK或GetRecords一起使用 CLI

以下代码示例演示如何使用 GetRecords

操作示例是大型程序的代码摘录,必须在上下文中运行。在以下代码示例中,您可以查看此操作的上下文:

CLI
AWS CLI

获取分片中的记录

以下 get-records 示例将使用指定的分片迭代器从 Kinesis 数据流的分片中获取数据记录。

aws kinesis get-records \ --shard-iterator AAAAAAAAAAF7/0mWD7IuHj1yGv/TKuNgx2ukD5xipCY4cy4gU96orWwZwcSXh3K9tAmGYeOZyLZrvzzeOFVf9iN99hUPw/w/b0YWYeehfNvnf1DYt5XpDJghLKr3DzgznkTmMymDP3R+3wRKeuEw6/kdxY2yKJH0veaiekaVc4N2VwK/GvaGP2Hh9Fg7N++q0Adg6fIDQPt4p8RpavDbk+A4sL9SWGE1

输出:

{ "Records": [], "MillisBehindLatest": 80742000 }

有关更多信息,请参阅亚马逊 Kinesis Data Streams 开发者指南 AWS SDK》中的使用 Kinesis Data API Streams 和 Java 版开发消费者

Java
SDK适用于 Java 2.x
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import java.util.ArrayList; import java.util.List; /** * Before running this Java V2 code example, set up your development * environment, including your credentials. * * For more information, see the following documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class GetRecords { public static void main(String[] args) { final String usage = """ Usage: <streamName> Where: streamName - The Amazon Kinesis data stream to read from (for example, StockTradeStream). """; if (args.length != 1) { System.out.println(usage); System.exit(1); } String streamName = args[0]; Region region = Region.US_EAST_1; KinesisClient kinesisClient = KinesisClient.builder() .region(region) .build(); getStockTrades(kinesisClient, streamName); kinesisClient.close(); } public static void getStockTrades(KinesisClient kinesisClient, String streamName) { String shardIterator; String lastShardId = null; DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder() .streamName(streamName) .build(); List<Shard> shards = new ArrayList<>(); DescribeStreamResponse streamRes; do { streamRes = kinesisClient.describeStream(describeStreamRequest); shards.addAll(streamRes.streamDescription().shards()); if (shards.size() > 0) { lastShardId = shards.get(shards.size() - 1).shardId(); } } while (streamRes.streamDescription().hasMoreShards()); GetShardIteratorRequest itReq = GetShardIteratorRequest.builder() .streamName(streamName) .shardIteratorType("TRIM_HORIZON") .shardId(lastShardId) .build(); GetShardIteratorResponse shardIteratorResult = kinesisClient.getShardIterator(itReq); shardIterator = shardIteratorResult.shardIterator(); // Continuously read data records from shard. List<Record> records; // Create new GetRecordsRequest with existing shardIterator. // Set maximum records to return to 1000. GetRecordsRequest recordsRequest = GetRecordsRequest.builder() .shardIterator(shardIterator) .limit(1000) .build(); GetRecordsResponse result = kinesisClient.getRecords(recordsRequest); // Put result into record list. Result may be empty. records = result.records(); // Print records for (Record record : records) { SdkBytes byteBuffer = record.data(); System.out.printf("Seq No: %s - %s%n", record.sequenceNumber(), new String(byteBuffer.asByteArray())); } } }
  • 有关API详细信息,请参阅 “AWS SDK for Java 2.x API参考 GetRecords” 中的。

PowerShell
用于 PowerShell

示例 1:此示例将展示如何从一条或多条记录中返回和提取数据。提供给 Get-的迭代器KINRecord确定要返回的记录的起始位置,在本例中,这些记录被捕获到变量 $records 中。然后,可以通过对 $records 集合进行索引来访问每条记录。假设记录中的数据是 UTF -8 编码的文本,则最后一个命令显示如何从对象 MemoryStream 中的中提取数据并将其作为文本返回到控制台。

$records $records = Get-KINRecord -ShardIterator "AAAAAAAAAAGIc....9VnbiRNaP"

输出:

MillisBehindLatest NextShardIterator Records ------------------ ----------------- ------- 0 AAAAAAAAAAERNIq...uDn11HuUs {Key1, Key2}
$records.Records[0]

输出:

ApproximateArrivalTimestamp Data PartitionKey SequenceNumber --------------------------- ---- ------------ -------------- 3/7/2016 5:14:33 PM System.IO.MemoryStream Key1 4955986459776...931586
[Text.Encoding]::UTF8.GetString($records.Records[0].Data.ToArray())

输出:

test data from string
  • 有关API详细信息,请参阅 AWS Tools for PowerShell Cmdlet 参考GetRecords中的。

Python
SDK适用于 Python (Boto3)
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

class KinesisStream: """Encapsulates a Kinesis stream.""" def __init__(self, kinesis_client): """ :param kinesis_client: A Boto3 Kinesis client. """ self.kinesis_client = kinesis_client self.name = None self.details = None self.stream_exists_waiter = kinesis_client.get_waiter("stream_exists") def get_records(self, max_records): """ Gets records from the stream. This function is a generator that first gets a shard iterator for the stream, then uses the shard iterator to get records in batches from the stream. The shard iterator can be accessed through the 'details' property, which is populated using the 'describe' function of this class. Each batch of records is yielded back to the caller until the specified maximum number of records has been retrieved. :param max_records: The maximum number of records to retrieve. :return: Yields the current batch of retrieved records. """ try: response = self.kinesis_client.get_shard_iterator( StreamName=self.name, ShardId=self.details["Shards"][0]["ShardId"], ShardIteratorType="LATEST", ) shard_iter = response["ShardIterator"] record_count = 0 while record_count < max_records: response = self.kinesis_client.get_records( ShardIterator=shard_iter, Limit=10 ) shard_iter = response["NextShardIterator"] records = response["Records"] logger.info("Got %s records.", len(records)) record_count += len(records) yield records except ClientError: logger.exception("Couldn't get records from stream %s.", self.name) raise def describe(self, name): """ Gets metadata about a stream. :param name: The name of the stream. :return: Metadata about the stream. """ try: response = self.kinesis_client.describe_stream(StreamName=name) self.name = name self.details = response["StreamDescription"] logger.info("Got stream %s.", name) except ClientError: logger.exception("Couldn't get %s.", name) raise else: return self.details
  • 有关API详细信息,请参阅GetRecords中的 AWS SDKPython (Boto3) API 参考。

SAP ABAP
SDK对于 SAP ABAP
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

TRY. oo_result = lo_kns->getrecords( " oo_result is returned for testing purposes. " iv_sharditerator = iv_shard_iterator ). DATA(lt_records) = oo_result->get_records( ). MESSAGE 'Record retrieved.' TYPE 'I'. CATCH /aws1/cx_knsexpirediteratorex . MESSAGE 'Iterator expired.' TYPE 'E'. CATCH /aws1/cx_knsinvalidargumentex . MESSAGE 'The specified argument was not valid.' TYPE 'E'. CATCH /aws1/cx_knskmsaccessdeniedex . MESSAGE 'You do not have permission to perform this AWS KMS action.' TYPE 'E'. CATCH /aws1/cx_knskmsdisabledex . MESSAGE 'KMS key used is disabled.' TYPE 'E'. CATCH /aws1/cx_knskmsinvalidstateex . MESSAGE 'KMS key used is in an invalid state. ' TYPE 'E'. CATCH /aws1/cx_knskmsnotfoundex . MESSAGE 'KMS key used is not found.' TYPE 'E'. CATCH /aws1/cx_knskmsoptinrequired . MESSAGE 'KMS key option is required.' TYPE 'E'. CATCH /aws1/cx_knskmsthrottlingex . MESSAGE 'The rate of requests to AWS KMS is exceeding the request quotas.' TYPE 'E'. CATCH /aws1/cx_knsprovthruputexcdex . MESSAGE 'The request rate for the stream is too high, or the requested data is too large for the available throughput.' TYPE 'E'. CATCH /aws1/cx_knsresourcenotfoundex . MESSAGE 'Resource being accessed is not found.' TYPE 'E'. ENDTRY.
  • 有关API详细信息,请参阅GetRecords中的AWS SDK以供SAPABAPAPI参考