Esempi di Kinesis utilizzati per Rust SDK - Esempi di codice dell'AWS SDK

Ci sono altri AWS SDK esempi disponibili nel repository AWS Doc SDK Examples GitHub .

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Esempi di Kinesis utilizzati per Rust SDK

I seguenti esempi di codice mostrano come eseguire azioni e implementare scenari comuni utilizzando AWS SDK for Rust with Kinesis.

Le operazioni sono estratti di codice da programmi più grandi e devono essere eseguite nel contesto. Sebbene le azioni mostrino come richiamare le singole funzioni di servizio, puoi vedere le azioni nel loro contesto negli scenari correlati.

Ogni esempio include un collegamento al codice sorgente completo, in cui è possibile trovare istruzioni su come configurare ed eseguire il codice nel contesto.

Azioni

Il seguente esempio di codice mostra come utilizzareCreateStream.

SDKper Rust
Nota

c'è altro da fare GitHub. Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

async fn make_stream(client: &Client, stream: &str) -> Result<(), Error> { client .create_stream() .stream_name(stream) .shard_count(4) .send() .await?; println!("Created stream"); Ok(()) }
  • Per API i dettagli, CreateStreamconsulta AWS SDKRust API Reference.

Il seguente esempio di codice mostra come usareDeleteStream.

SDKper Rust
Nota

c'è altro da fare GitHub. Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

async fn remove_stream(client: &Client, stream: &str) -> Result<(), Error> { client.delete_stream().stream_name(stream).send().await?; println!("Deleted stream."); Ok(()) }
  • Per API i dettagli, DeleteStreamconsulta AWS SDKRust API Reference.

Il seguente esempio di codice mostra come usareDescribeStream.

SDKper Rust
Nota

c'è altro da fare GitHub. Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

async fn show_stream(client: &Client, stream: &str) -> Result<(), Error> { let resp = client.describe_stream().stream_name(stream).send().await?; let desc = resp.stream_description.unwrap(); println!("Stream description:"); println!(" Name: {}:", desc.stream_name()); println!(" Status: {:?}", desc.stream_status()); println!(" Open shards: {:?}", desc.shards.len()); println!(" Retention (hours): {}", desc.retention_period_hours()); println!(" Encryption: {:?}", desc.encryption_type.unwrap()); Ok(()) }
  • Per API i dettagli, DescribeStreamconsulta AWS SDKRust API Reference.

Il seguente esempio di codice mostra come usareListStreams.

SDKper Rust
Nota

c'è altro da fare GitHub. Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

async fn show_streams(client: &Client) -> Result<(), Error> { let resp = client.list_streams().send().await?; println!("Stream names:"); let streams = resp.stream_names; for stream in &streams { println!(" {}", stream); } println!("Found {} stream(s)", streams.len()); Ok(()) }
  • Per API i dettagli, ListStreamsconsulta AWS SDKRust API Reference.

Il seguente esempio di codice mostra come usarePutRecord.

SDKper Rust
Nota

c'è altro da fare GitHub. Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

async fn add_record(client: &Client, stream: &str, key: &str, data: &str) -> Result<(), Error> { let blob = Blob::new(data); client .put_record() .data(blob) .partition_key(key) .stream_name(stream) .send() .await?; println!("Put data into stream."); Ok(()) }
  • Per API i dettagli, PutRecordconsulta AWS SDKRust API Reference.

Esempi serverless

Il seguente esempio di codice mostra come implementare una funzione Lambda che riceve un evento attivato dalla ricezione di record da un flusso Kinesis. La funzione recupera il payload Kinesis, lo decodifica da Base64 e registra il contenuto del record.

SDKper Rust
Nota

c'è altro da fare GitHub. Trova l'esempio completo e scopri come eseguire la configurazione e l'esecuzione nel repository di Esempi serverless.

Consumo di un evento Kinesis con Lambda utilizzando Rust.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::event::kinesis::KinesisEvent; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> { if event.payload.records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(()); } event.payload.records.iter().for_each(|record| { tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default()); let record_data = std::str::from_utf8(&record.kinesis.data); match record_data { Ok(data) => { // log the record data tracing::info!("Data: {}", data); } Err(e) => { tracing::error!("Error: {}", e); } } }); tracing::info!( "Successfully processed {} records", event.payload.records.len() ); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }

Il seguente esempio di codice mostra come implementare una risposta batch parziale per le funzioni Lambda che ricevono eventi da un flusso Kinesis. La funzione riporta gli errori degli elementi batch nella risposta, segnalando a Lambda di riprovare tali messaggi in un secondo momento.

SDKper Rust
Nota

c'è altro da fare GitHub. Trova l'esempio completo e scopri come eseguire la configurazione e l'esecuzione nel repository di Esempi serverless.

Segnalazione degli errori degli elementi batch Kinesis con Lambda utilizzando Rust.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::{ event::kinesis::KinesisEvent, kinesis::KinesisEventRecord, streams::{KinesisBatchItemFailure, KinesisEventResponse}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<KinesisEventResponse, Error> { let mut response = KinesisEventResponse { batch_item_failures: vec![], }; if event.payload.records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(response); } for record in &event.payload.records { tracing::info!( "EventId: {}", record.event_id.as_deref().unwrap_or_default() ); let record_processing_result = process_record(record); if record_processing_result.is_err() { response.batch_item_failures.push(KinesisBatchItemFailure { item_identifier: record.kinesis.sequence_number.clone(), }); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return Ok(response); } } tracing::info!( "Successfully processed {} records", event.payload.records.len() ); Ok(response) } fn process_record(record: &KinesisEventRecord) -> Result<(), Error> { let record_data = std::str::from_utf8(record.kinesis.data.as_slice()); if let Some(err) = record_data.err() { tracing::error!("Error: {}", err); return Err(Error::from(err)); } let record_data = record_data.unwrap_or_default(); // do something interesting with the data tracing::info!("Data: {}", record_data); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }