Doc AWS SDK Examples GitHub リポジトリには、他にも SDK の例があります。 AWS
翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
SDK for Rust を使用した Lambda の例
次のコード例は、Lambda で AWS SDK for Rust を使用してアクションを実行し、一般的なシナリオを実装する方法を示しています。
基本は、重要なオペレーションをサービス内で実行する方法を示すコード例です。
アクションはより大きなプログラムからのコードの抜粋であり、コンテキスト内で実行する必要があります。アクションは個々のサービス機能を呼び出す方法を示していますが、コンテキスト内のアクションは、関連するシナリオで確認できます。
「シナリオ」は、1 つのサービス内から、または他の AWS のサービスと組み合わせて複数の関数を呼び出し、特定のタスクを実行する方法を示すコード例です。
AWS コミュニティへの貢献は、複数のチームが作成し、維持している例です AWS。フィードバックを提供するには、リンクされたリポジトリで提供されているメカニズムを使用します。
各例には、完全なソースコードへのリンクが含まれており、そこからコードの設定方法と実行方法に関する手順を確認できます。
基本
次のコードサンプルは、以下の操作方法を示しています。
IAM ロールと Lambda 関数を作成し、ハンドラーコードをアップロードします。
1 つのパラメーターで関数を呼び出して、結果を取得します。
関数コードを更新し、環境変数で設定します。
新しいパラメーターで関数を呼び出して、結果を取得します。返された実行ログを表示します。
アカウントの関数を一覧表示し、リソースをクリーンアップします。
詳細については、「コンソールで Lambda 関数を作成する」を参照してください。
- SDK for Rust
-
注記
GitHub には、その他のリソースもあります。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 このシナリオで使用した依存関係を含む Cargo.toml。
[package] name = "lambda-code-examples" version = "0.1.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] aws-config = { version = "1.0.1", features = ["behavior-version-latest"] } aws-sdk-ec2 = { version = "1.3.0" } aws-sdk-iam = { version = "1.3.0" } aws-sdk-lambda = { version = "1.3.0" } aws-sdk-s3 = { version = "1.4.0" } aws-smithy-types = { version = "1.0.1" } aws-types = { version = "1.0.1" } clap = { version = "4.4", features = ["derive"] } tokio = { version = "1.20.1", features = ["full"] } tracing-subscriber = { version = "0.3.15", features = ["env-filter"] } tracing = "0.1.37" serde_json = "1.0.94" anyhow = "1.0.71" uuid = { version = "1.3.3", features = ["v4"] } lambda_runtime = "0.8.0" serde = "1.0.164"
このシナリオの Lambda 呼び出しを効率化するユーティリティのコレクション。このファイルはクレート内の src/ations.rs というファイルです。
use anyhow::anyhow; use aws_sdk_iam::operation::{create_role::CreateRoleError, delete_role::DeleteRoleOutput}; use aws_sdk_lambda::{ operation::{ delete_function::DeleteFunctionOutput, get_function::GetFunctionOutput, invoke::InvokeOutput, list_functions::ListFunctionsOutput, update_function_code::UpdateFunctionCodeOutput, update_function_configuration::UpdateFunctionConfigurationOutput, }, primitives::ByteStream, types::{Environment, FunctionCode, LastUpdateStatus, State}, }; use aws_sdk_s3::{ error::ErrorMetadata, operation::{delete_bucket::DeleteBucketOutput, delete_object::DeleteObjectOutput}, types::CreateBucketConfiguration, }; use aws_smithy_types::Blob; use serde::{ser::SerializeMap, Serialize}; use std::{fmt::Display, path::PathBuf, str::FromStr, time::Duration}; use tracing::{debug, info, warn}; /* Operation describes */ #[derive(Clone, Copy, Debug, Serialize)] pub enum Operation { #[serde(rename = "plus")] Plus, #[serde(rename = "minus")] Minus, #[serde(rename = "times")] Times, #[serde(rename = "divided-by")] DividedBy, } impl FromStr for Operation { type Err = anyhow::Error; fn from_str(s: &str) -> Result<Self, Self::Err> { match s { "plus" => Ok(Operation::Plus), "minus" => Ok(Operation::Minus), "times" => Ok(Operation::Times), "divided-by" => Ok(Operation::DividedBy), _ => Err(anyhow!("Unknown operation {s}")), } } } impl Display for Operation { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Operation::Plus => write!(f, "plus"), Operation::Minus => write!(f, "minus"), Operation::Times => write!(f, "times"), Operation::DividedBy => write!(f, "divided-by"), } } } /** * InvokeArgs will be serialized as JSON and sent to the AWS Lambda handler. */ #[derive(Debug)] pub enum InvokeArgs { Increment(i32), Arithmetic(Operation, i32, i32), } impl Serialize for InvokeArgs { fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: serde::Serializer, { match self { InvokeArgs::Increment(i) => serializer.serialize_i32(*i), InvokeArgs::Arithmetic(o, i, j) => { let mut map: S::SerializeMap = serializer.serialize_map(Some(3))?; map.serialize_key(&"op".to_string())?; map.serialize_value(&o.to_string())?; map.serialize_key(&"i".to_string())?; map.serialize_value(&i)?; map.serialize_key(&"j".to_string())?; map.serialize_value(&j)?; map.end() } } } } /** A policy document allowing Lambda to execute this function on the account's behalf. */ const ROLE_POLICY_DOCUMENT: &str = r#"{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }"#; /** * A LambdaManager gathers all the resources necessary to run the Lambda example scenario. * This includes instantiated aws_sdk clients and details of resource names. */ pub struct LambdaManager { iam_client: aws_sdk_iam::Client, lambda_client: aws_sdk_lambda::Client, s3_client: aws_sdk_s3::Client, lambda_name: String, role_name: String, bucket: String, own_bucket: bool, } // These unit type structs provide nominal typing on top of String parameters for LambdaManager::new pub struct LambdaName(pub String); pub struct RoleName(pub String); pub struct Bucket(pub String); pub struct OwnBucket(pub bool); impl LambdaManager { pub fn new( iam_client: aws_sdk_iam::Client, lambda_client: aws_sdk_lambda::Client, s3_client: aws_sdk_s3::Client, lambda_name: LambdaName, role_name: RoleName, bucket: Bucket, own_bucket: OwnBucket, ) -> Self { Self { iam_client, lambda_client, s3_client, lambda_name: lambda_name.0, role_name: role_name.0, bucket: bucket.0, own_bucket: own_bucket.0, } } /** * Load the AWS configuration from the environment. * Look up lambda_name and bucket if none are given, or generate a random name if not present in the environment. * If the bucket name is provided, the caller needs to have created the bucket. * If the bucket name is generated, it will be created. */ pub async fn load_from_env(lambda_name: Option<String>, bucket: Option<String>) -> Self { let sdk_config = aws_config::load_from_env().await; let lambda_name = LambdaName(lambda_name.unwrap_or_else(|| { std::env::var("LAMBDA_NAME").unwrap_or_else(|_| "rust_lambda_example".to_string()) })); let role_name = RoleName(format!("{}_role", lambda_name.0)); let (bucket, own_bucket) = match bucket { Some(bucket) => (Bucket(bucket), false), None => ( Bucket(std::env::var("LAMBDA_BUCKET").unwrap_or_else(|_| { format!("rust-lambda-example-{}", uuid::Uuid::new_v4()) })), true, ), }; let s3_client = aws_sdk_s3::Client::new(&sdk_config); if own_bucket { info!("Creating bucket for demo: {}", bucket.0); s3_client .create_bucket() .bucket(bucket.0.clone()) .create_bucket_configuration( CreateBucketConfiguration::builder() .location_constraint(aws_sdk_s3::types::BucketLocationConstraint::from( sdk_config.region().unwrap().as_ref(), )) .build(), ) .send() .await .unwrap(); } Self::new( aws_sdk_iam::Client::new(&sdk_config), aws_sdk_lambda::Client::new(&sdk_config), s3_client, lambda_name, role_name, bucket, OwnBucket(own_bucket), ) } /** * Upload function code from a path to a zip file. * The zip file must have an AL2 Linux-compatible binary called `bootstrap`. * The easiest way to create such a zip is to use `cargo lambda build --output-format Zip`. */ async fn prepare_function( &self, zip_file: PathBuf, key: Option<String>, ) -> Result<FunctionCode, anyhow::Error> { let body = ByteStream::from_path(zip_file).await?; let key = key.unwrap_or_else(|| format!("{}_code", self.lambda_name)); info!("Uploading function code to s3://{}/{}", self.bucket, key); let _ = self .s3_client .put_object() .bucket(self.bucket.clone()) .key(key.clone()) .body(body) .send() .await?; Ok(FunctionCode::builder() .s3_bucket(self.bucket.clone()) .s3_key(key) .build()) } /** * Create a function, uploading from a zip file. */ pub async fn create_function(&self, zip_file: PathBuf) -> Result<String, anyhow::Error> { let code = self.prepare_function(zip_file, None).await?; let key = code.s3_key().unwrap().to_string(); let role = self.create_role().await.map_err(|e| anyhow!(e))?; info!("Created iam role, waiting 15s for it to become active"); tokio::time::sleep(Duration::from_secs(15)).await; info!("Creating lambda function {}", self.lambda_name); let _ = self .lambda_client .create_function() .function_name(self.lambda_name.clone()) .code(code) .role(role.arn()) .runtime(aws_sdk_lambda::types::Runtime::Providedal2) .handler("_unused") .send() .await .map_err(anyhow::Error::from)?; self.wait_for_function_ready().await?; self.lambda_client .publish_version() .function_name(self.lambda_name.clone()) .send() .await?; Ok(key) } /** * Create an IAM execution role for the managed Lambda function. * If the role already exists, use that instead. */ async fn create_role(&self) -> Result<aws_sdk_iam::types::Role, CreateRoleError> { info!("Creating execution role for function"); let get_role = self .iam_client .get_role() .role_name(self.role_name.clone()) .send() .await; if let Ok(get_role) = get_role { if let Some(role) = get_role.role { return Ok(role); } } let create_role = self .iam_client .create_role() .role_name(self.role_name.clone()) .assume_role_policy_document(ROLE_POLICY_DOCUMENT) .send() .await; match create_role { Ok(create_role) => match create_role.role { Some(role) => Ok(role), None => Err(CreateRoleError::generic( ErrorMetadata::builder() .message("CreateRole returned empty success") .build(), )), }, Err(err) => Err(err.into_service_error()), } } /** * Poll `is_function_ready` with a 1-second delay. It returns when the function is ready or when there's an error checking the function's state. */ pub async fn wait_for_function_ready(&self) -> Result<(), anyhow::Error> { info!("Waiting for function"); while !self.is_function_ready(None).await? { info!("Function is not ready, sleeping 1s"); tokio::time::sleep(Duration::from_secs(1)).await; } Ok(()) } /** * Check if a Lambda function is ready to be invoked. * A Lambda function is ready for this scenario when its state is active and its LastUpdateStatus is Successful. * Additionally, if a sha256 is provided, the function must have that as its current code hash. * Any missing properties or failed requests will be reported as an Err. */ async fn is_function_ready( &self, expected_code_sha256: Option<&str>, ) -> Result<bool, anyhow::Error> { match self.get_function().await { Ok(func) => { if let Some(config) = func.configuration() { if let Some(state) = config.state() { info!(?state, "Checking if function is active"); if !matches!(state, State::Active) { return Ok(false); } } match config.last_update_status() { Some(last_update_status) => { info!(?last_update_status, "Checking if function is ready"); match last_update_status { LastUpdateStatus::Successful => { // continue } LastUpdateStatus::Failed | LastUpdateStatus::InProgress => { return Ok(false); } unknown => { warn!( status_variant = unknown.as_str(), "LastUpdateStatus unknown" ); return Err(anyhow!( "Unknown LastUpdateStatus, fn config is {config:?}" )); } } } None => { warn!("Missing last update status"); return Ok(false); } }; if expected_code_sha256.is_none() { return Ok(true); } if let Some(code_sha256) = config.code_sha256() { return Ok(code_sha256 == expected_code_sha256.unwrap_or_default()); } } } Err(e) => { warn!(?e, "Could not get function while waiting"); } } Ok(false) } /** Get the Lambda function with this Manager's name. */ pub async fn get_function(&self) -> Result<GetFunctionOutput, anyhow::Error> { info!("Getting lambda function"); self.lambda_client .get_function() .function_name(self.lambda_name.clone()) .send() .await .map_err(anyhow::Error::from) } /** List all Lambda functions in the current Region. */ pub async fn list_functions(&self) -> Result<ListFunctionsOutput, anyhow::Error> { info!("Listing lambda functions"); self.lambda_client .list_functions() .send() .await .map_err(anyhow::Error::from) } /** Invoke the lambda function using calculator InvokeArgs. */ pub async fn invoke(&self, args: InvokeArgs) -> Result<InvokeOutput, anyhow::Error> { info!(?args, "Invoking {}", self.lambda_name); let payload = serde_json::to_string(&args)?; debug!(?payload, "Sending payload"); self.lambda_client .invoke() .function_name(self.lambda_name.clone()) .payload(Blob::new(payload)) .send() .await .map_err(anyhow::Error::from) } /** Given a Path to a zip file, update the function's code and wait for the update to finish. */ pub async fn update_function_code( &self, zip_file: PathBuf, key: String, ) -> Result<UpdateFunctionCodeOutput, anyhow::Error> { let function_code = self.prepare_function(zip_file, Some(key)).await?; info!("Updating code for {}", self.lambda_name); let update = self .lambda_client .update_function_code() .function_name(self.lambda_name.clone()) .s3_bucket(self.bucket.clone()) .s3_key(function_code.s3_key().unwrap().to_string()) .send() .await .map_err(anyhow::Error::from)?; self.wait_for_function_ready().await?; Ok(update) } /** Update the environment for a function. */ pub async fn update_function_configuration( &self, environment: Environment, ) -> Result<UpdateFunctionConfigurationOutput, anyhow::Error> { info!( ?environment, "Updating environment for {}", self.lambda_name ); let updated = self .lambda_client .update_function_configuration() .function_name(self.lambda_name.clone()) .environment(environment) .send() .await .map_err(anyhow::Error::from)?; self.wait_for_function_ready().await?; Ok(updated) } /** Delete a function and its role, and if possible or necessary, its associated code object and bucket. */ pub async fn delete_function( &self, location: Option<String>, ) -> ( Result<DeleteFunctionOutput, anyhow::Error>, Result<DeleteRoleOutput, anyhow::Error>, Option<Result<DeleteObjectOutput, anyhow::Error>>, ) { info!("Deleting lambda function {}", self.lambda_name); let delete_function = self .lambda_client .delete_function() .function_name(self.lambda_name.clone()) .send() .await .map_err(anyhow::Error::from); info!("Deleting iam role {}", self.role_name); let delete_role = self .iam_client .delete_role() .role_name(self.role_name.clone()) .send() .await .map_err(anyhow::Error::from); let delete_object: Option<Result<DeleteObjectOutput, anyhow::Error>> = if let Some(location) = location { info!("Deleting object {location}"); Some( self.s3_client .delete_object() .bucket(self.bucket.clone()) .key(location) .send() .await .map_err(anyhow::Error::from), ) } else { info!(?location, "Skipping delete object"); None }; (delete_function, delete_role, delete_object) } pub async fn cleanup( &self, location: Option<String>, ) -> ( ( Result<DeleteFunctionOutput, anyhow::Error>, Result<DeleteRoleOutput, anyhow::Error>, Option<Result<DeleteObjectOutput, anyhow::Error>>, ), Option<Result<DeleteBucketOutput, anyhow::Error>>, ) { let delete_function = self.delete_function(location).await; let delete_bucket = if self.own_bucket { info!("Deleting bucket {}", self.bucket); if delete_function.2.is_none() || delete_function.2.as_ref().unwrap().is_ok() { Some( self.s3_client .delete_bucket() .bucket(self.bucket.clone()) .send() .await .map_err(anyhow::Error::from), ) } else { None } } else { info!("No bucket to clean up"); None }; (delete_function, delete_bucket) } } /** * Testing occurs primarily as an integration test running the `scenario` bin successfully. * Each action relies deeply on the internal workings and state of Amazon Simple Storage Service (Amazon S3), Lambda, and IAM working together. * It is therefore infeasible to mock the clients to test the individual actions. */ #[cfg(test)] mod test { use super::{InvokeArgs, Operation}; use serde_json::json; /** Make sure that the JSON output of serializing InvokeArgs is what's expected by the calculator. */ #[test] fn test_serialize() { assert_eq!(json!(InvokeArgs::Increment(5)), 5); assert_eq!( json!(InvokeArgs::Arithmetic(Operation::Plus, 5, 7)).to_string(), r#"{"op":"plus","i":5,"j":7}"#.to_string(), ); } }
コマンドラインフラグを使用して一部の動作を制御し、シナリオを最初から最後まで実行するためのバイナリ。このファイルはクレート内の src/bin/scenario.rs というファイルです。
/* ## Service actions Service actions wrap the SDK call, taking a client and any specific parameters necessary for the call. * CreateFunction * GetFunction * ListFunctions * Invoke * UpdateFunctionCode * UpdateFunctionConfiguration * DeleteFunction ## Scenario A scenario runs at a command prompt and prints output to the user on the result of each service action. A scenario can run in one of two ways: straight through, printing out progress as it goes, or as an interactive question/answer script. ## Getting started with functions Use an SDK to manage AWS Lambda functions: create a function, invoke it, update its code, invoke it again, view its output and logs, and delete it. This scenario uses two Lambda handlers: _Note: Handlers don't use AWS SDK API calls._ The increment handler is straightforward: 1. It accepts a number, increments it, and returns the new value. 2. It performs simple logging of the result. The arithmetic handler is more complex: 1. It accepts a set of actions ['plus', 'minus', 'times', 'divided-by'] and two numbers, and returns the result of the calculation. 2. It uses an environment variable to control log level (such as DEBUG, INFO, WARNING, ERROR). It logs a few things at different levels, such as: * DEBUG: Full event data. * INFO: The calculation result. * WARN~ING~: When a divide by zero error occurs. * This will be the typical `RUST_LOG` variable. The steps of the scenario are: 1. Create an AWS Identity and Access Management (IAM) role that meets the following requirements: * Has an assume_role policy that grants 'lambda.amazonaws.com' the 'sts:AssumeRole' action. * Attaches the 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole' managed role. * _You must wait for ~10 seconds after the role is created before you can use it!_ 2. Create a function (CreateFunction) for the increment handler by packaging it as a zip and doing one of the following: * Adding it with CreateFunction Code.ZipFile. * --or-- * Uploading it to Amazon Simple Storage Service (Amazon S3) and adding it with CreateFunction Code.S3Bucket/S3Key. * _Note: Zipping the file does not have to be done in code._ * If you have a waiter, use it to wait until the function is active. Otherwise, call GetFunction until State is Active. 3. Invoke the function with a number and print the result. 4. Update the function (UpdateFunctionCode) to the arithmetic handler by packaging it as a zip and doing one of the following: * Adding it with UpdateFunctionCode ZipFile. * --or-- * Uploading it to Amazon S3 and adding it with UpdateFunctionCode S3Bucket/S3Key. 5. Call GetFunction until Configuration.LastUpdateStatus is 'Successful' (or 'Failed'). 6. Update the environment variable by calling UpdateFunctionConfiguration and pass it a log level, such as: * Environment={'Variables': {'RUST_LOG': 'TRACE'}} 7. Invoke the function with an action from the list and a couple of values. Include LogType='Tail' to get logs in the result. Print the result of the calculation and the log. 8. [Optional] Invoke the function to provoke a divide-by-zero error and show the log result. 9. List all functions for the account, using pagination (ListFunctions). 10. Delete the function (DeleteFunction). 11. Delete the role. Each step should use the function created in Service Actions to abstract calling the SDK. */ use aws_sdk_lambda::{operation::invoke::InvokeOutput, types::Environment}; use clap::Parser; use std::{collections::HashMap, path::PathBuf}; use tracing::{debug, info, warn}; use tracing_subscriber::EnvFilter; use lambda_code_examples::actions::{ InvokeArgs::{Arithmetic, Increment}, LambdaManager, Operation, }; #[derive(Debug, Parser)] pub struct Opt { /// The AWS Region. #[structopt(short, long)] pub region: Option<String>, // The bucket to use for the FunctionCode. #[structopt(short, long)] pub bucket: Option<String>, // The name of the Lambda function. #[structopt(short, long)] pub lambda_name: Option<String>, // The number to increment. #[structopt(short, long, default_value = "12")] pub inc: i32, // The left operand. #[structopt(long, default_value = "19")] pub num_a: i32, // The right operand. #[structopt(long, default_value = "23")] pub num_b: i32, // The arithmetic operation. #[structopt(short, long, default_value = "plus")] pub operation: Operation, #[structopt(long)] pub cleanup: Option<bool>, #[structopt(long)] pub no_cleanup: Option<bool>, } fn code_path(lambda: &str) -> PathBuf { PathBuf::from(format!("../target/lambda/{lambda}/bootstrap.zip")) } fn log_invoke_output(invoke: &InvokeOutput, message: &str) { if let Some(payload) = invoke.payload().cloned() { let payload = String::from_utf8(payload.into_inner()); info!(?payload, message); } else { info!("Could not extract payload") } if let Some(logs) = invoke.log_result() { debug!(?logs, "Invoked function logs") } else { debug!("Invoked function had no logs") } } async fn main_block( opt: &Opt, manager: &LambdaManager, code_location: String, ) -> Result<(), anyhow::Error> { let invoke = manager.invoke(Increment(opt.inc)).await?; log_invoke_output(&invoke, "Invoked function configured as increment"); let update_code = manager .update_function_code(code_path("arithmetic"), code_location.clone()) .await?; let code_sha256 = update_code.code_sha256().unwrap_or("Unknown SHA"); info!(?code_sha256, "Updated function code with arithmetic.zip"); let arithmetic_args = Arithmetic(opt.operation, opt.num_a, opt.num_b); let invoke = manager.invoke(arithmetic_args).await?; log_invoke_output(&invoke, "Invoked function configured as arithmetic"); let update = manager .update_function_configuration( Environment::builder() .set_variables(Some(HashMap::from([( "RUST_LOG".to_string(), "trace".to_string(), )]))) .build(), ) .await?; let updated_environment = update.environment(); info!(?updated_environment, "Updated function configuration"); let invoke = manager .invoke(Arithmetic(opt.operation, opt.num_a, opt.num_b)) .await?; log_invoke_output( &invoke, "Invoked function configured as arithmetic with increased logging", ); let invoke = manager .invoke(Arithmetic(Operation::DividedBy, opt.num_a, 0)) .await?; log_invoke_output( &invoke, "Invoked function configured as arithmetic with divide by zero", ); Ok::<(), anyhow::Error>(()) } #[tokio::main] async fn main() { tracing_subscriber::fmt() .without_time() .with_file(true) .with_line_number(true) .with_env_filter(EnvFilter::from_default_env()) .init(); let opt = Opt::parse(); let manager = LambdaManager::load_from_env(opt.lambda_name.clone(), opt.bucket.clone()).await; let key = match manager.create_function(code_path("increment")).await { Ok(init) => { info!(?init, "Created function, initially with increment.zip"); let run_block = main_block(&opt, &manager, init.clone()).await; info!(?run_block, "Finished running example, cleaning up"); Some(init) } Err(err) => { warn!(?err, "Error happened when initializing function"); None } }; if Some(false) == opt.cleanup || Some(true) == opt.no_cleanup { info!("Skipping cleanup") } else { let delete = manager.cleanup(key).await; info!(?delete, "Deleted function & cleaned up resources"); } }
-
API の詳細については、「AWS SDK for Rust API リファレンス」の以下のトピックを参照してください。
-
アクション
次のコード例は、CreateFunction
を使用する方法を示しています。
- SDK for Rust
-
注記
GitHub には、その他のリソースもあります。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 /** * Create a function, uploading from a zip file. */ pub async fn create_function(&self, zip_file: PathBuf) -> Result<String, anyhow::Error> { let code = self.prepare_function(zip_file, None).await?; let key = code.s3_key().unwrap().to_string(); let role = self.create_role().await.map_err(|e| anyhow!(e))?; info!("Created iam role, waiting 15s for it to become active"); tokio::time::sleep(Duration::from_secs(15)).await; info!("Creating lambda function {}", self.lambda_name); let _ = self .lambda_client .create_function() .function_name(self.lambda_name.clone()) .code(code) .role(role.arn()) .runtime(aws_sdk_lambda::types::Runtime::Providedal2) .handler("_unused") .send() .await .map_err(anyhow::Error::from)?; self.wait_for_function_ready().await?; self.lambda_client .publish_version() .function_name(self.lambda_name.clone()) .send() .await?; Ok(key) } /** * Upload function code from a path to a zip file. * The zip file must have an AL2 Linux-compatible binary called `bootstrap`. * The easiest way to create such a zip is to use `cargo lambda build --output-format Zip`. */ async fn prepare_function( &self, zip_file: PathBuf, key: Option<String>, ) -> Result<FunctionCode, anyhow::Error> { let body = ByteStream::from_path(zip_file).await?; let key = key.unwrap_or_else(|| format!("{}_code", self.lambda_name)); info!("Uploading function code to s3://{}/{}", self.bucket, key); let _ = self .s3_client .put_object() .bucket(self.bucket.clone()) .key(key.clone()) .body(body) .send() .await?; Ok(FunctionCode::builder() .s3_bucket(self.bucket.clone()) .s3_key(key) .build()) }
-
API の詳細については、「AWS SDK for Rust API リファレンス」の「CreateFunction
」を参照してください。
-
次のコード例は、DeleteFunction
を使用する方法を示しています。
- SDK for Rust
-
注記
GitHub には、その他のリソースもあります。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 /** Delete a function and its role, and if possible or necessary, its associated code object and bucket. */ pub async fn delete_function( &self, location: Option<String>, ) -> ( Result<DeleteFunctionOutput, anyhow::Error>, Result<DeleteRoleOutput, anyhow::Error>, Option<Result<DeleteObjectOutput, anyhow::Error>>, ) { info!("Deleting lambda function {}", self.lambda_name); let delete_function = self .lambda_client .delete_function() .function_name(self.lambda_name.clone()) .send() .await .map_err(anyhow::Error::from); info!("Deleting iam role {}", self.role_name); let delete_role = self .iam_client .delete_role() .role_name(self.role_name.clone()) .send() .await .map_err(anyhow::Error::from); let delete_object: Option<Result<DeleteObjectOutput, anyhow::Error>> = if let Some(location) = location { info!("Deleting object {location}"); Some( self.s3_client .delete_object() .bucket(self.bucket.clone()) .key(location) .send() .await .map_err(anyhow::Error::from), ) } else { info!(?location, "Skipping delete object"); None }; (delete_function, delete_role, delete_object) }
-
API の詳細については、「AWS SDK for Rust API リファレンス」の「DeleteFunction
」を参照してください。
-
次のコード例は、GetFunction
を使用する方法を示しています。
- SDK for Rust
-
注記
GitHub には、その他のリソースもあります。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 /** Get the Lambda function with this Manager's name. */ pub async fn get_function(&self) -> Result<GetFunctionOutput, anyhow::Error> { info!("Getting lambda function"); self.lambda_client .get_function() .function_name(self.lambda_name.clone()) .send() .await .map_err(anyhow::Error::from) }
-
API の詳細については、「AWS SDK for Rust API リファレンス」の「GetFunction
」を参照してください。
-
次の例は、Invoke
を使用する方法を説明しています。
- SDK for Rust
-
注記
GitHub には、その他のリソースもあります。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 /** Invoke the lambda function using calculator InvokeArgs. */ pub async fn invoke(&self, args: InvokeArgs) -> Result<InvokeOutput, anyhow::Error> { info!(?args, "Invoking {}", self.lambda_name); let payload = serde_json::to_string(&args)?; debug!(?payload, "Sending payload"); self.lambda_client .invoke() .function_name(self.lambda_name.clone()) .payload(Blob::new(payload)) .send() .await .map_err(anyhow::Error::from) } fn log_invoke_output(invoke: &InvokeOutput, message: &str) { if let Some(payload) = invoke.payload().cloned() { let payload = String::from_utf8(payload.into_inner()); info!(?payload, message); } else { info!("Could not extract payload") } if let Some(logs) = invoke.log_result() { debug!(?logs, "Invoked function logs") } else { debug!("Invoked function had no logs") } }
-
API の詳細については、「AWS SDK for Rust API リファレンス」の「Invoke
」を参照してください。
-
次のコード例は、ListFunctions
を使用する方法を示しています。
- SDK for Rust
-
注記
GitHub には、その他のリソースもあります。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 /** List all Lambda functions in the current Region. */ pub async fn list_functions(&self) -> Result<ListFunctionsOutput, anyhow::Error> { info!("Listing lambda functions"); self.lambda_client .list_functions() .send() .await .map_err(anyhow::Error::from) }
-
API の詳細については、「AWS SDK for Rust API リファレンス」の「ListFunctions
」を参照してください。
-
次の例は、UpdateFunctionCode
を使用する方法を説明しています。
- SDK for Rust
-
注記
GitHub には、その他のリソースもあります。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 /** Given a Path to a zip file, update the function's code and wait for the update to finish. */ pub async fn update_function_code( &self, zip_file: PathBuf, key: String, ) -> Result<UpdateFunctionCodeOutput, anyhow::Error> { let function_code = self.prepare_function(zip_file, Some(key)).await?; info!("Updating code for {}", self.lambda_name); let update = self .lambda_client .update_function_code() .function_name(self.lambda_name.clone()) .s3_bucket(self.bucket.clone()) .s3_key(function_code.s3_key().unwrap().to_string()) .send() .await .map_err(anyhow::Error::from)?; self.wait_for_function_ready().await?; Ok(update) } /** * Upload function code from a path to a zip file. * The zip file must have an AL2 Linux-compatible binary called `bootstrap`. * The easiest way to create such a zip is to use `cargo lambda build --output-format Zip`. */ async fn prepare_function( &self, zip_file: PathBuf, key: Option<String>, ) -> Result<FunctionCode, anyhow::Error> { let body = ByteStream::from_path(zip_file).await?; let key = key.unwrap_or_else(|| format!("{}_code", self.lambda_name)); info!("Uploading function code to s3://{}/{}", self.bucket, key); let _ = self .s3_client .put_object() .bucket(self.bucket.clone()) .key(key.clone()) .body(body) .send() .await?; Ok(FunctionCode::builder() .s3_bucket(self.bucket.clone()) .s3_key(key) .build()) }
-
API の詳細については、「AWS SDK for Rust API リファレンス」の「UpdateFunctionCode
」を参照してください。
-
次のコード例は、UpdateFunctionConfiguration
を使用する方法を示しています。
- SDK for Rust
-
注記
GitHub には、その他のリソースもあります。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 /** Update the environment for a function. */ pub async fn update_function_configuration( &self, environment: Environment, ) -> Result<UpdateFunctionConfigurationOutput, anyhow::Error> { info!( ?environment, "Updating environment for {}", self.lambda_name ); let updated = self .lambda_client .update_function_configuration() .function_name(self.lambda_name.clone()) .environment(environment) .send() .await .map_err(anyhow::Error::from)?; self.wait_for_function_ready().await?; Ok(updated) }
-
API の詳細については、「AWS SDK for Rust API リファレンス」の「UpdateFunctionConfiguration
」を参照してください。
-
シナリオ
次のコード例では、ユーザーがラベルを使用して写真を管理できるサーバーレスアプリケーションを作成する方法について示しています。
- SDK for Rust
-
Amazon Rekognition を使用して画像内のラベルを検出し、保存して後で取得できるようにする写真アセット管理アプリケーションの開発方法を示します。
完全なソースコードと設定および実行の手順については、GitHub
で完全な例を参照してください。 この例のソースについて詳しくは、AWS コミュニティ
でブログ投稿を参照してください。 この例で使用されているサービス
API Gateway
DynamoDB
Lambda
Amazon Rekognition
Amazon S3
Amazon SNS
サーバーレスサンプル
次のコード例は、RDS データベースに接続する Lambda 関数を実装する方法を示しています。この関数は、シンプルなデータベースリクエストを実行し、結果を返します。
- SDK for Rust
-
注記
GitHub には、その他のリソースもあります。サーバーレスサンプル
リポジトリで完全な例を検索し、設定および実行の方法を確認してください。 Rust を使用した Lambda 関数での Amazon RDS データベースへの接続
use aws_config::BehaviorVersion; use aws_credential_types::provider::ProvideCredentials; use aws_sigv4::{ http_request::{sign, SignableBody, SignableRequest, SigningSettings}, sign::v4, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; use serde_json::{json, Value}; use sqlx::postgres::PgConnectOptions; use std::env; use std::time::{Duration, SystemTime}; const RDS_CERTS: &[u8] = include_bytes!("global-bundle.pem"); async fn generate_rds_iam_token( db_hostname: &str, port: u16, db_username: &str, ) -> Result<String, Error> { let config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await; let credentials = config .credentials_provider() .expect("no credentials provider found") .provide_credentials() .await .expect("unable to load credentials"); let identity = credentials.into(); let region = config.region().unwrap().to_string(); let mut signing_settings = SigningSettings::default(); signing_settings.expires_in = Some(Duration::from_secs(900)); signing_settings.signature_location = aws_sigv4::http_request::SignatureLocation::QueryParams; let signing_params = v4::SigningParams::builder() .identity(&identity) .region(®ion) .name("rds-db") .time(SystemTime::now()) .settings(signing_settings) .build()?; let url = format!( "https://{db_hostname}:{port}/?Action=connect&DBUser={db_user}", db_hostname = db_hostname, port = port, db_user = db_username ); let signable_request = SignableRequest::new("GET", &url, std::iter::empty(), SignableBody::Bytes(&[])) .expect("signable request"); let (signing_instructions, _signature) = sign(signable_request, &signing_params.into())?.into_parts(); let mut url = url::Url::parse(&url).unwrap(); for (name, value) in signing_instructions.params() { url.query_pairs_mut().append_pair(name, &value); } let response = url.to_string().split_off("https://".len()); Ok(response) } #[tokio::main] async fn main() -> Result<(), Error> { run(service_fn(handler)).await } async fn handler(_event: LambdaEvent<Value>) -> Result<Value, Error> { let db_host = env::var("DB_HOSTNAME").expect("DB_HOSTNAME must be set"); let db_port = env::var("DB_PORT") .expect("DB_PORT must be set") .parse::<u16>() .expect("PORT must be a valid number"); let db_name = env::var("DB_NAME").expect("DB_NAME must be set"); let db_user_name = env::var("DB_USERNAME").expect("DB_USERNAME must be set"); let token = generate_rds_iam_token(&db_host, db_port, &db_user_name).await?; let opts = PgConnectOptions::new() .host(&db_host) .port(db_port) .username(&db_user_name) .password(&token) .database(&db_name) .ssl_root_cert_from_pem(RDS_CERTS.to_vec()) .ssl_mode(sqlx::postgres::PgSslMode::Require); let pool = sqlx::postgres::PgPoolOptions::new() .connect_with(opts) .await?; let result: i32 = sqlx::query_scalar("SELECT $1 + $2") .bind(3) .bind(2) .fetch_one(&pool) .await?; println!("Result: {:?}", result); Ok(json!({ "statusCode": 200, "content-type": "text/plain", "body": format!("The selected sum is: {result}") })) }
次のコード例では、Kinesis ストリームからレコードを受信することによってトリガーされるイベントを受け取る、Lambda 関数の実装方法を示しています。この関数は Kinesis ペイロードを取得し、それを Base64 からデコードして、そのレコードの内容をログ記録します。
- SDK for Rust
-
注記
GitHub には、その他のリソースもあります。サーバーレスサンプル
リポジトリで完全な例を検索し、設定および実行の方法を確認してください。 Rust を使用した Lambda での Kinesis イベントの消費。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::event::kinesis::KinesisEvent; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> { if event.payload.records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(()); } event.payload.records.iter().for_each(|record| { tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default()); let record_data = std::str::from_utf8(&record.kinesis.data); match record_data { Ok(data) => { // log the record data tracing::info!("Data: {}", data); } Err(e) => { tracing::error!("Error: {}", e); } } }); tracing::info!( "Successfully processed {} records", event.payload.records.len() ); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }
次のコード例は、DynamoDB ストリームからレコードを受信することによってトリガーされるイベントを受信する Lambda 関数を実装する方法を示しています。関数は DynamoDB ペイロードを取得し、レコードの内容をログ記録します。
- SDK for Rust
-
注記
GitHub には、その他のリソースもあります。サーバーレスサンプル
リポジトリで完全な例を検索し、設定および実行の方法を確認してください。 Rust を使用して Lambda で DynamoDB イベントを利用します。
use lambda_runtime::{service_fn, tracing, Error, LambdaEvent}; use aws_lambda_events::{ event::dynamodb::{Event, EventRecord}, }; // Built with the following dependencies: //lambda_runtime = "0.11.1" //serde_json = "1.0" //tokio = { version = "1", features = ["macros"] } //tracing = { version = "0.1", features = ["log"] } //tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] } //aws_lambda_events = "0.15.0" async fn function_handler(event: LambdaEvent<Event>) ->Result<(), Error> { let records = &event.payload.records; tracing::info!("event payload: {:?}",records); if records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(()); } for record in records{ log_dynamo_dbrecord(record); } tracing::info!("Dynamo db records processed"); // Prepare the response Ok(()) } fn log_dynamo_dbrecord(record: &EventRecord)-> Result<(), Error>{ tracing::info!("EventId: {}", record.event_id); tracing::info!("EventName: {}", record.event_name); tracing::info!("DynamoDB Record: {:?}", record.change ); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) .with_target(false) .without_time() .init(); let func = service_fn(function_handler); lambda_runtime::run(func).await?; Ok(()) }
次のコード例は、DocumentDB 変更ストリームからレコードを受信することによってトリガーされるイベントを受信する Lambda 関数を実装する方法を示しています。関数は DocumentDB ペイロードを取得し、レコードの内容をログ記録します。
- SDK for Rust
-
注記
GitHub には、その他のリソースもあります。サーバーレスサンプル
リポジトリで完全な例を検索し、設定および実行の方法を確認してください。 Rust を使用して Lambda で Amazon DocumentDB イベントの消費。
use lambda_runtime::{service_fn, tracing, Error, LambdaEvent}; use aws_lambda_events::{ event::documentdb::{DocumentDbEvent, DocumentDbInnerEvent}, }; // Built with the following dependencies: //lambda_runtime = "0.11.1" //serde_json = "1.0" //tokio = { version = "1", features = ["macros"] } //tracing = { version = "0.1", features = ["log"] } //tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] } //aws_lambda_events = "0.15.0" async fn function_handler(event: LambdaEvent<DocumentDbEvent>) ->Result<(), Error> { tracing::info!("Event Source ARN: {:?}", event.payload.event_source_arn); tracing::info!("Event Source: {:?}", event.payload.event_source); let records = &event.payload.events; if records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(()); } for record in records{ log_document_db_event(record); } tracing::info!("Document db records processed"); // Prepare the response Ok(()) } fn log_document_db_event(record: &DocumentDbInnerEvent)-> Result<(), Error>{ tracing::info!("Change Event: {:?}", record.event); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) .with_target(false) .without_time() .init(); let func = service_fn(function_handler); lambda_runtime::run(func).await?; Ok(()) }
次のコード例は、Amazon MSK クラスターからレコードを受信することによってトリガーされるイベントを受信する Lambda 関数を実装する方法を示しています。関数は MSK ペイロードを取得し、レコードの内容をログ記録します。
- SDK for Rust
-
注記
GitHub には、その他のリソースもあります。サーバーレスサンプル
リポジトリで完全な例を検索し、設定および実行の方法を確認してください。 Rust を使用した Lambda での Amazon MSK イベントの消費。
use aws_lambda_events::event::kafka::KafkaEvent; use lambda_runtime::{run, service_fn, tracing, Error, LambdaEvent}; use base64::prelude::*; use serde_json::{Value}; use tracing::{info}; /// Pre-Requisites: /// 1. Install Cargo Lambda - see https://www.cargo-lambda.info/guide/getting-started.html /// 2. Add packages tracing, tracing-subscriber, serde_json, base64 /// /// This is the main body for the function. /// Write your code inside it. /// There are some code example in the following URLs: /// - https://github.com/awslabs/aws-lambda-rust-runtime/tree/main/examples /// - https://github.com/aws-samples/serverless-rust-demo/ async fn function_handler(event: LambdaEvent<KafkaEvent>) -> Result<Value, Error> { let payload = event.payload.records; for (_name, records) in payload.iter() { for record in records { let record_text = record.value.as_ref().ok_or("Value is None")?; info!("Record: {}", &record_text); // perform Base64 decoding let record_bytes = BASE64_STANDARD.decode(record_text)?; let message = std::str::from_utf8(&record_bytes)?; info!("Message: {}", message); } } Ok(().into()) } #[tokio::main] async fn main() -> Result<(), Error> { // required to enable CloudWatch error logging by the runtime tracing::init_default_subscriber(); info!("Setup CW subscriber!"); run(service_fn(function_handler)).await }
次のコード例は、S3 バケットにオブジェクトをアップロードすることによってトリガーされるイベントを受け取る Lambda 関数を実装する方法を示しています。この関数は、イベントパラメータから S3 バケット名とオブジェクトキーを取得し、Amazon S3 API を呼び出してオブジェクトのコンテンツタイプを取得してログに記録します。
- SDK for Rust
-
注記
GitHub には、その他のリソースもあります。サーバーレスサンプル
リポジトリで完全な例を検索し、設定および実行の方法を確認してください。 Rust を使用して Lambda で S3 イベントを消費します。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::event::s3::S3Event; use aws_sdk_s3::{Client}; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; /// Main function #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) .with_target(false) .without_time() .init(); // Initialize the AWS SDK for Rust let config = aws_config::load_from_env().await; let s3_client = Client::new(&config); let res = run(service_fn(|request: LambdaEvent<S3Event>| { function_handler(&s3_client, request) })).await; res } async fn function_handler( s3_client: &Client, evt: LambdaEvent<S3Event> ) -> Result<(), Error> { tracing::info!(records = ?evt.payload.records.len(), "Received request from SQS"); if evt.payload.records.len() == 0 { tracing::info!("Empty S3 event received"); } let bucket = evt.payload.records[0].s3.bucket.name.as_ref().expect("Bucket name to exist"); let key = evt.payload.records[0].s3.object.key.as_ref().expect("Object key to exist"); tracing::info!("Request is for {} and object {}", bucket, key); let s3_get_object_result = s3_client .get_object() .bucket(bucket) .key(key) .send() .await; match s3_get_object_result { Ok(_) => tracing::info!("S3 Get Object success, the s3GetObjectResult contains a 'body' property of type ByteStream"), Err(_) => tracing::info!("Failure with S3 Get Object request") } Ok(()) }
次のコード例は、SNS トピックからメッセージを受信することによってトリガーされるイベントを受け取る Lambda 関数を実装する方法を示しています。この関数はイベントパラメータからメッセージを取得し、各メッセージの内容を記録します。
- SDK for Rust
-
注記
GitHub には、その他のリソースもあります。サーバーレスサンプル
リポジトリで完全な例を検索し、設定および実行の方法を確認してください。 Rust を使用して Lambda で SNS イベントを消費します。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::event::sns::SnsEvent; use aws_lambda_events::sns::SnsRecord; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; use tracing::info; // Built with the following dependencies: // aws_lambda_events = { version = "0.10.0", default-features = false, features = ["sns"] } // lambda_runtime = "0.8.1" // tokio = { version = "1", features = ["macros"] } // tracing = { version = "0.1", features = ["log"] } // tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] } async fn function_handler(event: LambdaEvent<SnsEvent>) -> Result<(), Error> { for event in event.payload.records { process_record(&event)?; } Ok(()) } fn process_record(record: &SnsRecord) -> Result<(), Error> { info!("Processing SNS Message: {}", record.sns.message); // Implement your record handling code here. Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) .with_target(false) .without_time() .init(); run(service_fn(function_handler)).await }
次のコード例では、SQS キューからメッセージを受信することによってトリガーされるイベントを受け取る、Lambda 関数の実装方法を示しています。この関数はイベントパラメータからメッセージを取得し、各メッセージの内容を記録します。
- SDK for Rust
-
注記
GitHub には、その他のリソースもあります。サーバーレスサンプル
リポジトリで完全な例を検索し、設定および実行の方法を確認してください。 Rust を使用して Lambda で SQS イベントを消費します。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::event::sqs::SqsEvent; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> { event.payload.records.iter().for_each(|record| { // process the record tracing::info!("Message body: {}", record.body.as_deref().unwrap_or_default()) }); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }
以下のコード例では、Kinesis ストリームからイベントを受け取る Lambda 関数のための、部分的なバッチレスポンスの実装方法を示しています。この関数は、レスポンスとしてバッチアイテムの失敗を報告し、対象のメッセージを後で再試行するよう Lambda に伝えます。
- SDK for Rust
-
注記
GitHub には、その他のリソースもあります。サーバーレスサンプル
リポジトリで完全な例を検索し、設定および実行の方法を確認してください。 Rust を使用した Lambda での Kinesis バッチアイテム失敗のレポート。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::{ event::kinesis::KinesisEvent, kinesis::KinesisEventRecord, streams::{KinesisBatchItemFailure, KinesisEventResponse}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<KinesisEventResponse, Error> { let mut response = KinesisEventResponse { batch_item_failures: vec![], }; if event.payload.records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(response); } for record in &event.payload.records { tracing::info!( "EventId: {}", record.event_id.as_deref().unwrap_or_default() ); let record_processing_result = process_record(record); if record_processing_result.is_err() { response.batch_item_failures.push(KinesisBatchItemFailure { item_identifier: record.kinesis.sequence_number.clone(), }); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return Ok(response); } } tracing::info!( "Successfully processed {} records", event.payload.records.len() ); Ok(response) } fn process_record(record: &KinesisEventRecord) -> Result<(), Error> { let record_data = std::str::from_utf8(record.kinesis.data.as_slice()); if let Some(err) = record_data.err() { tracing::error!("Error: {}", err); return Err(Error::from(err)); } let record_data = record_data.unwrap_or_default(); // do something interesting with the data tracing::info!("Data: {}", record_data); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }
次のコード例は、DynamoDB ストリームからイベントを受信する Lambda 関数に部分的なバッチレスポンスを実装する方法を示しています。この関数は、レスポンスとしてバッチアイテムの失敗を報告し、対象のメッセージを後で再試行するよう Lambda に伝えます。
- SDK for Rust
-
注記
GitHub には、その他のリソースもあります。サーバーレスサンプル
リポジトリで完全な例を検索し、設定および実行の方法を確認してください。 Rust を使用して Lambda で DynamoDB のバッチアイテム失敗のレポート。
use aws_lambda_events::{ event::dynamodb::{Event, EventRecord, StreamRecord}, streams::{DynamoDbBatchItemFailure, DynamoDbEventResponse}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; /// Process the stream record fn process_record(record: &EventRecord) -> Result<(), Error> { let stream_record: &StreamRecord = &record.change; // process your stream record here... tracing::info!("Data: {:?}", stream_record); Ok(()) } /// Main Lambda handler here... async fn function_handler(event: LambdaEvent<Event>) -> Result<DynamoDbEventResponse, Error> { let mut response = DynamoDbEventResponse { batch_item_failures: vec![], }; let records = &event.payload.records; if records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(response); } for record in records { tracing::info!("EventId: {}", record.event_id); // Couldn't find a sequence number if record.change.sequence_number.is_none() { response.batch_item_failures.push(DynamoDbBatchItemFailure { item_identifier: Some("".to_string()), }); return Ok(response); } // Process your record here... if process_record(record).is_err() { response.batch_item_failures.push(DynamoDbBatchItemFailure { item_identifier: record.change.sequence_number.clone(), }); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return Ok(response); } } tracing::info!("Successfully processed {} record(s)", records.len()); Ok(response) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }
以下のコード例では、SQS キューからイベントを受け取る Lambda 関数のための、部分的なバッチレスポンスの実装方法を示しています。この関数は、レスポンスとしてバッチアイテムの失敗を報告し、対象のメッセージを後で再試行するよう Lambda に伝えます。
- SDK for Rust
-
注記
GitHub には、その他のリソースもあります。サーバーレスサンプル
リポジトリで完全な例を検索し、設定および実行の方法を確認してください。 Rust を使用した Lambda での SQS バッチアイテム失敗のレポート。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::{ event::sqs::{SqsBatchResponse, SqsEvent}, sqs::{BatchItemFailure, SqsMessage}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn process_record(_: &SqsMessage) -> Result<(), Error> { Err(Error::from("Error processing message")) } async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<SqsBatchResponse, Error> { let mut batch_item_failures = Vec::new(); for record in event.payload.records { match process_record(&record).await { Ok(_) => (), Err(_) => batch_item_failures.push(BatchItemFailure { item_identifier: record.message_id.unwrap(), }), } } Ok(SqsBatchResponse { batch_item_failures, }) } #[tokio::main] async fn main() -> Result<(), Error> { run(service_fn(function_handler)).await }
AWS コミュニティへの貢献
次のコード例は、Lambda と DynamoDB で API Gateway を使用してサーバーレスアプリケーションを構築およびテストする方法を示しています。
- SDK for Rust
-
Rust SDK を使用し、Lambda および DynamoDB を備えた API Gateway で構成されるサーバーレスアプリケーションを構築およびテストする方法が示されます。
完全なソースコードとセットアップおよび実行の手順については、GitHub
で完全な例を参照してください。 この例で使用されているサービス
API Gateway
DynamoDB
Lambda