与 AWS SDK或PutRecordBatch一起使用 CLI - AWS SDK代码示例

AWS 文档 AWS SDK示例 GitHub 存储库中还有更多SDK示例

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

与 AWS SDK或PutRecordBatch一起使用 CLI

以下代码示例演示如何使用 PutRecordBatch

操作示例是大型程序的代码摘录,必须在上下文中运行。在以下代码示例中,您可以查看此操作的上下文:

CLI
AWS CLI

将多条记录写入流中

以下 put-record-batch 示例将三条记录写入流中。数据以 Base64 格式编码。

aws firehose put-record-batch \ --delivery-stream-name my-stream \ --records file://records.json

myfile.json 的内容:

[ {"Data": "Rmlyc3QgdGhpbmc="}, {"Data": "U2Vjb25kIHRoaW5n"}, {"Data": "VGhpcmQgdGhpbmc="} ]

输出:

{ "FailedPutCount": 0, "Encrypted": false, "RequestResponses": [ { "RecordId": "9D2OJ6t2EqCTZTXwGzeSv/EVHxRoRCw89xd+o3+sXg8DhYOaWKPSmZy/CGlRVEys1u1xbeKh6VofEYKkoeiDrcjrxhQp9iF7sUW7pujiMEQ5LzlrzCkGosxQn+3boDnURDEaD42V7GiixpOyLJkYZcae1i7HzlCEoy9LJhMr8EjDSi4Om/9Vc2uhwwuAtGE0XKpxJ2WD7ZRWtAnYlKAnvgSPRgg7zOWL" }, { "RecordId": "jFirejqxCLlK5xjH/UNmlMVcjktEN76I7916X9PaZ+PVaOSXDfU1WGOqEZhxq2js7xcZ552eoeDxsuTU1MSq9nZTbVfb6cQTIXnm/GsuF37Uhg67GKmR5z90l6XKJ+/+pDloFv7Hh9a3oUS6wYm3DcNRLTHHAimANp1PhkQvWpvLRfzbuCUkBphR2QVzhP9OiHLbzGwy8/DfH8sqWEUYASNJKS8GXP5s" }, { "RecordId": "oy0amQ40o5Y2YV4vxzufdcMOOw6n3EPr3tpPJGoYVNKH4APPVqNcbUgefo1stEFRg4hTLrf2k6eliHu/9+YJ5R3iiedHkdsfkIqX0XTySSutvgFYTjNY1TSrK0pM2sWxpjqqnk3+2UX1MV5z88xGro3cQm/DTBt3qBlmTj7Xq8SKVbO1S7YvMTpWkMKA86f8JfmT8BMKoMb4XZS/sOkQLe+qh0sYKXWl" } ] }

有关更多信息,请参阅《Amazon Kinesis Data Firehose 开发人员指南》中的将数据发送到 Amazon Kinesis Data Firehose 传输流

Python
SDK适用于 Python (Boto3)
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

class FirehoseClient: """ AWS Firehose client to send records and monitor metrics. Attributes: config (object): Configuration object with delivery stream name and region. delivery_stream_name (str): Name of the Firehose delivery stream. region (str): AWS region for Firehose and CloudWatch clients. firehose (boto3.client): Boto3 Firehose client. cloudwatch (boto3.client): Boto3 CloudWatch client. """ def __init__(self, config): """ Initialize the FirehoseClient. Args: config (object): Configuration object with delivery stream name and region. """ self.config = config self.delivery_stream_name = config.delivery_stream_name self.region = config.region self.firehose = boto3.client("firehose", region_name=self.region) self.cloudwatch = boto3.client("cloudwatch", region_name=self.region) @backoff.on_exception( backoff.expo, Exception, max_tries=5, jitter=backoff.full_jitter ) def put_record_batch(self, data: list, batch_size: int = 500): """ Put records in batches to Firehose with backoff and retry. Args: data (list): List of data records to be sent to Firehose. batch_size (int): Number of records to send in each batch. Default is 500. This method attempts to send records in batches to the Firehose delivery stream. It retries with exponential backoff in case of exceptions. """ for i in range(0, len(data), batch_size): batch = data[i : i + batch_size] record_dicts = [{"Data": json.dumps(record)} for record in batch] try: response = self.firehose.put_record_batch( DeliveryStreamName=self.delivery_stream_name, Records=record_dicts ) self._log_batch_response(response, len(batch)) except Exception as e: logger.info(f"Failed to send batch of {len(batch)} records. Error: {e}")
  • 有关API详细信息,请参阅PutRecordBatch中的 AWS SDKPython (Boto3) API 参考。

Rust
SDK对于 Rust
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

async fn put_record_batch( client: &Client, stream: &str, data: Vec<Record>, ) -> Result<PutRecordBatchOutput, SdkError<PutRecordBatchError>> { client .put_record_batch() .delivery_stream_name(stream) .set_records(Some(data)) .send() .await }
  • 有关API详细信息,请参见PutRecordBatch中的 Rust AWS SDK API 参考