SDK for Rust を使用した Kinesis の例 - AWS SDKコードの例

Doc AWS SDK ExamplesWord リポジトリには、さらに多くの GitHub の例があります。 AWS SDK


SDK for Rust を使用した Kinesis の例

次のコード例は、 AWS SDK for Rust with Kinesis を使用してアクションを実行し、一般的なシナリオを実装する方法を示しています。




次のコード例は、CreateStream を使用する方法を示しています。

Rust のSDK

GitHub には他にもあります。用例一覧を検索し、AWS コード例リポジトリでの設定と実行の方法を確認してください。

async fn make_stream(client: &Client, stream: &str) -> Result<(), Error> { client .create_stream() .stream_name(stream) .shard_count(4) .send() .await?; println!("Created stream"); Ok(()) }
  • API の詳細については、CreateStream AWS SDK for Rust API リファレンス」を参照してください。

次のコード例は、DeleteStream を使用する方法を示しています。

Rust のSDK

GitHub には他にもあります。用例一覧を検索し、AWS コード例リポジトリでの設定と実行の方法を確認してください。

async fn remove_stream(client: &Client, stream: &str) -> Result<(), Error> { client.delete_stream().stream_name(stream).send().await?; println!("Deleted stream."); Ok(()) }
  • API の詳細については、DeleteStream AWS SDK for Rust API リファレンス」を参照してください。

次の例は、DescribeStream を使用する方法を説明しています。

Rust のSDK

GitHub には他にもあります。用例一覧を検索し、AWS コード例リポジトリでの設定と実行の方法を確認してください。

async fn show_stream(client: &Client, stream: &str) -> Result<(), Error> { let resp = client.describe_stream().stream_name(stream).send().await?; let desc = resp.stream_description.unwrap(); println!("Stream description:"); println!(" Name: {}:", desc.stream_name()); println!(" Status: {:?}", desc.stream_status()); println!(" Open shards: {:?}", desc.shards.len()); println!(" Retention (hours): {}", desc.retention_period_hours()); println!(" Encryption: {:?}", desc.encryption_type.unwrap()); Ok(()) }
  • API の詳細については、DescribeStream AWS SDK for Rust API リファレンス」を参照してください。

次のコード例は、ListStreams を使用する方法を示しています。

Rust のSDK

GitHub には他にもあります。用例一覧を検索し、AWS コード例リポジトリでの設定と実行の方法を確認してください。

async fn show_streams(client: &Client) -> Result<(), Error> { let resp = client.list_streams().send().await?; println!("Stream names:"); let streams = resp.stream_names; for stream in &streams { println!(" {}", stream); } println!("Found {} stream(s)", streams.len()); Ok(()) }
  • API の詳細については、ListStreams AWS SDK for Rust API リファレンス」を参照してください。

次の例は、PutRecord を使用する方法を説明しています。

Rust のSDK

GitHub には他にもあります。用例一覧を検索し、AWS コード例リポジトリでの設定と実行の方法を確認してください。

async fn add_record(client: &Client, stream: &str, key: &str, data: &str) -> Result<(), Error> { let blob = Blob::new(data); client .put_record() .data(blob) .partition_key(key) .stream_name(stream) .send() .await?; println!("Put data into stream."); Ok(()) }
  • API の詳細については、PutRecord AWS SDK for Rust API リファレンス」を参照してください。


次のコード例では、Kinesis ストリームからレコードを受信することによってトリガーされるイベントを受け取る、Lambda 関数の実装方法を示しています。この関数は Kinesis ペイロードを取得し、それを Base64 からデコードして、そのレコードの内容をログ記録します。

Rust のSDK

GitHub には他にもあります。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

Rust を使用した Lambda での Kinesis イベントの消費。

// Copyright, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::event::kinesis::KinesisEvent; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> { if event.payload.records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(()); } event.payload.records.iter().for_each(|record| { tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default()); let record_data = std::str::from_utf8(&; match record_data { Ok(data) => { // log the record data tracing::info!("Data: {}", data); } Err(e) => { tracing::error!("Error: {}", e); } } }); tracing::info!( "Successfully processed {} records", event.payload.records.len() ); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }

以下のコード例では、Kinesis ストリームからイベントを受け取る Lambda 関数のための、部分的なバッチレスポンスの実装方法を示しています。この関数は、レスポンスとしてバッチアイテムの失敗を報告し、対象のメッセージを後で再試行するよう Lambda に伝えます。

Rust のSDK

GitHub には他にもあります。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

Rust を使用した Lambda での Kinesis バッチアイテム失敗のレポート。

// Copyright, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::{ event::kinesis::KinesisEvent, kinesis::KinesisEventRecord, streams::{KinesisBatchItemFailure, KinesisEventResponse}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<KinesisEventResponse, Error> { let mut response = KinesisEventResponse { batch_item_failures: vec![], }; if event.payload.records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(response); } for record in &event.payload.records { tracing::info!( "EventId: {}", record.event_id.as_deref().unwrap_or_default() ); let record_processing_result = process_record(record); if record_processing_result.is_err() { response.batch_item_failures.push(KinesisBatchItemFailure { item_identifier: record.kinesis.sequence_number.clone(), }); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return Ok(response); } } tracing::info!( "Successfully processed {} records", event.payload.records.len() ); Ok(response) } fn process_record(record: &KinesisEventRecord) -> Result<(), Error> { let record_data = std::str::from_utf8(; if let Some(err) = record_data.err() { tracing::error!("Error: {}", err); return Err(Error::from(err)); } let record_data = record_data.unwrap_or_default(); // do something interesting with the data tracing::info!("Data: {}", record_data); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }