用于 Rust 的 Lambda 示例 SDK - AWS SDK代码示例

AWS 文档 AWS SDK示例 GitHub 存储库中还有更多SDK示例

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

用于 Rust 的 Lambda 示例 SDK

以下代码示例向您展示了如何使用 for Rust 和 Lambda 来执行操作和实现常见场景。 AWS SDK

基础知识是向您展示如何在服务中执行基本操作的代码示例。

操作是大型程序的代码摘录,必须在上下文中运行。您可以通过操作了解如何调用单个服务函数,还可以通过函数相关场景的上下文查看操作。

场景是向您展示如何通过在一个服务中调用多个函数或与其他 AWS 服务结合来完成特定任务的代码示例。

每个示例都包含一个指向完整源代码的链接,您可以在其中找到有关如何在上下文中设置和运行代码的说明。

基础知识

以下代码示例展示了如何:

  • 创建IAM角色和 Lambda 函数,然后上传处理程序代码。

  • 使用单个参数来调用函数并获取结果。

  • 更新函数代码并使用环境变量进行配置。

  • 使用新参数来调用函数并获取结果。显示返回的执行日志。

  • 列出账户函数,然后清除函数。

有关更多信息,请参阅使用控制台创建 Lambda 函数

SDK对于 Rust
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

在这种情况下使用的带依赖项的 Cargo.toml。

[package] name = "lambda-code-examples" version = "0.1.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] aws-config = { version = "1.0.1", features = ["behavior-version-latest"] } aws-sdk-ec2 = { version = "1.3.0" } aws-sdk-iam = { version = "1.3.0" } aws-sdk-lambda = { version = "1.3.0" } aws-sdk-s3 = { version = "1.4.0" } aws-smithy-types = { version = "1.0.1" } aws-types = { version = "1.0.1" } clap = { version = "4.4", features = ["derive"] } tokio = { version = "1.20.1", features = ["full"] } tracing-subscriber = { version = "0.3.15", features = ["env-filter"] } tracing = "0.1.37" serde_json = "1.0.94" anyhow = "1.0.71" uuid = { version = "1.3.3", features = ["v4"] } lambda_runtime = "0.8.0" serde = "1.0.164"

可就这种情况简化 Lambda 调用的一组实用程序。此文件在 crate 中是 src/ations.rs。

use anyhow::anyhow; use aws_sdk_iam::operation::{create_role::CreateRoleError, delete_role::DeleteRoleOutput}; use aws_sdk_lambda::{ operation::{ delete_function::DeleteFunctionOutput, get_function::GetFunctionOutput, invoke::InvokeOutput, list_functions::ListFunctionsOutput, update_function_code::UpdateFunctionCodeOutput, update_function_configuration::UpdateFunctionConfigurationOutput, }, primitives::ByteStream, types::{Environment, FunctionCode, LastUpdateStatus, State}, }; use aws_sdk_s3::{ error::ErrorMetadata, operation::{delete_bucket::DeleteBucketOutput, delete_object::DeleteObjectOutput}, types::CreateBucketConfiguration, }; use aws_smithy_types::Blob; use serde::{ser::SerializeMap, Serialize}; use std::{fmt::Display, path::PathBuf, str::FromStr, time::Duration}; use tracing::{debug, info, warn}; /* Operation describes */ #[derive(Clone, Copy, Debug, Serialize)] pub enum Operation { #[serde(rename = "plus")] Plus, #[serde(rename = "minus")] Minus, #[serde(rename = "times")] Times, #[serde(rename = "divided-by")] DividedBy, } impl FromStr for Operation { type Err = anyhow::Error; fn from_str(s: &str) -> Result<Self, Self::Err> { match s { "plus" => Ok(Operation::Plus), "minus" => Ok(Operation::Minus), "times" => Ok(Operation::Times), "divided-by" => Ok(Operation::DividedBy), _ => Err(anyhow!("Unknown operation {s}")), } } } impl Display for Operation { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Operation::Plus => write!(f, "plus"), Operation::Minus => write!(f, "minus"), Operation::Times => write!(f, "times"), Operation::DividedBy => write!(f, "divided-by"), } } } /** * InvokeArgs will be serialized as JSON and sent to the AWS Lambda handler. */ #[derive(Debug)] pub enum InvokeArgs { Increment(i32), Arithmetic(Operation, i32, i32), } impl Serialize for InvokeArgs { fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: serde::Serializer, { match self { InvokeArgs::Increment(i) => serializer.serialize_i32(*i), InvokeArgs::Arithmetic(o, i, j) => { let mut map: S::SerializeMap = serializer.serialize_map(Some(3))?; map.serialize_key(&"op".to_string())?; map.serialize_value(&o.to_string())?; map.serialize_key(&"i".to_string())?; map.serialize_value(&i)?; map.serialize_key(&"j".to_string())?; map.serialize_value(&j)?; map.end() } } } } /** A policy document allowing Lambda to execute this function on the account's behalf. */ const ROLE_POLICY_DOCUMENT: &str = r#"{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }"#; /** * A LambdaManager gathers all the resources necessary to run the Lambda example scenario. * This includes instantiated aws_sdk clients and details of resource names. */ pub struct LambdaManager { iam_client: aws_sdk_iam::Client, lambda_client: aws_sdk_lambda::Client, s3_client: aws_sdk_s3::Client, lambda_name: String, role_name: String, bucket: String, own_bucket: bool, } // These unit type structs provide nominal typing on top of String parameters for LambdaManager::new pub struct LambdaName(pub String); pub struct RoleName(pub String); pub struct Bucket(pub String); pub struct OwnBucket(pub bool); impl LambdaManager { pub fn new( iam_client: aws_sdk_iam::Client, lambda_client: aws_sdk_lambda::Client, s3_client: aws_sdk_s3::Client, lambda_name: LambdaName, role_name: RoleName, bucket: Bucket, own_bucket: OwnBucket, ) -> Self { Self { iam_client, lambda_client, s3_client, lambda_name: lambda_name.0, role_name: role_name.0, bucket: bucket.0, own_bucket: own_bucket.0, } } /** * Load the AWS configuration from the environment. * Look up lambda_name and bucket if none are given, or generate a random name if not present in the environment. * If the bucket name is provided, the caller needs to have created the bucket. * If the bucket name is generated, it will be created. */ pub async fn load_from_env(lambda_name: Option<String>, bucket: Option<String>) -> Self { let sdk_config = aws_config::load_from_env().await; let lambda_name = LambdaName(lambda_name.unwrap_or_else(|| { std::env::var("LAMBDA_NAME").unwrap_or_else(|_| "rust_lambda_example".to_string()) })); let role_name = RoleName(format!("{}_role", lambda_name.0)); let (bucket, own_bucket) = match bucket { Some(bucket) => (Bucket(bucket), false), None => ( Bucket(std::env::var("LAMBDA_BUCKET").unwrap_or_else(|_| { format!("rust-lambda-example-{}", uuid::Uuid::new_v4()) })), true, ), }; let s3_client = aws_sdk_s3::Client::new(&sdk_config); if own_bucket { info!("Creating bucket for demo: {}", bucket.0); s3_client .create_bucket() .bucket(bucket.0.clone()) .create_bucket_configuration( CreateBucketConfiguration::builder() .location_constraint(aws_sdk_s3::types::BucketLocationConstraint::from( sdk_config.region().unwrap().as_ref(), )) .build(), ) .send() .await .unwrap(); } Self::new( aws_sdk_iam::Client::new(&sdk_config), aws_sdk_lambda::Client::new(&sdk_config), s3_client, lambda_name, role_name, bucket, OwnBucket(own_bucket), ) } /** * Upload function code from a path to a zip file. * The zip file must have an AL2 Linux-compatible binary called `bootstrap`. * The easiest way to create such a zip is to use `cargo lambda build --output-format Zip`. */ async fn prepare_function( &self, zip_file: PathBuf, key: Option<String>, ) -> Result<FunctionCode, anyhow::Error> { let body = ByteStream::from_path(zip_file).await?; let key = key.unwrap_or_else(|| format!("{}_code", self.lambda_name)); info!("Uploading function code to s3://{}/{}", self.bucket, key); let _ = self .s3_client .put_object() .bucket(self.bucket.clone()) .key(key.clone()) .body(body) .send() .await?; Ok(FunctionCode::builder() .s3_bucket(self.bucket.clone()) .s3_key(key) .build()) } /** * Create a function, uploading from a zip file. */ pub async fn create_function(&self, zip_file: PathBuf) -> Result<String, anyhow::Error> { let code = self.prepare_function(zip_file, None).await?; let key = code.s3_key().unwrap().to_string(); let role = self.create_role().await.map_err(|e| anyhow!(e))?; info!("Created iam role, waiting 15s for it to become active"); tokio::time::sleep(Duration::from_secs(15)).await; info!("Creating lambda function {}", self.lambda_name); let _ = self .lambda_client .create_function() .function_name(self.lambda_name.clone()) .code(code) .role(role.arn()) .runtime(aws_sdk_lambda::types::Runtime::Providedal2) .handler("_unused") .send() .await .map_err(anyhow::Error::from)?; self.wait_for_function_ready().await?; self.lambda_client .publish_version() .function_name(self.lambda_name.clone()) .send() .await?; Ok(key) } /** * Create an IAM execution role for the managed Lambda function. * If the role already exists, use that instead. */ async fn create_role(&self) -> Result<aws_sdk_iam::types::Role, CreateRoleError> { info!("Creating execution role for function"); let get_role = self .iam_client .get_role() .role_name(self.role_name.clone()) .send() .await; if let Ok(get_role) = get_role { if let Some(role) = get_role.role { return Ok(role); } } let create_role = self .iam_client .create_role() .role_name(self.role_name.clone()) .assume_role_policy_document(ROLE_POLICY_DOCUMENT) .send() .await; match create_role { Ok(create_role) => match create_role.role { Some(role) => Ok(role), None => Err(CreateRoleError::generic( ErrorMetadata::builder() .message("CreateRole returned empty success") .build(), )), }, Err(err) => Err(err.into_service_error()), } } /** * Poll `is_function_ready` with a 1-second delay. It returns when the function is ready or when there's an error checking the function's state. */ pub async fn wait_for_function_ready(&self) -> Result<(), anyhow::Error> { info!("Waiting for function"); while !self.is_function_ready(None).await? { info!("Function is not ready, sleeping 1s"); tokio::time::sleep(Duration::from_secs(1)).await; } Ok(()) } /** * Check if a Lambda function is ready to be invoked. * A Lambda function is ready for this scenario when its state is active and its LastUpdateStatus is Successful. * Additionally, if a sha256 is provided, the function must have that as its current code hash. * Any missing properties or failed requests will be reported as an Err. */ async fn is_function_ready( &self, expected_code_sha256: Option<&str>, ) -> Result<bool, anyhow::Error> { match self.get_function().await { Ok(func) => { if let Some(config) = func.configuration() { if let Some(state) = config.state() { info!(?state, "Checking if function is active"); if !matches!(state, State::Active) { return Ok(false); } } match config.last_update_status() { Some(last_update_status) => { info!(?last_update_status, "Checking if function is ready"); match last_update_status { LastUpdateStatus::Successful => { // continue } LastUpdateStatus::Failed | LastUpdateStatus::InProgress => { return Ok(false); } unknown => { warn!( status_variant = unknown.as_str(), "LastUpdateStatus unknown" ); return Err(anyhow!( "Unknown LastUpdateStatus, fn config is {config:?}" )); } } } None => { warn!("Missing last update status"); return Ok(false); } }; if expected_code_sha256.is_none() { return Ok(true); } if let Some(code_sha256) = config.code_sha256() { return Ok(code_sha256 == expected_code_sha256.unwrap_or_default()); } } } Err(e) => { warn!(?e, "Could not get function while waiting"); } } Ok(false) } /** Get the Lambda function with this Manager's name. */ pub async fn get_function(&self) -> Result<GetFunctionOutput, anyhow::Error> { info!("Getting lambda function"); self.lambda_client .get_function() .function_name(self.lambda_name.clone()) .send() .await .map_err(anyhow::Error::from) } /** List all Lambda functions in the current Region. */ pub async fn list_functions(&self) -> Result<ListFunctionsOutput, anyhow::Error> { info!("Listing lambda functions"); self.lambda_client .list_functions() .send() .await .map_err(anyhow::Error::from) } /** Invoke the lambda function using calculator InvokeArgs. */ pub async fn invoke(&self, args: InvokeArgs) -> Result<InvokeOutput, anyhow::Error> { info!(?args, "Invoking {}", self.lambda_name); let payload = serde_json::to_string(&args)?; debug!(?payload, "Sending payload"); self.lambda_client .invoke() .function_name(self.lambda_name.clone()) .payload(Blob::new(payload)) .send() .await .map_err(anyhow::Error::from) } /** Given a Path to a zip file, update the function's code and wait for the update to finish. */ pub async fn update_function_code( &self, zip_file: PathBuf, key: String, ) -> Result<UpdateFunctionCodeOutput, anyhow::Error> { let function_code = self.prepare_function(zip_file, Some(key)).await?; info!("Updating code for {}", self.lambda_name); let update = self .lambda_client .update_function_code() .function_name(self.lambda_name.clone()) .s3_bucket(self.bucket.clone()) .s3_key(function_code.s3_key().unwrap().to_string()) .send() .await .map_err(anyhow::Error::from)?; self.wait_for_function_ready().await?; Ok(update) } /** Update the environment for a function. */ pub async fn update_function_configuration( &self, environment: Environment, ) -> Result<UpdateFunctionConfigurationOutput, anyhow::Error> { info!( ?environment, "Updating environment for {}", self.lambda_name ); let updated = self .lambda_client .update_function_configuration() .function_name(self.lambda_name.clone()) .environment(environment) .send() .await .map_err(anyhow::Error::from)?; self.wait_for_function_ready().await?; Ok(updated) } /** Delete a function and its role, and if possible or necessary, its associated code object and bucket. */ pub async fn delete_function( &self, location: Option<String>, ) -> ( Result<DeleteFunctionOutput, anyhow::Error>, Result<DeleteRoleOutput, anyhow::Error>, Option<Result<DeleteObjectOutput, anyhow::Error>>, ) { info!("Deleting lambda function {}", self.lambda_name); let delete_function = self .lambda_client .delete_function() .function_name(self.lambda_name.clone()) .send() .await .map_err(anyhow::Error::from); info!("Deleting iam role {}", self.role_name); let delete_role = self .iam_client .delete_role() .role_name(self.role_name.clone()) .send() .await .map_err(anyhow::Error::from); let delete_object: Option<Result<DeleteObjectOutput, anyhow::Error>> = if let Some(location) = location { info!("Deleting object {location}"); Some( self.s3_client .delete_object() .bucket(self.bucket.clone()) .key(location) .send() .await .map_err(anyhow::Error::from), ) } else { info!(?location, "Skipping delete object"); None }; (delete_function, delete_role, delete_object) } pub async fn cleanup( &self, location: Option<String>, ) -> ( ( Result<DeleteFunctionOutput, anyhow::Error>, Result<DeleteRoleOutput, anyhow::Error>, Option<Result<DeleteObjectOutput, anyhow::Error>>, ), Option<Result<DeleteBucketOutput, anyhow::Error>>, ) { let delete_function = self.delete_function(location).await; let delete_bucket = if self.own_bucket { info!("Deleting bucket {}", self.bucket); if delete_function.2.is_none() || delete_function.2.as_ref().unwrap().is_ok() { Some( self.s3_client .delete_bucket() .bucket(self.bucket.clone()) .send() .await .map_err(anyhow::Error::from), ) } else { None } } else { info!("No bucket to clean up"); None }; (delete_function, delete_bucket) } } /** * Testing occurs primarily as an integration test running the `scenario` bin successfully. * Each action relies deeply on the internal workings and state of Amazon Simple Storage Service (Amazon S3), Lambda, and IAM working together. * It is therefore infeasible to mock the clients to test the individual actions. */ #[cfg(test)] mod test { use super::{InvokeArgs, Operation}; use serde_json::json; /** Make sure that the JSON output of serializing InvokeArgs is what's expected by the calculator. */ #[test] fn test_serialize() { assert_eq!(json!(InvokeArgs::Increment(5)), 5); assert_eq!( json!(InvokeArgs::Arithmetic(Operation::Plus, 5, 7)).to_string(), r#"{"op":"plus","i":5,"j":7}"#.to_string(), ); } }

一个从头到尾运行场景的二进制文件,使用命令行标志来控制某些行为。这个文件是箱子里的 src/bin/scenario .rs。

/* ## Service actions Service actions wrap the SDK call, taking a client and any specific parameters necessary for the call. * CreateFunction * GetFunction * ListFunctions * Invoke * UpdateFunctionCode * UpdateFunctionConfiguration * DeleteFunction ## Scenario A scenario runs at a command prompt and prints output to the user on the result of each service action. A scenario can run in one of two ways: straight through, printing out progress as it goes, or as an interactive question/answer script. ## Getting started with functions Use an SDK to manage AWS Lambda functions: create a function, invoke it, update its code, invoke it again, view its output and logs, and delete it. This scenario uses two Lambda handlers: _Note: Handlers don't use AWS SDK API calls._ The increment handler is straightforward: 1. It accepts a number, increments it, and returns the new value. 2. It performs simple logging of the result. The arithmetic handler is more complex: 1. It accepts a set of actions ['plus', 'minus', 'times', 'divided-by'] and two numbers, and returns the result of the calculation. 2. It uses an environment variable to control log level (such as DEBUG, INFO, WARNING, ERROR). It logs a few things at different levels, such as: * DEBUG: Full event data. * INFO: The calculation result. * WARN~ING~: When a divide by zero error occurs. * This will be the typical `RUST_LOG` variable. The steps of the scenario are: 1. Create an AWS Identity and Access Management (IAM) role that meets the following requirements: * Has an assume_role policy that grants 'lambda.amazonaws.com' the 'sts:AssumeRole' action. * Attaches the 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole' managed role. * _You must wait for ~10 seconds after the role is created before you can use it!_ 2. Create a function (CreateFunction) for the increment handler by packaging it as a zip and doing one of the following: * Adding it with CreateFunction Code.ZipFile. * --or-- * Uploading it to Amazon Simple Storage Service (Amazon S3) and adding it with CreateFunction Code.S3Bucket/S3Key. * _Note: Zipping the file does not have to be done in code._ * If you have a waiter, use it to wait until the function is active. Otherwise, call GetFunction until State is Active. 3. Invoke the function with a number and print the result. 4. Update the function (UpdateFunctionCode) to the arithmetic handler by packaging it as a zip and doing one of the following: * Adding it with UpdateFunctionCode ZipFile. * --or-- * Uploading it to Amazon S3 and adding it with UpdateFunctionCode S3Bucket/S3Key. 5. Call GetFunction until Configuration.LastUpdateStatus is 'Successful' (or 'Failed'). 6. Update the environment variable by calling UpdateFunctionConfiguration and pass it a log level, such as: * Environment={'Variables': {'RUST_LOG': 'TRACE'}} 7. Invoke the function with an action from the list and a couple of values. Include LogType='Tail' to get logs in the result. Print the result of the calculation and the log. 8. [Optional] Invoke the function to provoke a divide-by-zero error and show the log result. 9. List all functions for the account, using pagination (ListFunctions). 10. Delete the function (DeleteFunction). 11. Delete the role. Each step should use the function created in Service Actions to abstract calling the SDK. */ use aws_sdk_lambda::{operation::invoke::InvokeOutput, types::Environment}; use clap::Parser; use std::{collections::HashMap, path::PathBuf}; use tracing::{debug, info, warn}; use tracing_subscriber::EnvFilter; use lambda_code_examples::actions::{ InvokeArgs::{Arithmetic, Increment}, LambdaManager, Operation, }; #[derive(Debug, Parser)] pub struct Opt { /// The AWS Region. #[structopt(short, long)] pub region: Option<String>, // The bucket to use for the FunctionCode. #[structopt(short, long)] pub bucket: Option<String>, // The name of the Lambda function. #[structopt(short, long)] pub lambda_name: Option<String>, // The number to increment. #[structopt(short, long, default_value = "12")] pub inc: i32, // The left operand. #[structopt(long, default_value = "19")] pub num_a: i32, // The right operand. #[structopt(long, default_value = "23")] pub num_b: i32, // The arithmetic operation. #[structopt(short, long, default_value = "plus")] pub operation: Operation, #[structopt(long)] pub cleanup: Option<bool>, #[structopt(long)] pub no_cleanup: Option<bool>, } fn code_path(lambda: &str) -> PathBuf { PathBuf::from(format!("../target/lambda/{lambda}/bootstrap.zip")) } fn log_invoke_output(invoke: &InvokeOutput, message: &str) { if let Some(payload) = invoke.payload().cloned() { let payload = String::from_utf8(payload.into_inner()); info!(?payload, message); } else { info!("Could not extract payload") } if let Some(logs) = invoke.log_result() { debug!(?logs, "Invoked function logs") } else { debug!("Invoked function had no logs") } } async fn main_block( opt: &Opt, manager: &LambdaManager, code_location: String, ) -> Result<(), anyhow::Error> { let invoke = manager.invoke(Increment(opt.inc)).await?; log_invoke_output(&invoke, "Invoked function configured as increment"); let update_code = manager .update_function_code(code_path("arithmetic"), code_location.clone()) .await?; let code_sha256 = update_code.code_sha256().unwrap_or("Unknown SHA"); info!(?code_sha256, "Updated function code with arithmetic.zip"); let arithmetic_args = Arithmetic(opt.operation, opt.num_a, opt.num_b); let invoke = manager.invoke(arithmetic_args).await?; log_invoke_output(&invoke, "Invoked function configured as arithmetic"); let update = manager .update_function_configuration( Environment::builder() .set_variables(Some(HashMap::from([( "RUST_LOG".to_string(), "trace".to_string(), )]))) .build(), ) .await?; let updated_environment = update.environment(); info!(?updated_environment, "Updated function configuration"); let invoke = manager .invoke(Arithmetic(opt.operation, opt.num_a, opt.num_b)) .await?; log_invoke_output( &invoke, "Invoked function configured as arithmetic with increased logging", ); let invoke = manager .invoke(Arithmetic(Operation::DividedBy, opt.num_a, 0)) .await?; log_invoke_output( &invoke, "Invoked function configured as arithmetic with divide by zero", ); Ok::<(), anyhow::Error>(()) } #[tokio::main] async fn main() { tracing_subscriber::fmt() .without_time() .with_file(true) .with_line_number(true) .with_env_filter(EnvFilter::from_default_env()) .init(); let opt = Opt::parse(); let manager = LambdaManager::load_from_env(opt.lambda_name.clone(), opt.bucket.clone()).await; let key = match manager.create_function(code_path("increment")).await { Ok(init) => { info!(?init, "Created function, initially with increment.zip"); let run_block = main_block(&opt, &manager, init.clone()).await; info!(?run_block, "Finished running example, cleaning up"); Some(init) } Err(err) => { warn!(?err, "Error happened when initializing function"); None } }; if Some(false) == opt.cleanup || Some(true) == opt.no_cleanup { info!("Skipping cleanup") } else { let delete = manager.cleanup(key).await; info!(?delete, "Deleted function & cleaned up resources"); } }

操作

以下代码示例演示如何使用 CreateFunction

SDK对于 Rust
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

/** * Create a function, uploading from a zip file. */ pub async fn create_function(&self, zip_file: PathBuf) -> Result<String, anyhow::Error> { let code = self.prepare_function(zip_file, None).await?; let key = code.s3_key().unwrap().to_string(); let role = self.create_role().await.map_err(|e| anyhow!(e))?; info!("Created iam role, waiting 15s for it to become active"); tokio::time::sleep(Duration::from_secs(15)).await; info!("Creating lambda function {}", self.lambda_name); let _ = self .lambda_client .create_function() .function_name(self.lambda_name.clone()) .code(code) .role(role.arn()) .runtime(aws_sdk_lambda::types::Runtime::Providedal2) .handler("_unused") .send() .await .map_err(anyhow::Error::from)?; self.wait_for_function_ready().await?; self.lambda_client .publish_version() .function_name(self.lambda_name.clone()) .send() .await?; Ok(key) } /** * Upload function code from a path to a zip file. * The zip file must have an AL2 Linux-compatible binary called `bootstrap`. * The easiest way to create such a zip is to use `cargo lambda build --output-format Zip`. */ async fn prepare_function( &self, zip_file: PathBuf, key: Option<String>, ) -> Result<FunctionCode, anyhow::Error> { let body = ByteStream::from_path(zip_file).await?; let key = key.unwrap_or_else(|| format!("{}_code", self.lambda_name)); info!("Uploading function code to s3://{}/{}", self.bucket, key); let _ = self .s3_client .put_object() .bucket(self.bucket.clone()) .key(key.clone()) .body(body) .send() .await?; Ok(FunctionCode::builder() .s3_bucket(self.bucket.clone()) .s3_key(key) .build()) }
  • 有关API详细信息,请参见CreateFunction中的 Rust AWS SDK API 参考

以下代码示例演示如何使用 DeleteFunction

SDK对于 Rust
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

/** Delete a function and its role, and if possible or necessary, its associated code object and bucket. */ pub async fn delete_function( &self, location: Option<String>, ) -> ( Result<DeleteFunctionOutput, anyhow::Error>, Result<DeleteRoleOutput, anyhow::Error>, Option<Result<DeleteObjectOutput, anyhow::Error>>, ) { info!("Deleting lambda function {}", self.lambda_name); let delete_function = self .lambda_client .delete_function() .function_name(self.lambda_name.clone()) .send() .await .map_err(anyhow::Error::from); info!("Deleting iam role {}", self.role_name); let delete_role = self .iam_client .delete_role() .role_name(self.role_name.clone()) .send() .await .map_err(anyhow::Error::from); let delete_object: Option<Result<DeleteObjectOutput, anyhow::Error>> = if let Some(location) = location { info!("Deleting object {location}"); Some( self.s3_client .delete_object() .bucket(self.bucket.clone()) .key(location) .send() .await .map_err(anyhow::Error::from), ) } else { info!(?location, "Skipping delete object"); None }; (delete_function, delete_role, delete_object) }
  • 有关API详细信息,请参见DeleteFunction中的 Rust AWS SDK API 参考

以下代码示例演示如何使用 GetFunction

SDK对于 Rust
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

/** Get the Lambda function with this Manager's name. */ pub async fn get_function(&self) -> Result<GetFunctionOutput, anyhow::Error> { info!("Getting lambda function"); self.lambda_client .get_function() .function_name(self.lambda_name.clone()) .send() .await .map_err(anyhow::Error::from) }
  • 有关API详细信息,请参见GetFunction中的 Rust AWS SDK API 参考

以下代码示例演示如何使用 Invoke

SDK对于 Rust
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

/** Invoke the lambda function using calculator InvokeArgs. */ pub async fn invoke(&self, args: InvokeArgs) -> Result<InvokeOutput, anyhow::Error> { info!(?args, "Invoking {}", self.lambda_name); let payload = serde_json::to_string(&args)?; debug!(?payload, "Sending payload"); self.lambda_client .invoke() .function_name(self.lambda_name.clone()) .payload(Blob::new(payload)) .send() .await .map_err(anyhow::Error::from) } fn log_invoke_output(invoke: &InvokeOutput, message: &str) { if let Some(payload) = invoke.payload().cloned() { let payload = String::from_utf8(payload.into_inner()); info!(?payload, message); } else { info!("Could not extract payload") } if let Some(logs) = invoke.log_result() { debug!(?logs, "Invoked function logs") } else { debug!("Invoked function had no logs") } }
  • 有关API详细信息,请参见调用AWSSDK以获取 Rust API 参考

以下代码示例演示如何使用 ListFunctions

SDK对于 Rust
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

/** List all Lambda functions in the current Region. */ pub async fn list_functions(&self) -> Result<ListFunctionsOutput, anyhow::Error> { info!("Listing lambda functions"); self.lambda_client .list_functions() .send() .await .map_err(anyhow::Error::from) }
  • 有关API详细信息,请参见ListFunctions中的 Rust AWS SDK API 参考

以下代码示例演示如何使用 UpdateFunctionCode

SDK对于 Rust
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

/** Given a Path to a zip file, update the function's code and wait for the update to finish. */ pub async fn update_function_code( &self, zip_file: PathBuf, key: String, ) -> Result<UpdateFunctionCodeOutput, anyhow::Error> { let function_code = self.prepare_function(zip_file, Some(key)).await?; info!("Updating code for {}", self.lambda_name); let update = self .lambda_client .update_function_code() .function_name(self.lambda_name.clone()) .s3_bucket(self.bucket.clone()) .s3_key(function_code.s3_key().unwrap().to_string()) .send() .await .map_err(anyhow::Error::from)?; self.wait_for_function_ready().await?; Ok(update) } /** * Upload function code from a path to a zip file. * The zip file must have an AL2 Linux-compatible binary called `bootstrap`. * The easiest way to create such a zip is to use `cargo lambda build --output-format Zip`. */ async fn prepare_function( &self, zip_file: PathBuf, key: Option<String>, ) -> Result<FunctionCode, anyhow::Error> { let body = ByteStream::from_path(zip_file).await?; let key = key.unwrap_or_else(|| format!("{}_code", self.lambda_name)); info!("Uploading function code to s3://{}/{}", self.bucket, key); let _ = self .s3_client .put_object() .bucket(self.bucket.clone()) .key(key.clone()) .body(body) .send() .await?; Ok(FunctionCode::builder() .s3_bucket(self.bucket.clone()) .s3_key(key) .build()) }

以下代码示例演示如何使用 UpdateFunctionConfiguration

SDK对于 Rust
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

/** Update the environment for a function. */ pub async fn update_function_configuration( &self, environment: Environment, ) -> Result<UpdateFunctionConfigurationOutput, anyhow::Error> { info!( ?environment, "Updating environment for {}", self.lambda_name ); let updated = self .lambda_client .update_function_configuration() .function_name(self.lambda_name.clone()) .environment(environment) .send() .await .map_err(anyhow::Error::from)?; self.wait_for_function_ready().await?; Ok(updated) }

场景

以下代码示例演示如何创建无服务器应用程序,让用户能够使用标签管理照片。

SDK对于 Rust

演示如何开发照片资产管理应用程序,该应用程序使用 Amazon Rekognition 检测图像中的标签并将其存储以供日后检索。

有关如何设置和运行的完整源代码和说明,请参阅上的完整示例 GitHub

要深入了解这个例子的起源,请参阅 AWS 社区上的博文。

本示例中使用的服务
  • API网关

  • DynamoDB

  • Lambda

  • Amazon Rekognition

  • Amazon S3

  • Amazon SNS

无服务器示例

以下代码示例说明如何实现连接到数据库的 Lambda 函数。RDS该函数发出一个简单的数据库请求并返回结果。

SDK对于 Rust
注意

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 Rust 在 Lambda 函数中连接到亚马逊RDS数据库。

use aws_config::BehaviorVersion; use aws_credential_types::provider::ProvideCredentials; use aws_sigv4::{ http_request::{sign, SignableBody, SignableRequest, SigningSettings}, sign::v4, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; use serde_json::{json, Value}; use sqlx::postgres::PgConnectOptions; use std::env; use std::time::{Duration, SystemTime}; const RDS_CERTS: &[u8] = include_bytes!("global-bundle.pem"); async fn generate_rds_iam_token( db_hostname: &str, port: u16, db_username: &str, ) -> Result<String, Error> { let config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await; let credentials = config .credentials_provider() .expect("no credentials provider found") .provide_credentials() .await .expect("unable to load credentials"); let identity = credentials.into(); let region = config.region().unwrap().to_string(); let mut signing_settings = SigningSettings::default(); signing_settings.expires_in = Some(Duration::from_secs(900)); signing_settings.signature_location = aws_sigv4::http_request::SignatureLocation::QueryParams; let signing_params = v4::SigningParams::builder() .identity(&identity) .region(&region) .name("rds-db") .time(SystemTime::now()) .settings(signing_settings) .build()?; let url = format!( "https://{db_hostname}:{port}/?Action=connect&DBUser={db_user}", db_hostname = db_hostname, port = port, db_user = db_username ); let signable_request = SignableRequest::new("GET", &url, std::iter::empty(), SignableBody::Bytes(&[])) .expect("signable request"); let (signing_instructions, _signature) = sign(signable_request, &signing_params.into())?.into_parts(); let mut url = url::Url::parse(&url).unwrap(); for (name, value) in signing_instructions.params() { url.query_pairs_mut().append_pair(name, &value); } let response = url.to_string().split_off("https://".len()); Ok(response) } #[tokio::main] async fn main() -> Result<(), Error> { run(service_fn(handler)).await } async fn handler(_event: LambdaEvent<Value>) -> Result<Value, Error> { let db_host = env::var("DB_HOSTNAME").expect("DB_HOSTNAME must be set"); let db_port = env::var("DB_PORT") .expect("DB_PORT must be set") .parse::<u16>() .expect("PORT must be a valid number"); let db_name = env::var("DB_NAME").expect("DB_NAME must be set"); let db_user_name = env::var("DB_USERNAME").expect("DB_USERNAME must be set"); let token = generate_rds_iam_token(&db_host, db_port, &db_user_name).await?; let opts = PgConnectOptions::new() .host(&db_host) .port(db_port) .username(&db_user_name) .password(&token) .database(&db_name) .ssl_root_cert_from_pem(RDS_CERTS.to_vec()) .ssl_mode(sqlx::postgres::PgSslMode::Require); let pool = sqlx::postgres::PgPoolOptions::new() .connect_with(opts) .await?; let result: i32 = sqlx::query_scalar("SELECT $1 + $2") .bind(3) .bind(2) .fetch_one(&pool) .await?; println!("Result: {:?}", result); Ok(json!({ "statusCode": 200, "content-type": "text/plain", "body": format!("The selected sum is: {result}") })) }

以下代码示例展示了如何实现一个 Lambda 函数,该函数接收因接收来自 Kinesis 流的记录而触发的事件。该函数检索 Kinesis 有效负载,将 Base64 解码,并记录下记录内容。

SDK对于 Rust
注意

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

通过 Rust 将 Kinesis 事件与 Lambda 结合使用。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::event::kinesis::KinesisEvent; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> { if event.payload.records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(()); } event.payload.records.iter().for_each(|record| { tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default()); let record_data = std::str::from_utf8(&record.kinesis.data); match record_data { Ok(data) => { // log the record data tracing::info!("Data: {}", data); } Err(e) => { tracing::error!("Error: {}", e); } } }); tracing::info!( "Successfully processed {} records", event.payload.records.len() ); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }

以下代码示例演示如何实现 Lambda 函数,该函数接收通过从 DynamoDB 流接收记录而触发的事件。该函数检索 DynamoDB 有效负载,并记录下记录内容。

SDK对于 Rust
注意

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 Rust 将 DynamoDB 事件与 Lambda 结合使用。

use lambda_runtime::{service_fn, tracing, Error, LambdaEvent}; use aws_lambda_events::{ event::dynamodb::{Event, EventRecord}, }; // Built with the following dependencies: //lambda_runtime = "0.11.1" //serde_json = "1.0" //tokio = { version = "1", features = ["macros"] } //tracing = { version = "0.1", features = ["log"] } //tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] } //aws_lambda_events = "0.15.0" async fn function_handler(event: LambdaEvent<Event>) ->Result<(), Error> { let records = &event.payload.records; tracing::info!("event payload: {:?}",records); if records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(()); } for record in records{ log_dynamo_dbrecord(record); } tracing::info!("Dynamo db records processed"); // Prepare the response Ok(()) } fn log_dynamo_dbrecord(record: &EventRecord)-> Result<(), Error>{ tracing::info!("EventId: {}", record.event_id); tracing::info!("EventName: {}", record.event_name); tracing::info!("DynamoDB Record: {:?}", record.change ); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) .with_target(false) .without_time() .init(); let func = service_fn(function_handler); lambda_runtime::run(func).await?; Ok(()) }

以下代码示例说明如何实现一个 Lambda 函数,该函数接收通过从 DocumentDB 更改流接收记录而触发的事件。该函数检索 DocumentDB 有效负载,并记录下记录内容。

SDK对于 Rust
注意

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 Rust 将 Amazon DocumentDB 事件与 Lambda 结合使用。

use lambda_runtime::{service_fn, tracing, Error, LambdaEvent}; use aws_lambda_events::{ event::documentdb::{DocumentDbEvent, DocumentDbInnerEvent}, }; // Built with the following dependencies: //lambda_runtime = "0.11.1" //serde_json = "1.0" //tokio = { version = "1", features = ["macros"] } //tracing = { version = "0.1", features = ["log"] } //tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] } //aws_lambda_events = "0.15.0" async fn function_handler(event: LambdaEvent<DocumentDbEvent>) ->Result<(), Error> { tracing::info!("Event Source ARN: {:?}", event.payload.event_source_arn); tracing::info!("Event Source: {:?}", event.payload.event_source); let records = &event.payload.events; if records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(()); } for record in records{ log_document_db_event(record); } tracing::info!("Document db records processed"); // Prepare the response Ok(()) } fn log_document_db_event(record: &DocumentDbInnerEvent)-> Result<(), Error>{ tracing::info!("Change Event: {:?}", record.event); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) .with_target(false) .without_time() .init(); let func = service_fn(function_handler); lambda_runtime::run(func).await?; Ok(()) }

以下代码示例展示了如何实现一个 Lambda 函数,该函数接收通过将对象上传到 S3 桶而触发的事件。该函数从事件参数中检索 S3 存储桶名称和对象密钥,并调用 Amazon S3 API 来检索和记录对象的内容类型。

SDK对于 Rust
注意

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 Rust 将 S3 事件与 Lambda 结合使用。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::event::s3::S3Event; use aws_sdk_s3::{Client}; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; /// Main function #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) .with_target(false) .without_time() .init(); // Initialize the AWS SDK for Rust let config = aws_config::load_from_env().await; let s3_client = Client::new(&config); let res = run(service_fn(|request: LambdaEvent<S3Event>| { function_handler(&s3_client, request) })).await; res } async fn function_handler( s3_client: &Client, evt: LambdaEvent<S3Event> ) -> Result<(), Error> { tracing::info!(records = ?evt.payload.records.len(), "Received request from SQS"); if evt.payload.records.len() == 0 { tracing::info!("Empty S3 event received"); } let bucket = evt.payload.records[0].s3.bucket.name.as_ref().expect("Bucket name to exist"); let key = evt.payload.records[0].s3.object.key.as_ref().expect("Object key to exist"); tracing::info!("Request is for {} and object {}", bucket, key); let s3_get_object_result = s3_client .get_object() .bucket(bucket) .key(key) .send() .await; match s3_get_object_result { Ok(_) => tracing::info!("S3 Get Object success, the s3GetObjectResult contains a 'body' property of type ByteStream"), Err(_) => tracing::info!("Failure with S3 Get Object request") } Ok(()) }

以下代码示例说明如何实现一个 Lambda 函数,该函数接收通过接收来自主题的消息而触发的事件。SNS该函数从事件参数检索消息并记录每条消息的内容。

SDK对于 Rust
注意

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 Rust 在 Lambda 上使用SNS事件。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::event::sns::SnsEvent; use aws_lambda_events::sns::SnsRecord; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; use tracing::info; // Built with the following dependencies: // aws_lambda_events = { version = "0.10.0", default-features = false, features = ["sns"] } // lambda_runtime = "0.8.1" // tokio = { version = "1", features = ["macros"] } // tracing = { version = "0.1", features = ["log"] } // tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] } async fn function_handler(event: LambdaEvent<SnsEvent>) -> Result<(), Error> { for event in event.payload.records { process_record(&event)?; } Ok(()) } fn process_record(record: &SnsRecord) -> Result<(), Error> { info!("Processing SNS Message: {}", record.sns.message); // Implement your record handling code here. Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) .with_target(false) .without_time() .init(); run(service_fn(function_handler)).await }

以下代码示例说明如何实现一个 Lambda 函数,该函数接收通过从队列接收消息而触发的事件。SQS该函数从事件参数检索消息并记录每条消息的内容。

SDK对于 Rust
注意

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 Rust 在 Lambda 上使用SQS事件。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::event::sqs::SqsEvent; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> { event.payload.records.iter().for_each(|record| { // process the record tracing::info!("Message body: {}", record.body.as_deref().unwrap_or_default()) }); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }

以下代码示例展示了如何为接收来自 Kinesis 流的事件的 Lambda 函数实现部分批处理响应。该函数在响应中报告批处理项目失败,并指示 Lambda 稍后重试这些消息。

SDK对于 Rust
注意

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

报告通过 Rust 进行 Lambda Kinesis 批处理项目失败。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::{ event::kinesis::KinesisEvent, kinesis::KinesisEventRecord, streams::{KinesisBatchItemFailure, KinesisEventResponse}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<KinesisEventResponse, Error> { let mut response = KinesisEventResponse { batch_item_failures: vec![], }; if event.payload.records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(response); } for record in &event.payload.records { tracing::info!( "EventId: {}", record.event_id.as_deref().unwrap_or_default() ); let record_processing_result = process_record(record); if record_processing_result.is_err() { response.batch_item_failures.push(KinesisBatchItemFailure { item_identifier: record.kinesis.sequence_number.clone(), }); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return Ok(response); } } tracing::info!( "Successfully processed {} records", event.payload.records.len() ); Ok(response) } fn process_record(record: &KinesisEventRecord) -> Result<(), Error> { let record_data = std::str::from_utf8(record.kinesis.data.as_slice()); if let Some(err) = record_data.err() { tracing::error!("Error: {}", err); return Err(Error::from(err)); } let record_data = record_data.unwrap_or_default(); // do something interesting with the data tracing::info!("Data: {}", record_data); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }

以下代码示例演示如何为接收来自 DynamoDB 流的事件的 Lambda 函数实现部分批量响应。该函数在响应中报告批处理项目失败,并指示 Lambda 稍后重试这些消息。

SDK对于 Rust
注意

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

报告使用 Rust 通过 Lambda 进行 DynamoDB 批处理项目失败。

use aws_lambda_events::{ event::dynamodb::{Event, EventRecord, StreamRecord}, streams::{DynamoDbBatchItemFailure, DynamoDbEventResponse}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; /// Process the stream record fn process_record(record: &EventRecord) -> Result<(), Error> { let stream_record: &StreamRecord = &record.change; // process your stream record here... tracing::info!("Data: {:?}", stream_record); Ok(()) } /// Main Lambda handler here... async fn function_handler(event: LambdaEvent<Event>) -> Result<DynamoDbEventResponse, Error> { let mut response = DynamoDbEventResponse { batch_item_failures: vec![], }; let records = &event.payload.records; if records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(response); } for record in records { tracing::info!("EventId: {}", record.event_id); // Couldn't find a sequence number if record.change.sequence_number.is_none() { response.batch_item_failures.push(DynamoDbBatchItemFailure { item_identifier: Some("".to_string()), }); return Ok(response); } // Process your record here... if process_record(record).is_err() { response.batch_item_failures.push(DynamoDbBatchItemFailure { item_identifier: record.change.sequence_number.clone(), }); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return Ok(response); } } tracing::info!("Successfully processed {} record(s)", records.len()); Ok(response) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }

以下代码示例说明如何为从队列接收事件的 Lambda 函数实现部分批量响应。SQS该函数在响应中报告批处理项目失败,并指示 Lambda 稍后重试这些消息。

SDK对于 Rust
注意

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 Rust 使用 Lambda 报告SQS批处理项目失败。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::{ event::sqs::{SqsBatchResponse, SqsEvent}, sqs::{BatchItemFailure, SqsMessage}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn process_record(_: &SqsMessage) -> Result<(), Error> { Err(Error::from("Error processing message")) } async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<SqsBatchResponse, Error> { let mut batch_item_failures = Vec::new(); for record in event.payload.records { match process_record(&record).await { Ok(_) => (), Err(_) => batch_item_failures.push(BatchItemFailure { item_identifier: record.message_id.unwrap(), }), } } Ok(SqsBatchResponse { batch_item_failures, }) } #[tokio::main] async fn main() -> Result<(), Error> { run(service_fn(function_handler)).await }