

Doc AWS SDK Examples GitHub リポジトリには、他にも SDK の例があります。 [AWS](https://github.com/awsdocs/aws-doc-sdk-examples)

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# SDK for Rust を使用した Kinesis の例
<a name="rust_1_kinesis_code_examples"></a>

次のコード例は、Kinesis で AWS SDK for Rust を使用してアクションを実行し、一般的なシナリオを実装する方法を示しています。

*アクション*はより大きなプログラムからのコードの抜粋であり、コンテキスト内で実行する必要があります。アクションは個々のサービス機能を呼び出す方法を示していますが、コンテキスト内のアクションは、関連するシナリオで確認できます。

各例には完全なソースコードへのリンクが含まれており、コードの設定方法と実行方法に関する手順を確認できます。

**Topics**
+ [アクション](#actions)
+ [サーバーレスサンプル](#serverless_examples)

## アクション
<a name="actions"></a>

### `CreateStream`
<a name="kinesis_CreateStream_rust_1_topic"></a>

次のコード例は、`CreateStream` を使用する方法を示しています。

**SDK for Rust**  
 GitHub には、その他のリソースもあります。用例一覧を検索し、[AWS コード例リポジトリ](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/rustv1/examples/kinesis#code-examples)での設定と実行の方法を確認してください。

```
async fn make_stream(client: &Client, stream: &str) -> Result<(), Error> {
    client
        .create_stream()
        .stream_name(stream)
        .shard_count(4)
        .send()
        .await?;

    println!("Created stream");

    Ok(())
}
```
+  API の詳細については、*AWS SDK for Rust API リファレンス*の「[CreateStream](https://docs.rs/aws-sdk-kinesis/latest/aws_sdk_kinesis/client/struct.Client.html#method.create_stream)」を参照してください。

### `DeleteStream`
<a name="kinesis_DeleteStream_rust_1_topic"></a>

次のコード例は、`DeleteStream` を使用する方法を示しています。

**SDK for Rust**  
 GitHub には、その他のリソースもあります。用例一覧を検索し、[AWS コード例リポジトリ](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/rustv1/examples/kinesis#code-examples)での設定と実行の方法を確認してください。

```
async fn remove_stream(client: &Client, stream: &str) -> Result<(), Error> {
    client.delete_stream().stream_name(stream).send().await?;

    println!("Deleted stream.");

    Ok(())
}
```
+  API の詳細については、*AWS SDK for Rust API リファレンス*の「[DeleteStream](https://docs.rs/aws-sdk-kinesis/latest/aws_sdk_kinesis/client/struct.Client.html#method.delete_stream)」を参照してください。

### `DescribeStream`
<a name="kinesis_DescribeStream_rust_1_topic"></a>

次のコード例は、`DescribeStream` を使用する方法を示しています。

**SDK for Rust**  
 GitHub には、その他のリソースもあります。用例一覧を検索し、[AWS コード例リポジトリ](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/rustv1/examples/kinesis#code-examples)での設定と実行の方法を確認してください。

```
async fn show_stream(client: &Client, stream: &str) -> Result<(), Error> {
    let resp = client.describe_stream().stream_name(stream).send().await?;

    let desc = resp.stream_description.unwrap();

    println!("Stream description:");
    println!("  Name:              {}:", desc.stream_name());
    println!("  Status:            {:?}", desc.stream_status());
    println!("  Open shards:       {:?}", desc.shards.len());
    println!("  Retention (hours): {}", desc.retention_period_hours());
    println!("  Encryption:        {:?}", desc.encryption_type.unwrap());

    Ok(())
}
```
+  API の詳細については、*AWS SDK for Rust API リファレンス*の「[DescribeStream](https://docs.rs/aws-sdk-kinesis/latest/aws_sdk_kinesis/client/struct.Client.html#method.describe_stream)」を参照してください。

### `ListStreams`
<a name="kinesis_ListStreams_rust_1_topic"></a>

次のコード例は、`ListStreams` を使用する方法を示しています。

**SDK for Rust**  
 GitHub には、その他のリソースもあります。用例一覧を検索し、[AWS コード例リポジトリ](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/rustv1/examples/kinesis#code-examples)での設定と実行の方法を確認してください。

```
async fn show_streams(client: &Client) -> Result<(), Error> {
    let resp = client.list_streams().send().await?;

    println!("Stream names:");

    let streams = resp.stream_names;
    for stream in &streams {
        println!("  {}", stream);
    }

    println!("Found {} stream(s)", streams.len());

    Ok(())
}
```
+  API の詳細については、*AWS SDK for Rust API リファレンス*の「[ListStreams](https://docs.rs/aws-sdk-kinesis/latest/aws_sdk_kinesis/client/struct.Client.html#method.list_streams)」を参照してください。

### `PutRecord`
<a name="kinesis_PutRecord_rust_1_topic"></a>

次のコード例は、`PutRecord` を使用する方法を示しています。

**SDK for Rust**  
 GitHub には、その他のリソースもあります。用例一覧を検索し、[AWS コード例リポジトリ](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/rustv1/examples/kinesis#code-examples)での設定と実行の方法を確認してください。

```
async fn add_record(client: &Client, stream: &str, key: &str, data: &str) -> Result<(), Error> {
    let blob = Blob::new(data);

    client
        .put_record()
        .data(blob)
        .partition_key(key)
        .stream_name(stream)
        .send()
        .await?;

    println!("Put data into stream.");

    Ok(())
}
```
+  API の詳細については、*AWS SDK for Rust API リファレンス*の「[PutRecord](https://docs.rs/aws-sdk-kinesis/latest/aws_sdk_kinesis/client/struct.Client.html#method.put_record)」を参照してください。

## サーバーレスサンプル
<a name="serverless_examples"></a>

### Kinesis トリガーから Lambda 関数を呼び出す
<a name="serverless_Kinesis_Lambda_rust_1_topic"></a>

次のコード例では、Kinesis ストリームからレコードを受信することによってトリガーされるイベントを受け取る、Lambda 関数の実装方法を示しています。この関数は Kinesis ペイロードを取得し、それを Base64 からデコードして、そのレコードの内容をログ記録します。

**SDK for Rust**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Rust を使用した Lambda での Kinesis イベントの消費。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::event::kinesis::KinesisEvent;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> {
    if event.payload.records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(());
    }

    event.payload.records.iter().for_each(|record| {
        tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default());

        let record_data = std::str::from_utf8(&record.kinesis.data);

        match record_data {
            Ok(data) => {
                // log the record data
                tracing::info!("Data: {}", data);
            }
            Err(e) => {
                tracing::error!("Error: {}", e);
            }
        }
    });

    tracing::info!(
        "Successfully processed {} records",
        event.payload.records.len()
    );

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
```

### Kinesis トリガーを使用した Lambda 関数でのバッチアイテム失敗のレポート
<a name="serverless_Kinesis_Lambda_batch_item_failures_rust_1_topic"></a>

以下のコード例では、Kinesis ストリームからイベントを受け取る Lambda 関数のための、部分的なバッチレスポンスの実装方法を示しています。この関数は、レスポンスとしてバッチアイテムの失敗を報告し、対象のメッセージを後で再試行するよう Lambda に伝えます。

**SDK for Rust**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Rust を使用した Lambda での Kinesis バッチアイテム失敗のレポート。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::{
    event::kinesis::KinesisEvent,
    kinesis::KinesisEventRecord,
    streams::{KinesisBatchItemFailure, KinesisEventResponse},
};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<KinesisEventResponse, Error> {
    let mut response = KinesisEventResponse {
        batch_item_failures: vec![],
    };

    if event.payload.records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(response);
    }

    for record in &event.payload.records {
        tracing::info!(
            "EventId: {}",
            record.event_id.as_deref().unwrap_or_default()
        );

        let record_processing_result = process_record(record);

        if record_processing_result.is_err() {
            response.batch_item_failures.push(KinesisBatchItemFailure {
                item_identifier: record.kinesis.sequence_number.clone(),
            });
            /* Since we are working with streams, we can return the failed item immediately.
            Lambda will immediately begin to retry processing from this failed item onwards. */
            return Ok(response);
        }
    }

    tracing::info!(
        "Successfully processed {} records",
        event.payload.records.len()
    );

    Ok(response)
}

fn process_record(record: &KinesisEventRecord) -> Result<(), Error> {
    let record_data = std::str::from_utf8(record.kinesis.data.as_slice());

    if let Some(err) = record_data.err() {
        tracing::error!("Error: {}", err);
        return Err(Error::from(err));
    }

    let record_data = record_data.unwrap_or_default();

    // do something interesting with the data
    tracing::info!("Data: {}", record_data);

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
```