Kinesis-Beispiele für die Verwendung von SDK for Rust - AWS SDKCode-Beispiele

Weitere AWS SDK Beispiele sind im Repo AWS Doc SDK Examples GitHub verfügbar.

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Kinesis-Beispiele für die Verwendung von SDK for Rust

Die folgenden Codebeispiele zeigen Ihnen, wie Sie mithilfe von AWS SDK for Rust mit Kinesis Aktionen ausführen und allgemeine Szenarien implementieren.

Aktionen sind Codeauszüge aus größeren Programmen und müssen im Kontext ausgeführt werden. Aktionen zeigen Ihnen zwar, wie Sie einzelne Servicefunktionen aufrufen, aber Sie können Aktionen in den zugehörigen Szenarien im Kontext sehen.

Jedes Beispiel enthält einen Link zum vollständigen Quellcode, in dem Sie Anweisungen zum Einrichten und Ausführen des Codes im Kontext finden.

Aktionen

Das folgende Codebeispiel zeigt die VerwendungCreateStream.

SDKfür Rust
Anmerkung

Es gibt noch mehr dazu GitHub. Sie sehen das vollständige Beispiel und erfahren, wie Sie das AWS -Code-Beispiel-Repository einrichten und ausführen.

async fn make_stream(client: &Client, stream: &str) -> Result<(), Error> { client .create_stream() .stream_name(stream) .shard_count(4) .send() .await?; println!("Created stream"); Ok(()) }
  • APIEinzelheiten finden Sie CreateStreamin der AWS SDKAPIRust-Referenz.

Das folgende Codebeispiel zeigt die VerwendungDeleteStream.

SDKfür Rust
Anmerkung

Es gibt noch mehr dazu GitHub. Sie sehen das vollständige Beispiel und erfahren, wie Sie das AWS -Code-Beispiel-Repository einrichten und ausführen.

async fn remove_stream(client: &Client, stream: &str) -> Result<(), Error> { client.delete_stream().stream_name(stream).send().await?; println!("Deleted stream."); Ok(()) }
  • APIEinzelheiten finden Sie DeleteStreamin der AWS SDKAPIRust-Referenz.

Das folgende Codebeispiel zeigt die VerwendungDescribeStream.

SDKfür Rust
Anmerkung

Es gibt noch mehr dazu GitHub. Sie sehen das vollständige Beispiel und erfahren, wie Sie das AWS -Code-Beispiel-Repository einrichten und ausführen.

async fn show_stream(client: &Client, stream: &str) -> Result<(), Error> { let resp = client.describe_stream().stream_name(stream).send().await?; let desc = resp.stream_description.unwrap(); println!("Stream description:"); println!(" Name: {}:", desc.stream_name()); println!(" Status: {:?}", desc.stream_status()); println!(" Open shards: {:?}", desc.shards.len()); println!(" Retention (hours): {}", desc.retention_period_hours()); println!(" Encryption: {:?}", desc.encryption_type.unwrap()); Ok(()) }
  • APIEinzelheiten finden Sie DescribeStreamin der AWS SDKAPIRust-Referenz.

Das folgende Codebeispiel zeigt die VerwendungListStreams.

SDKfür Rust
Anmerkung

Es gibt noch mehr dazu GitHub. Sie sehen das vollständige Beispiel und erfahren, wie Sie das AWS -Code-Beispiel-Repository einrichten und ausführen.

async fn show_streams(client: &Client) -> Result<(), Error> { let resp = client.list_streams().send().await?; println!("Stream names:"); let streams = resp.stream_names; for stream in &streams { println!(" {}", stream); } println!("Found {} stream(s)", streams.len()); Ok(()) }
  • APIEinzelheiten finden Sie ListStreamsin der AWS SDKAPIRust-Referenz.

Das folgende Codebeispiel zeigt die VerwendungPutRecord.

SDKfür Rust
Anmerkung

Es gibt noch mehr dazu GitHub. Sie sehen das vollständige Beispiel und erfahren, wie Sie das AWS -Code-Beispiel-Repository einrichten und ausführen.

async fn add_record(client: &Client, stream: &str, key: &str, data: &str) -> Result<(), Error> { let blob = Blob::new(data); client .put_record() .data(blob) .partition_key(key) .stream_name(stream) .send() .await?; println!("Put data into stream."); Ok(()) }
  • APIEinzelheiten finden Sie PutRecordin der AWS SDKAPIRust-Referenz.

Serverless-Beispiele

Das folgende Codebeispiel zeigt, wie eine Lambda-Funktion implementiert wird, die ein Ereignis empfängt, das durch den Empfang von Datensätzen aus einem Kinesis-Stream ausgelöst wird. Die Funktion ruft die Kinesis-Nutzlast ab, dekodiert von Base64 und protokolliert den Datensatzinhalt.

SDKfür Rust
Anmerkung

Es gibt noch mehr dazu GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Ein Kinesis-Ereignis mit Lambda mithilfe von Rust konsumieren.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::event::kinesis::KinesisEvent; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> { if event.payload.records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(()); } event.payload.records.iter().for_each(|record| { tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default()); let record_data = std::str::from_utf8(&record.kinesis.data); match record_data { Ok(data) => { // log the record data tracing::info!("Data: {}", data); } Err(e) => { tracing::error!("Error: {}", e); } } }); tracing::info!( "Successfully processed {} records", event.payload.records.len() ); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }

Das folgende Codebeispiel zeigt, wie eine partielle Batch-Antwort für Lambda-Funktionen implementiert wird, die Ereignisse aus einem Kinesis-Stream empfangen. Die Funktion meldet die Batch-Elementfehler in der Antwort und signalisiert Lambda, diese Nachrichten später erneut zu versuchen.

SDKfür Rust
Anmerkung

Es gibt noch mehr dazu GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Melden von Fehlern Kinesis Kinesis-Batch-Elementen mit Lambda mithilfe von Rust.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::{ event::kinesis::KinesisEvent, kinesis::KinesisEventRecord, streams::{KinesisBatchItemFailure, KinesisEventResponse}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<KinesisEventResponse, Error> { let mut response = KinesisEventResponse { batch_item_failures: vec![], }; if event.payload.records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(response); } for record in &event.payload.records { tracing::info!( "EventId: {}", record.event_id.as_deref().unwrap_or_default() ); let record_processing_result = process_record(record); if record_processing_result.is_err() { response.batch_item_failures.push(KinesisBatchItemFailure { item_identifier: record.kinesis.sequence_number.clone(), }); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return Ok(response); } } tracing::info!( "Successfully processed {} records", event.payload.records.len() ); Ok(response) } fn process_record(record: &KinesisEventRecord) -> Result<(), Error> { let record_data = std::str::from_utf8(record.kinesis.data.as_slice()); if let Some(err) = record_data.err() { tracing::error!("Error: {}", err); return Err(Error::from(err)); } let record_data = record_data.unwrap_or_default(); // do something interesting with the data tracing::info!("Data: {}", record_data); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }