

# Lambda 持久性函数
<a name="durable-functions"></a>

Lambda 持久性函数使您能够构建具有高韧性的多步骤应用程序和人工智能工作流，这些应用程序和工作流能够最长运行一年，并且即便出现中断也能保持稳定进展。当一个持久性函数运行时，这个完整的生命周期就被称为一次持久执行。它利用检查点来跟踪进度，并通过回放、重新从头开始执行并跳过已完成的工作的方式来自动从故障中恢复。

在每个函数中，您都将持久操作用作基础构建块。步骤通过内置的重试和进度跟踪功能执行业务逻辑，而等待会暂停执行但不会产生计算费用，因此非常适合长时间运行的流程，例如人工干预工作流程或轮询外部依赖项。无论您是在处理订单、协调微服务，还是在编排代理式人工智能应用程序，持久性函数都能自动维护状态，并在您使用熟悉的编程语言编写代码时从故障中自动恢复。

## 主要优势
<a name="durable-functions-benefits"></a>

**以自然的方式编写弹性代码：**借助熟悉的编程结构，您能够编写出能够自动处理故障的代码。内置的检查点机制、透明的重试功能以及自动恢复功能意味着您的业务逻辑能够保持清晰且专注。

**仅按实际用量付费：**在等待操作期间，您的函数会暂停运行，但不会产生计算费用。对于那些长时间运行数小时或数天的工作流程，您只需为实际的处理时间付费，而无需为闲置等待时间付费。

**操作简便性：**借助 Lambda 的无服务器模型，您可以实现自动扩缩（包括扩缩至零），而无需管理基础设施。持久性函数可自动处理状态管理、重试逻辑和故障恢复，从而减少操作开销。

## 何时使用持久性函数
<a name="durable-functions-use-cases"></a>

**短期协调：**在多个服务之间协调支付、库存和发货事宜，并在出现故障时自动进行回滚操作。通过验证、付款授权、库存分配和履行等环节来处理订单，并确保完成任务。

**放心处理付款事宜：**构建弹性支付流程，使其能够在出现故障时保持交易状态，并能自动处理重试操作。协调多步骤授权、欺诈检测以及跨支付提供商的结算操作，实现各步骤间的完全可审计性。

**构建可靠的 AI 工作流程：**创建多步骤 AI 工作流程，以将模型调用串联起来，纳入人类反馈，并在出现故障时以确定性的方式处理长时间运行的任务。自动在暂停后恢复运行，并且仅按实际执行时间收费。

**协调复杂的订单履行：**通过内置的弹性机制，实现库存、支付、发货和通知系统之间的订单处理协调。自动处理部分故障，即便出现中断也能保留订单状态，并且能够高效地等待外部事件而无需消耗计算资源。

**自动执行多步骤业务工作流程：**为员工入职、贷款审批以及涵盖数天或数周的合规流程构建可靠的工作流程。在人工审批、系统集成以及调度任务过程中保持工作流程状态的一致性，同时提供对流程状态和历史记录的完全可见性。

### 持久性函数与 Step Functions 相比如何
<a name="durable-functions-vs-step-functions"></a>

持久性函数和 Step Functions 两者都为工作流编排提供自动状态管理。主要区别在于它们的运行位置以及您如何定义工作流：
+ **持久性函数：**在 Lambda 中运行，使用标准编程语言，在 Lambda 环境中进行管理
+ **Step Functions：**独立服务、基于图形的 DSL 或视觉设计器，完全托管，零维护

持久性函数非常适合在 Lambda 中进行应用程序开发，其中工作流与业务逻辑紧密耦合。Step Functions 擅长跨 AWS 服务进行工作流编排，您需要可视化设计、与 220 多种服务的本机集成以及零维护基础设施。

有关详细比较，请参阅[持久性函数或 Step Functions](durable-step-functions.md)。

## 工作原理
<a name="durable-functions-how-it-works"></a>

 在后台，持久性函数是使用检查点/回放机制的常规 Lambda 函数，该机制用于跟踪进度并通过用户定义的暂停点支持长时间运行的操作，通常被称为持久执行。当您的函数从暂停或中断中恢复后，系统会执行重放操作。在回放过程中，您的代码会从头开始运行，但会跳过已完成的检查点，会使用已存储的结果而不是重新执行已完成的操作。这种回放机制既能保证一致性，又能支持长时间的执行过程。

为了在您的应用程序中利用这种检查点与回放机制，Lambda 提供了一种持久执行 SDK。该 SDK 消除了管理检查点和重放的复杂性，只向您展示了一些称为持久操作的简单基元，您可以在代码中使用这些基元。该 SDK 可用于 JavaScript、TypeScript、Python 和 Java（预览版），可与您现有的 Lambda 开发工作流程无缝集成。

使用该 SDK 封装您的 Lambda 事件处理程序后，该处理程序会随事件提供一个 DurableContext。此上下文使您能够访问诸如步骤和等待之类的持久操作。您可以按照常规的顺序编写函数逻辑代码，但无需直接调用服务，而是将这些调用封装在步骤中以实现自动检查点机制和重试。当您需要暂停执行时，可以添加等待操作，这样可以暂停您的函数运行，而不会产生费用。该 SDK 可以在后台处理所有复杂的状态管理和重放，从而确保您的代码简洁易读。

 ![\[Filter for Amazon Inspector results related to Lambda functions\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/how_durable_works.png) 

## 后续步骤
<a name="durable-functions-next-steps"></a>
+ [开始使用持久性函数](durable-getting-started.md)
+ [探索持久执行 SDK](durable-execution-sdk.md)
+ [持久性函数或 Step Functions](durable-step-functions.md)
+ [监控和调试持久性函数](durable-monitoring.md)
+ [查看安全性和权限](durable-security.md)
+ [遵循最佳实践](durable-best-practices.md)

# 基本概念
<a name="durable-basic-concepts"></a>

Lambda 为 JavaScript、TypeScript 和 Python 提供持久性执行 SDK。这些 SDK 是构建持久性函数的基础，提供了记录进度检查点、处理重试和管理执行流所需的基元。有关完整的 SDK 文档和示例，请参阅 GitHub 上的 [JavaScript/TypeScript SDK](https://github.com/aws/aws-durable-execution-sdk-js) 和 [Python SDK](https://github.com/aws/aws-durable-execution-sdk-python)。

## 持久执行
<a name="durable-execution-concept"></a>

**持久执行**代表了 Lambda 持久性函数的整个生命周期，它通过检查点和回放机制来跟踪业务逻辑的执行进度、暂停执行以及从故障中恢复。当函数在暂停或中断后恢复时，会回放之前完成的检查点，然后函数继续执行。

该生命周期可能包括多次调用 Lambda 函数以完成执行，尤其是在暂停或故障恢复之后。这种方法使您的函数能够长时间运行（最长一年），同时即使出现中断，也能保持可靠的进度。

**重播的工作原理**  
Lambda 会持续记录所有持久操作（步骤、等待以及其他操作）的执行过程，直至您的函数完成执行。当您的函数需要暂停或遇到中断时，Lambda 会保存此检查点日志并停止执行。当需要恢复时，Lambda 会从头开始再次调用您的函数并重播检查点日志，从而用存储的值替代已完成的操作。这意味着您的代码会再次运行，但之前已完成的步骤不会再次执行。而是会使用它们存储的结果。

这种重播机制是理解持久性函数的基础。您的代码在重播期间必须是确定性的，也就是说，对于相同的输入，它会产生相同的输出结果。避免在步骤之外进行会产生副作用的操作（例如生成随机数或获取当前时间），因为这些操作在重播时可能会产生不同的结果，从而导致非确定性行为。

## DurableContext
<a name="durable-context-concept"></a>

**DurableContext** 是您的持久性函数接收的上下文对象。它提供了持久操作的方法，例如创建检查点和管理执行流的步骤和等待。

您的持久性函数会接收 `DurableContext` 而不是默认的 Lambda 上下文：

------
#### [ TypeScript ]

```
import {
  DurableContext,
  withDurableExecution,
} from "@aws/durable-execution-sdk-js";

export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    const result = await context.step(async () => {
      return "step completed";
    });
    return result;
  },
);
```

------
#### [ Python ]

```
from aws_durable_execution_sdk_python import (
    DurableContext,
    durable_execution,
    durable_step,
)

@durable_step
def my_step(step_context, data):
    # Your business logic
    return result

@durable_execution
def handler(event, context: DurableContext):
    result = context.step(my_step(event["data"]))
    return result
```

------

用于持久性函数的 Python SDK 会使用同步方法，但不支持 `await`。TypeScript SDK 使用 `async/await`。

## Steps
<a name="steps-concept"></a>

**步骤**通过内置重试和自动检查点机制运行业务逻辑。每个步骤都会保存其结果，这样即使出现中断情况，您的函数也能从任何已完成的步骤处恢复执行。

------
#### [ TypeScript ]

```
// Each step is automatically checkpointed
const order = await context.step(async () => processOrder(event));
const payment = await context.step(async () => processPayment(order));
const result = await context.step(async () => completeOrder(payment));
```

------
#### [ Python ]

```
# Each step is automatically checkpointed
order = context.step(lambda: process_order(event))
payment = context.step(lambda: process_payment(order))
result = context.step(lambda: complete_order(payment))
```

------

## 等待状态
<a name="wait-states-concept"></a>

**等待状态**是计划中的暂停，在该状态下，您的函数停止运行（并停止收费），直到需要继续运行时。使用它们来等待时间段、外部回调或特定条件。

------
#### [ TypeScript ]

```
// Wait for 1 hour without consuming resources
await context.wait({ seconds:3600 });

// Wait for external callback
const approval = await context.waitForCallback(
  async (callbackId) => sendApprovalRequest(callbackId)
);
```

------
#### [ Python ]

```
# Wait for 1 hour without consuming resources
context.wait(3600)

# Wait for external callback
approval = context.wait_for_callback(
    lambda callback_id: send_approval_request(callback_id)
)
```

------

当您的函数遇到等待或需要暂停时，Lambda 会保存此检查点日志并停止执行。当需要恢复时，Lambda 会再次调用您的函数并重播检查点日志，从而用存储的值替代已完成的操作。

对于更复杂的工作流，持久性 Lambda 函数还附带高级操作，例如用于并发执行的 `parallel()`、用于处理数组的 `map()`、用于嵌套操作的 `runInChildContext()` 和用于轮询操作的 `waitForCondition()`。有关何时使用每种操作的详细示例和指导，请参阅[示例](durable-examples.md)。

## 调用其他函数
<a name="invoke-concept"></a>

**调用**使持久性函数能够调用其他 Lambda 函数并等待其结果。调用函数在被调用函数执行期间会暂停，从而形成一个检查点，以保存执行结果。这使您能够构建模块化工作流，专用函数可以在其中处理特定任务。

使用 `context.invoke()` 从您的持久性函数中调用其他函数。该调用会被检查点存储，因此如果您的函数在被调用函数执行完毕后中断，它会以存储的结果继续执行，而无需重新调用函数。

------
#### [ TypeScript ]

```
// Invoke another function and wait for result
const customerData = await context.invoke(
  'validate-customer',
  'arn:aws:lambda:us-east-1:123456789012:function:customer-service:1',
  { customerId: event.customerId }
);

// Use the result in subsequent steps
const order = await context.step(async () => {
  return processOrder(customerData);
});
```

------
#### [ Python ]

```
# Invoke another function and wait for result
customer_data = context.invoke(
    'arn:aws:lambda:us-east-1:123456789012:function:customer-service:1',
    {'customerId': event['customerId']},
    name='validate-customer'
)

# Use the result in subsequent steps
order = context.step(
    lambda: process_order(customer_data),
    name='process-order'
)
```

------

调用的函数可以是持久性 Lambda 函数，也可以是标准的 Lambda 函数。如果您调用了持久性函数，则调用函数将等待完整的持久执行完成。这种模式在微服务架构中很常见，在这种架构中，每个函数负责处理特定的领域，从而使您能够通过使用专门的、可复用的函数来构建复杂的工作流。

**注意**  
不支持跨账户调用。被调用的函数必须与调用函数在同一 AWS 账户中。

## 持久性函数配置
<a name="durable-configuration-basic"></a>

持久性函数具有控制执行行为和数据留存的特定配置设置。这些设置独立于标准的 Lambda 函数配置，并且适用于整个持久执行生命周期。

**DurableConfig** 对象用于定义持久性函数的配置：

```
{
  "ExecutionTimeout": Integer,
  "RetentionPeriodInDays": Integer
}
```

### Execution timeout (执行超时)
<a name="durable-execution-timeout"></a>

**执行超时**用于控制持久执行从开始到完成的运行时长。该超时与 Lambda 函数的超时不同，后者用于控制单次函数调用的运行时长。

在其经历检查点、等待和重播的过程中，一个持久执行过程能够跨越多次 Lambda 函数调用。执行超时时间适用于持久执行的总耗时，而非针对每个函数调用而言。

**了解差异**  
Lambda 函数超时时间（最长 15 分钟）会限制您函数的每次调用。持久执行超时（最长 1 年）限制从执行开始到执行完成、失败或超时的总时长。在此期间，由于您的函数会处理步骤、等待和从故障中恢复，可能会被多次调用。

例如，如果您将持久执行超时设置为 24 小时，并将 Lambda 函数超时设置为 5 分钟：
+ 每次函数调用必须在 5 分钟内完成
+ 整个持久执行最多可以运行 24 小时
+ 在这 24 小时内，您的函数可能会被多次调用
+ 等待操作不计入 Lambda 函数超时，但会计入执行超时

在使用 Lambda 控制台、AWS CLI 或 AWS SAM 创建持久性函数时，您可以配置执行超时。在 Lambda 控制台中，选择您的函数，然后选择“配置”“持久执行”。以秒为单位设置执行超时值（默认值：86400 秒/24 小时，最小值：60 秒，最大值：31536000 秒/1 年）。

**注意**  
执行超时和 Lambda 函数超时是不同的设置。Lambda 函数超时控制每次单独调用所能运行的最长时间（最长为 15 分钟）。执行超时控制整个持久执行的总耗时（最长 1 年）。

### 保留期
<a name="durable-retention-period"></a>

**保留期**控制 Lambda 在持久执行完成后将执行历史记录和检查点数据保留的时长。此数据包括步骤结果、执行状态以及完整的检查点日志。

保留期过期后，Lambda 将删除执行历史记录和检查点数据。您无法再检索执行详细信息或重播执行。保留期从执行达到终端状态（SUCCEEDED、FAILED、STOPPED 或 TIMED\$1OUT）时开始。

在使用 Lambda 控制台、AWS CLI 或 AWS SAM 创建持久性函数时，您可以配置保留期。在 Lambda 控制台中，选择您的函数，然后选择“配置”“持久执行”。以天为单位设置保留期值（默认值：14 天，最小值：1 天，最大值：90 天）。

根据您的合规性要求、调试需求和成本考虑因素选择保留期。保留期越长，为调试和审计提供的时间越多，但会增加存储成本。

## 另请参阅
<a name="durable-basic-concepts-see-also"></a>
+ [持久性函数或 Step Functions](durable-step-functions.md)：比较持久性函数与 Step Functions，了解每种方法何时最有效。

# 创建 Lambda 持久性函数
<a name="durable-getting-started"></a>

要开始使用 Lambda 持久性函数，请使用 Lambda 控制台创建持久性函数。您可以在几分钟内创建并部署一个持久性函数，该函数使用步骤和等待来演示基于检查点的执行。

在您执行教程的过程中，您将学习一些基本的持久性函数概念，例如如何使用 `DurableContext` 对象、如何通过步骤创建检查点以及如何使用等待功能来暂停执行。您还将了解到在您的函数在等待结束后恢复时，重放机制是如何运作的。

为了简单起见，本教程将向您展示如何使用 Python 或 Node.js 运行时创建函数。您可以使用这些解释性语言，在控制台的内置代码编辑器中直接编辑函数代码。

Java（预览）中的持久性函数目前只能通过容器映像部署。有关通过容器映像创建持久性函数的更多信息，请参阅[持久性函数支持的运行时](durable-supported-runtimes.md)或[使用基础设施即代码部署 Lambda 持久性函数](durable-getting-started-iac.md)。

**注意**  
持久性函数目前支持 Python 和 Node.js（JavaScript/TypeScript）运行时和容器映像（OCI），如 Java。有关支持的运行时版本和容器映像选项的完整列表，请参阅[持久性函数支持的运行时](durable-supported-runtimes.md)。有关将容器映像与 Lambda 结合使用的更多信息，请参阅《Lambda 开发人员指南》中的[创建 Lambda 容器映像](https://docs.aws.amazon.com/lambda/latest/dg/images-create.html)。

**提示**  
要了解如何构建**无服务器解决方案**，请查看[无服务器开发人员指南](https://docs.aws.amazon.com/serverless/latest/devguide/)。

## 先决条件
<a name="durable-getting-started-prerequisites"></a>

### 注册 AWS 账户
<a name="sign-up-for-aws"></a>

如果您还没有，AWS 账户请完成以下步骤来创建一个。

**注册 AWS 账户**

1. 打开 [https://portal.aws.amazon.com/billing/signup](https://portal.aws.amazon.com/billing/signup)。

1. 按照屏幕上的说明操作。

   在注册时，将接到电话或收到短信，要求使用电话键盘输入一个验证码。

   当您注册 AWS 账户 时，系统将会创建一个。*AWS 账户根用户*根用户有权访问该账户中的所有 AWS 服务 和资源。作为最佳安全实践，请为用户分配管理访问权限，并且只使用根用户来执行[需要根用户访问权限的任务](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_root-user.html#root-user-tasks)。

注册过程完成后，AWS 会向您发送一封确认电子邮件。在任何时候，您都可以通过转至 [https://aws.amazon.com/](https://aws.amazon.com/) 并选择**我的账户**来查看当前的账户活动并管理您的账户。

### 创建具有管理访问权限的用户
<a name="create-an-admin"></a>

注册 AWS 账户 后，请保护好您的，AWS 账户根用户启用，AWS IAM Identity Center并创建一个管理用户，以避免使用根用户执行日常任务。

**保护您的 AWS 账户根用户**

1.  选择**根用户**并输入您的 AWS 账户 电子邮件地址，以账户拥有者身份登录。[AWS 管理控制台](https://console.aws.amazon.com/)在下一页上，输入您的密码。

   要获取使用根用户登录方面的帮助，请参阅《AWS 登录 用户指南》**中的 [Signing in as the root user](https://docs.aws.amazon.com/signin/latest/userguide/console-sign-in-tutorials.html#introduction-to-root-user-sign-in-tutorial)。

1. 为您的根用户启用多重身份验证（MFA）。

   有关说明，请参阅《IAM 用户指南》**中的[为 AWS 账户 根用户启用虚拟 MFA 设备（控制台）](https://docs.aws.amazon.com/IAM/latest/UserGuide/enable-virt-mfa-for-root.html)。

**创建具有管理访问权限的用户**

1. 启用 IAM Identity Center。

   有关说明，请参阅**《AWS IAM Identity Center 用户指南》中的[启用 AWS IAM Identity Center](https://docs.aws.amazon.com//singlesignon/latest/userguide/get-set-up-for-idc.html)。

1. 在 IAM Identity Center 中，为用户授予管理访问权限。

   有关如何使用 IAM Identity Center 目录 作为身份源的教程，请参阅**《AWS IAM Identity Center 用户指南》中的 [Configure user access with the default。IAM Identity Center 目录](https://docs.aws.amazon.com//singlesignon/latest/userguide/quick-start-default-idc.html)

**以具有管理访问权限的用户身份登录**
+ 要使用您的 IAM Identity Center 用户身份登录，请使用您在创建 IAM Identity Center 用户时发送到您的电子邮件地址的登录 URL。

  要获取使用 IAM Identity Center 用户登录方面的帮助，请参阅《AWS 登录 用户指南》**中的 [Signing in to the AWS access portal](https://docs.aws.amazon.com/signin/latest/userguide/iam-id-center-sign-in-tutorial.html)。

**将访问权限分配给其他用户**

1. 在 IAM Identity Center 中，创建一个权限集，该权限集遵循应用最低权限的最佳做法。

   有关说明，请参阅《AWS IAM Identity Center 用户指南》**中的 [Create a permission set](https://docs.aws.amazon.com//singlesignon/latest/userguide/get-started-create-a-permission-set.html)。

1. 将用户分配到一个组，然后为该组分配单点登录访问权限。

   有关说明，请参阅《AWS IAM Identity Center 用户指南》**中的 [Add groups](https://docs.aws.amazon.com//singlesignon/latest/userguide/addgroups.html)。

## 使用控制台创建 Lambda 持久性函数
<a name="getting-started-create-durable-function"></a>

在此示例中，您的持久性函数通过多个步骤处理订单，并具有自动检查点机制。该函数采用包含订单 ID 的 JSON 对象，对其进行验证，处理付款，然后确认订单。每个步骤都会自动进行检查点操作，因此如果函数被中断，它会从最后完成的步骤处恢复执行。

您的函数还演示了一个等待操作，它会暂停执行一小段时间，以此来模拟等待外部确认的过程。

**要使用控制台创建持久性函数**

1. 打开 Lamba 控制台的 [Functions page](https://console.aws.amazon.com/lambda/home#/functions)（函数页面）。

1. 选择**创建函数**。

1. 选择**从头开始编写**。

1. 在**基本信息**窗格中，为**函数名称**输入 `myDurableFunction`。

1. 对于**运行时**，选择 **Node.js 24** 或 **Python 3.14**。

1. 选择**启用持久执行**。

Lambda 使用包含检查点操作（`lambda:CheckpointDurableExecutions` 和 `lambda:GetDurableExecutionState`）权限的[执行角色](lambda-intro-execution-role.md)创建您的持久性函数。

**注意**  
Lambda 运行时包括持久执行 SDK，因此您无需打包依赖项即可测试持久性函数。但是，我们建议将 SDK 包含在您的生产部署包中。这样可以确保版本一致性，并且可以避免可能影响函数的潜在运行时更新。

使用控制台的内置代码编辑器添加您的持久性函数代码。

------
#### [ Node.js ]

**在控制台中修改代码**

1. 选择**节点**选项卡。

   在控制台的内置代码编辑器中，会显示 Lambda 创建的函数代码。如果代码编辑器中没有显示 **index.mjs** 选项卡，请在文件资源管理器中选择 **index.mjs**，如下图所示。  
![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/durable-nodejs.png)

1. 将以下代码粘贴到 **index.mjs** 选项卡中，替换 Lambda 创建的代码。

   ```
   import {
     DurableContext,
     withDurableExecution,
   } from "@aws/durable-execution-sdk-js";
   
   export const handler = withDurableExecution(
     async (event, context) => {
       const orderId = event.orderId;
       
       // Step 1: Validate order
       const validationResult = await context.step(async (stepContext) => {
         stepContext.logger.info(`Validating order ${orderId}`);
         return { orderId, status: "validated" };
       });
       
       // Step 2: Process payment
       const paymentResult = await context.step(async (stepContext) => {
         stepContext.logger.info(`Processing payment for order ${orderId}`);
         return { orderId, status: "paid", amount: 99.99 };
       });
       
       // Wait for 10 seconds to simulate external confirmation
       await context.wait({ seconds: 10 });
       
       // Step 3: Confirm order
       const confirmationResult = await context.step(async (stepContext) => {
         stepContext.logger.info(`Confirming order ${orderId}`);
         return { orderId, status: "confirmed" };
       });
           
       return {
         orderId: orderId,
         status: "completed",
         steps: [validationResult, paymentResult, confirmationResult]
       };
     }
   );
   ```

1. 在**部署**部分，选择**部署**以更新函数的代码：  
![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/getting-started-tutorial/deploy-console.png)

**了解持久性函数代码**  
在继续执行下一步之前，我们来查看一下函数代码并了解一些关键的持久性函数概念。
+ `withDurableExecution` 封装器：

  您的持久性函数使用 `withDurableExecution` 进行封装。此封装器通过提供 `DurableContext` 对象和管理检查点操作来实现持久执行。
+ `DurableContext` 对象：

  您的函数会接收 `DurableContext` 而不是标准的 Lambda 上下文。此对象提供了用于执行诸如 `step()` 和 `wait()` 等持久操作的方法，这些操作会创建检查点。
+ 步骤和检查点：

  每次 `context.step()` 调用都会在执行前后创建一个检查点。如果您的函数被中断，它将从上次完成的检查点恢复。该函数不会重新执行已完成的步骤。它会改用它们存储的结果。
+ 等待操作：

  `context.wait()` 调用将暂停执行，但不会消耗计算资源。当等待完成时，Lambda 会再次调用您的函数并重放检查点日志，从而用存储的值替代已完成的步骤。
+ 重放机制：

  当您的函数在等待或中断后恢复时，Lambda 会从头开始运行您的代码。但是，已完成的步骤不会重新执行。Lambda 会从检查点日志中重放其结果。这就是为什么您的代码必须是确定性的。

------
#### [ Python ]

**在控制台中修改代码**

1. 选择**节点**选项卡。

   在控制台的内置代码编辑器中，会显示 Lambda 创建的函数代码。如果代码编辑器中没有显示 **lambda\$1function.py** 选项卡，请在文件资源管理器中选择 **lambda\$1function.py**，如下图所示。  
![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/durable-python.png)

1. 将以下代码粘贴到 **lambda\$1function.py** 选项卡中，替换 Lambda 创建的代码。

   ```
   from aws_durable_execution_sdk_python import (
       DurableContext,
       durable_execution,
       durable_step,
   )
   from aws_durable_execution_sdk_python.config import Duration
   
   @durable_step
   def validate_order(step_context, order_id):
       step_context.logger.info(f"Validating order {order_id}")
       return {"orderId": order_id, "status": "validated"}
   
   @durable_step
   def process_payment(step_context, order_id):
       step_context.logger.info(f"Processing payment for order {order_id}")
       return {"orderId": order_id, "status": "paid", "amount": 99.99}
   
   @durable_step
   def confirm_order(step_context, order_id):
       step_context.logger.info(f"Confirming order {order_id}")
       return {"orderId": order_id, "status": "confirmed"}
   
   @durable_execution
   def lambda_handler(event, context: DurableContext):
       order_id = event['orderId']
       
       # Step 1: Validate order
       validation_result = context.step(validate_order(order_id))
       
       # Step 2: Process payment
       payment_result = context.step(process_payment(order_id))
       
       # Wait for 10 seconds to simulate external confirmation
       context.wait(Duration.from_seconds(10))
       
       # Step 3: Confirm order
       confirmation_result = context.step(confirm_order(order_id))
           
       return {
           "orderId": order_id,
           "status": "completed",
           "steps": [validation_result, payment_result, confirmation_result]
       }
   ```

1. 在**部署**部分，选择**部署**以更新函数的代码：  
![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/getting-started-tutorial/deploy-console.png)

**了解持久性函数代码**  
在继续执行下一步之前，我们来查看一下函数代码并了解一些关键的持久性函数概念。
+ `@durable_execution` 修饰器：

  您的处理函数使用 `@durable_execution` 进行修饰。此修饰器通过提供 `DurableContext` 对象和管理检查点操作来实现持久执行。
+ `@durable_step` 修饰器：

  每个步骤函数都使用 `@durable_step` 进行修饰。此修饰器将该函数标记为创建检查点的持久步骤。
+ `DurableContext` 对象：

  您的函数会接收 `DurableContext` 而不是标准的 Lambda 上下文。此对象提供了用于执行诸如 `step()` 和 `wait()` 等持久操作的方法，这些操作会创建检查点。
+ 步骤和检查点：

  每次 `context.step()` 调用都会在执行前后创建一个检查点。如果您的函数被中断，它将从上次完成的检查点恢复。该函数不会重新执行已完成的步骤。它会改用它们存储的结果。
+ 等待操作：

  `context.wait()` 调用将暂停执行，但不会消耗计算资源。当等待完成时，Lambda 会再次调用您的函数并重放检查点日志，从而用存储的值替代已完成的步骤。
+ Python SDK 是同步的：

  请注意，Python SDK 不使用 `await`。所有持久操作都是同步方法调用。

------

## 使用控制台代码编辑器调用持久性函数
<a name="get-started-invoke-durable-manually"></a>

如果未指定（或发布）显式版本，则控制台使用 `$LATEST` 版本限定符调用持久性函数。但是，为了确定性地执行代码，必须始终使用指向稳定版本的限定 ARN。

**要发布您的函数的版本**

1. 选择**版本**选项卡。

1. 选择 **새 버전 발행**。

1. 在**版本描述**中，输入 **Initial version**（可选）。

1. 选择**发布**。

1. Lambda 会创建您的函数的版本 1。请注意，函数 ARN 现在在末尾包含 `:1`，表示这是版本 1。

现在，创建一个测试事件以发送到您的函数。该事件是一个 JSON 格式的文档，其中包含订单 ID。

**创建测试事件**

1. 在控制台代码编辑器的**测试事件**部分中，选择**创建测试事件**。  
![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/getting-started-tutorial/test-event.png)

1. 对于**事件名称**，输入 **myTestEvent**。

1. 在**事件 JSON** 部分中，使用以下内容替换默认 JSON：

   ```
   {
     "orderId": "order-12345"
   }
   ```

1. 选择**保存**。

**要测试您的持久性函数并查看执行情况**

在控制台代码编辑器的**测试事件**部分中，选择测试事件旁边的运行图标：

![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/getting-started-tutorial/run-test-event.png)


您的持久性函数开始执行。因为该函数包含一段10秒的等待时间，所以初始调用会快速完成，函数将在等待期结束后恢复执行。您可以在**持久执行**选项卡中查看执行进度。

**要查看您的持久性函数执行情况**

1. 选择**持久执行**选项卡。

1. 在列表中找到执行。执行将显示当前状态（正在运行、成功或失败）。

1. 选择执行 ID 以查看详细信息，其中包括：
   + 显示每个步骤完成时间的执行时间表
   + 检查点历史记录
   + 等待期
   + 步骤结果

您还可以在 CloudWatch Logs 中查看函数的日志，以查看每个步骤的控制台输出。

**在 CloudWatch Logs 中查看函数的调用记录**

1. 打开 CloudWatch 控制台的 [Log groups page](https://console.aws.amazon.com/cloudwatch/home#logs:)（日志组页面）。

1. 选择函数 (`/aws/lambda/myDurableFunction`) 的日志组。

1. 向下滚动，选择要查看的函数调用的**日志流**。  
![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/log-stream.png)

   您应该能看到每次调用该函数时产生的日志条目，包括首次执行时的记录以及等待结束后进行的重放记录。

**注意**  
当您使用 `DurableContext` 中的记录器（如 `context.logger` 或 `stepContext.logger`）时，日志还会显示在 Lambda 控制台的持久执行和步骤视图中。这些日志可能需要一些时间才能加载。

## 清理
<a name="gettingstarted-durable-cleanup"></a>

使用完示例持久性函数后，请将其删除。您还可以删除存储函数日志的日志组以及控制台创建的[执行角色](lambda-intro-execution-role.md)。

**删除 Lambda 函数**

1. 打开 Lamba 控制台的 [Functions（函数）页面](https://console.aws.amazon.com/lambda/home#/functions)。

1. 选择您创建的函数。

1. 依次选择**操作**和**删除**。

1. 在文本输入字段中键入 **confirm**，然后选择**删除**。

**删除日志组**

1. 打开 CloudWatch 控制台的 [Log groups page](https://console.aws.amazon.com/cloudwatch/home#logs:)（日志组页面）。

1. 选择函数的日志组 (`/aws/lambda/myDurableFunction`)。

1. 依次选择 **Actions**（操作）和 **Delete log group(s)**（删除日志组）。

1. 在 **Delete log group(s)**（删除日志组）对话框中，选择 **Delete**（删除）。

**删除执行角色**

1. 打开 AWS Identity and Access Management (IAM) 控制台的 [Roles page](https://console.aws.amazon.com/iam/home?#/roles)（角色页面）。

1. 选择函数的执行角色（例如 `myDurableFunction-role-31exxmpl`）。

1. 选择**删除**。

1. 在**删除角色**对话框中，输入角色名称，然后选择**删除**。

## 其他资源和后续步骤
<a name="durable-getting-started-more-resources"></a>

现在，您已经使用控制台创建并测试了一个简单的持久性函数，请继续执行以下后续步骤：
+ 了解持久性函数的常见使用案例，包括分布式事务、订单处理和人工审查工作流程。请参见[示例](durable-examples.md)。
+ 了解如何使用 CloudWatch 指标和执行历史记录监控持久性函数的执行情况。请参见[监控和调试](durable-monitoring.md)。
+ 了解如何以同步和异步方式调用持久性函数，并管理长时间运行的执行过程。请参见[调用持久性函数](durable-invoking.md)。
+ 遵循编写确定性代码、管理检查点大小和优化成本的最佳实践。请参见[最佳实践](durable-best-practices.md)。
+ 了解如何在本地和云端测试持久性函数。请参见[测试持久性函数](durable-testing.md)。
+ 比较持久性函数与 Step Functions，了解每种方法何时最有效。请参阅[持久性函数或 Step Functions](durable-step-functions.md)。

# 使用 AWS CLI 部署和调用 Lambda 持久性函数
<a name="durable-getting-started-cli"></a>

使用 AWS CLI 通过强制性命令创建和部署 Lambda 持久性函数。这种方法能让您直接控制部署过程的每一个步骤。

## 先决条件
<a name="durable-cli-prerequisites"></a>
+ 安装和配置 AWS CLI。有关说明，请参阅[安装 AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)。
+ 创建包含函数代码和持久执行 SDK 的部署包。
+ 创建具有检查点权限的 IAM 执行角色。

## 创建执行角色
<a name="durable-cli-create-role"></a>

创建一个 IAM 角色，赋予其执行基本 Lambda 任务和进行检查点操作的权限。

**创建执行角色**

1. 创建一个信任策略文档，该文档允许 Lambda 代入该角色。将其另存为 `trust-policy.json`：

   ```
   {
     "Version": "2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "lambda.amazonaws.com"
         },
         "Action": "sts:AssumeRole"
       }
     ]
   }
   ```

1. 创建该角色：

   ```
   aws iam create-role \
     --role-name durable-function-role \
     --assume-role-policy-document file://trust-policy.json
   ```

1. 为检查点操作和基本执行附加持久执行策略：

   ```
   aws iam attach-role-policy \
     --role-name durable-function-role \
     --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicDurableExecutionRolePolicy
   ```

`AWSLambdaBasicDurableExecutionRolePolicy` 托管式策略包括检查点操作（`lambda:CheckpointDurableExecutions` 和 `lambda:GetDurableExecutionState`）和基本 Lambda 执行所需的权限。

## 创建持久性函数
<a name="durable-cli-create-function"></a>

使用 `--durable-config` 参数创建您的持久性函数。

**要创建持久性函数**

1. 将包含依赖项的函数代码打包到 .zip 文件中：

   ```
   zip -r function.zip index.mjs node_modules/
   ```

1. 创建启用了持久执行的函数：

   ```
   aws lambda create-function \
     --function-name myDurableFunction \
     --runtime nodejs22.x \
     --role arn:aws:iam::123456789012:role/durable-function-role \
     --handler index.handler \
     --zip-file fileb://function.zip \
     --durable-config '{"ExecutionTimeout": 3600, "RetentionPeriodInDays": 7}'
   ```

**注意**  
您只能在创建函数时启用持久执行。您无法对现有函数启用它。

**注意**  
当前，Java（预览）中的持久性函数目前只能通过容器映像创建。有关从容器映像创建持久性函数的更多信息，请参阅[持久性函数支持的运行时](durable-supported-runtimes.md)。

## 发布版本
<a name="durable-cli-publish-version"></a>

虽然可以使用 `$LATEST` 版本限定符调用持久性函数，但是您必须始终使用指向稳定版本的限定 ARN，以确保代码的确定性执行。

```
aws lambda publish-version \
  --function-name myDurableFunction \
  --description "Initial version"
```

该命令将返回版本 ARN。注意 ARN 末尾的版本号（例如 `:1`）。

或者，创建指向版本的别名：

```
aws lambda create-alias \
  --function-name myDurableFunction \
  --name prod \
  --function-version 1
```

## 调用持久性函数
<a name="durable-cli-invoke"></a>

使用限定的 ARN（版本或别名）调用您的持久性函数。

**注意**  
**幂等调用：**为防止在重试失败的调用时重复执行，您可以提供一个执行名称来确保最多一次执行语义。有关详细信息，请参阅[幂等性](durable-execution-idempotency.md)。

**同步调用**  
对于在 15 分钟内完成的执行，请使用同步调用：

```
aws lambda invoke \
  --function-name myDurableFunction:1 \
  --payload '{"orderId": "order-12345"}' \
  --cli-binary-format raw-in-base64-out \
  response.json
```

或者使用别名：

```
aws lambda invoke \
  --function-name myDurableFunction:prod \
  --payload '{"orderId": "order-12345"}' \
  --cli-binary-format raw-in-base64-out \
  response.json
```

**异步调用**  
对于长时间运行的执行，请使用异步调用：

```
aws lambda invoke \
  --function-name myDurableFunction:prod \
  --invocation-type Event \
  --payload '{"orderId": "order-12345"}' \
  --cli-binary-format raw-in-base64-out \
  response.json
```

通过异步调用方式，Lambda 会立即返回。函数将继续在后台执行。

**注意**  
您可以使用 `$LATEST` 在控制台中进行原型设计和测试。对于生产工作负载，请使用发布的版本或别名。

## 管理持久执行
<a name="durable-cli-manage-executions"></a>

使用以下命令管理和监控持久性函数执行。

**列出执行**  
列出持久性函数的所有执行情况：

```
aws lambda list-durable-executions-by-function \
  --function-name myDurableFunction
```

**获取执行详细信息**  
获取有关特定执行的详细信息：

```
aws lambda get-durable-execution \
  --durable-execution-arn arn:aws:lambda:us-east-1:123456789012:function:myDurableFunction:my-function-version/durable-execution/my-execution-name/my-execution-id
```

**获取执行历史记录**  
查看执行的检查点历史记录：

```
aws lambda get-durable-execution-history \
  --durable-execution-arn arn:aws:lambda:us-east-1:123456789012:function:myDurableFunction:my-function-version/durable-execution/my-execution-name/my-execution-id
```

**停止执行**  
停止正在运行的持久执行：

```
aws lambda stop-durable-execution \
  --durable-execution-arn arn:aws:lambda:us-east-1:123456789012:function:myDurableFunction:my-function-version/durable-execution/my-execution-name/my-execution-id
```

## 更新函数代码
<a name="durable-cli-update-function"></a>

更新您的持久性函数代码并发布新版本：

**要更新并发布新版本**

1. 更新函数代码：

   ```
   aws lambda update-function-code \
     --function-name myDurableFunction \
     --zip-file fileb://function.zip
   ```

1. 等待更新完成：

   ```
   aws lambda wait function-updated \
     --function-name myDurableFunction
   ```

1. 发布新版本：

   ```
   aws lambda publish-version \
     --function-name myDurableFunction \
     --description "Updated order processing logic"
   ```

1. 更新别名以指向新版本：

   ```
   aws lambda update-alias \
     --function-name myDurableFunction \
     --name prod \
     --function-version 2
   ```

**重要**  
正在运行的执行继续使用其最初的版本。新的调用将使用更新的别名版本。

## 查看函数日志
<a name="durable-cli-view-logs"></a>

在 CloudWatch Logs 中查看您的持久性函数日志：

```
aws logs tail /aws/lambda/myDurableFunction --follow
```

筛选特定执行的日志：

```
aws logs filter-log-events \
  --log-group-name /aws/lambda/myDurableFunction \
  --filter-pattern "exec-abc123"
```

## 清理 资源
<a name="durable-cli-cleanup"></a>

删除您的持久性函数和相关资源：

```
# Delete the function
aws lambda delete-function --function-name myDurableFunction

# Delete the IAM role policies
aws iam detach-role-policy \
  --role-name durable-function-role \
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

aws iam detach-role-policy \
  --role-name durable-function-role \
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicDurableExecutionRolePolicy

# Delete the role
aws iam delete-role --role-name durable-function-role
```

## 后续步骤
<a name="durable-cli-next-steps"></a>

使用 AWS CLI 命令部署您的持久性函数后：
+ 使用 `list-durable-executions` 和 `get-durable-execution` 命令监控执行情况
+ 查看 AWS CloudTrail 数据事件中的检查点操作
+ 为执行故障或长时间运行的执行设置 CloudWatch 警报
+ 使用 shell 脚本或 CI/CD 管道自动部署

有关 Lambda 的 AWS CLI 命令的更多信息，请参阅 [AWS CLI 命令参考](https://docs.aws.amazon.com/cli/latest/reference/lambda/index.html)。

# 使用基础设施即代码部署 Lambda 持久性函数
<a name="durable-getting-started-iac"></a>

您可以使用基础设施即代码（IaC）工具（例如 AWS CloudFormation、AWS CDK、AWS Serverless Application Model 或 Terraform）部署 Lambda 持久性函数。这些工具允许您通过代码来定义函数、执行角色以及权限，从而使得部署过程可重复且能够进行版本控制。

这三种工具都要求您：
+ 对函数启用持久执行
+ 授予对执行角色的检查点权限
+ 发布版本或创建别名（持久性函数需要限定的 ARN）

## 来自 ZIP 的持久性函数
<a name="durable-iac-zip"></a>

### AWS CloudFormation
<a name="durable-iac-cloudformation"></a>

使用 CloudFormation 在模板中定义您的持久性函数。以下示例创建了具有所需权限的持久性函数。

```
AWSTemplateFormatVersion: '2010-09-09'
Description: Lambda durable function example

Resources:
  DurableFunctionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'		 	 	 
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicDurableExecutionRolePolicy

  DurableFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: myDurableFunction
      Runtime: nodejs22.x
      Handler: index.handler
      Role: !GetAtt DurableFunctionRole.Arn
      Code:
        ZipFile: |
          // Your durable function code here
          export const handler = async (event, context) => {
            return { statusCode: 200 };
          };
      DurableConfig:
        ExecutionTimeout: 3600
        RetentionPeriodInDays: 7

  DurableFunctionVersion:
    Type: AWS::Lambda::Version
    Properties:
      FunctionName: !Ref DurableFunction
      Description: Initial version

  DurableFunctionAlias:
    Type: AWS::Lambda::Alias
    Properties:
      FunctionName: !Ref DurableFunction
      FunctionVersion: !GetAtt DurableFunctionVersion.Version
      Name: prod

Outputs:
  FunctionArn:
    Description: Durable function ARN
    Value: !GetAtt DurableFunction.Arn
  AliasArn:
    Description: Function alias ARN (use this for invocations)
    Value: !Ref DurableFunctionAlias
```

**部署模板**

```
aws cloudformation deploy \
  --template-file template.yaml \
  --stack-name my-durable-function-stack \
  --capabilities CAPABILITY_IAM
```

### AWS CDK
<a name="durable-iac-cdk"></a>

AWS CDK 使您能够使用编程语言定义基础设施。以下示例演示如何使用 TypeScript 和 Python 创建持久性函数。

------
#### [ TypeScript ]

```
import * as cdk from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as iam from 'aws-cdk-lib/aws-iam';
import { Construct } from 'constructs';

export class DurableFunctionStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // Create the durable function
    const durableFunction = new lambda.Function(this, 'DurableFunction', {
      runtime: lambda.Runtime.NODEJS_22_X,
      handler: 'index.handler',
      code: lambda.Code.fromAsset('lambda'),
      functionName: 'myDurableFunction',
      durableConfig: { executionTimeout: Duration.hours(1), retentionPeriod: Duration.days(30) },
    });

    // Create version and alias
    const version = durableFunction.currentVersion;
    const alias = new lambda.Alias(this, 'ProdAlias', {
      aliasName: 'prod',
      version: version,
    });

    // Output the alias ARN
    new cdk.CfnOutput(this, 'FunctionAliasArn', {
      value: alias.functionArn,
      description: 'Use this ARN to invoke the durable function',
    });
  }
}
```

------
#### [ Python ]

```
from aws_cdk import (
    Stack,
    aws_lambda as lambda_,
    aws_iam as iam,
    CfnOutput,
)
from constructs import Construct

class DurableFunctionStack(Stack):
    def __init__(self, scope: Construct, id: str, **kwargs):
        super().__init__(scope, id, **kwargs)

        # Create the durable function
        durable_function = lambda_.Function(
            self, 'DurableFunction',
            runtime=lambda_.Runtime.NODEJS_22_X,
            handler='index.handler',
            code=lambda_.Code.from_asset('lambda'),
            function_name='myDurableFunction',
            durable_execution={execution_timeout: Duration.hours(1), retention_period: Duration.days(30)}
        )

        # Add durable execution managed policy for checkpoint permissions
        durable_function.role.add_managed_policy(
            iam.ManagedPolicy.from_aws_managed_policy_name('service-role/AWSLambdaBasicDurableExecutionRolePolicy')
        )

        # Create version and alias
        version = durable_function.current_version
        alias = lambda_.Alias(
            self, 'ProdAlias',
            alias_name='prod',
            version=version
        )

        # Output the alias ARN
        CfnOutput(
            self, 'FunctionAliasArn',
            value=alias.function_arn,
            description='Use this ARN to invoke the durable function'
        )
```

------

**要部署 CDK 堆栈**

```
cdk deploy
```

### AWS Serverless Application Model
<a name="durable-iac-sam"></a>

AWS SAM 简化了无服务器应用程序的 CloudFormation 模板。以下模板使用 AWS SAM 创建了一个持久性函数。

```
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Lambda durable function with SAM

Resources:
  DurableFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: myDurableFunction
      Runtime: nodejs22.x
      Handler: index.handler
      CodeUri: ./src
      DurableConfig:
        ExecutionTimeout: 3600
        RetentionPeriodInDays: 7
      Policies:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicDurableExecutionRolePolicy
      AutoPublishAlias: prod

Outputs:
  FunctionArn:
    Description: Durable function ARN
    Value: !GetAtt DurableFunction.Arn
  AliasArn:
    Description: Function alias ARN (use this for invocations)
    Value: !Ref DurableFunction.Alias
```

**要部署 SAM 模板**

```
sam build
sam deploy --guided
```

### Terraform
<a name="durable-iac-terraform"></a>

Terraform 是一款支持 AWS 资源的流行开源 IaC 工具。以下示例使用 AWS 提供程序版本 6.25.0 或更高版本通过 Terraform 创建持久性函数。

```
terraform {
  required_version = ">= 1.0"
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = ">= 6.25.0"
    }
  }
}

provider "aws" {
  region = "us-east-2"
}

# IAM Role for Lambda Function
resource "aws_iam_role" "lambda_role" {
  name = "durable-function-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = {
        Service = "lambda.amazonaws.com"
      }
    }]
  })
}

# Attach durable execution policy for checkpoint operations
resource "aws_iam_role_policy_attachment" "lambda_durable" {
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicDurableExecutionRolePolicy"
  role       = aws_iam_role.lambda_role.name
}

# Lambda Function with Durable Execution enabled
resource "aws_lambda_function" "durable_function" {
  filename      = "function.zip"
  function_name = "myDurableFunction"
  role          = aws_iam_role.lambda_role.arn
  handler       = "index.handler"
  runtime       = "nodejs22.x"
  timeout       = 30
  memory_size   = 512

  durable_config {
    execution_timeout = 900
    retention_period  = 7
  }
}

# Publish a version
resource "aws_lambda_alias" "prod" {
  name             = "prod"
  function_name    = aws_lambda_function.durable_function.function_name
  function_version = aws_lambda_function.durable_function.version
}

output "function_arn" {
  description = "ARN of the Lambda function"
  value       = aws_lambda_function.durable_function.arn
}

output "alias_arn" {
  description = "ARN of the function alias (use this for invocations)"
  value       = aws_lambda_alias.prod.arn
}
```

**使用 Terraform 进行部署**

```
terraform init
terraform plan
terraform apply
```

**注意**  
Terraform 对 Lambda 持久性函数的支持需要 AWS 提供程序版本 6.25.0 或更高版本。如果使用的是较旧版本，请更新提供程序版本。

## 来自 OCI 容器映像的持久性函数
<a name="durable-iac-oci"></a>

您还可以基于容器映像创建持久性函数。有关如何构建容器映像的说明，请参阅[持久性函数支持的运行时](durable-supported-runtimes.md)。

### AWS CDK
<a name="durable-iac-oci-cdk"></a>

AWS CDK 使您能够使用编程语言定义基础设施。以下示例演示如何使用 TypeScript 从容器映像创建持久性函数。

```
import * as cdk from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as iam from 'aws-cdk-lib/aws-iam';
import { Construct } from 'constructs';

export class DurableFunctionStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // Create the durable function
    const durableFunction = new lambda.DockerImageFunction(this, 'DurableFunction', {
      code: lambda.DockerImageCode.fromImageAsset('./lambda', {
        platform: cdk.aws_ecr_assets.Platform.LINUX_AMD64,
      }),
      functionName: 'myDurableFunction',
      memorySize: 512,
      timeout: cdk.Duration.seconds(30),
      durableConfig: { executionTimeout: cdk.Duration.hours(1), retentionPeriod: cdk.Duration.days(30) },
    });

    // Create version and alias
    const version = durableFunction.currentVersion;
    const alias = new lambda.Alias(this, 'ProdAlias', {
      aliasName: 'prod',
      version: version,
    });

    // Output the alias ARN
    new cdk.CfnOutput(this, 'FunctionAliasArn', {
      value: alias.functionArn,
      description: 'Use this ARN to invoke the durable function',
    });
  }
}
```

**要部署 CDK 堆栈**

```
cdk deploy
```

### AWS Serverless Application Model
<a name="durable-iac-oci-sam"></a>

AWS SAM 简化了无服务器应用程序的 CloudFormation 模板。以下模板使用 AWS SAM 创建了一个持久性函数。

```
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Lambda durable function with SAM

Resources:
  DurableFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: myDurableFunction
      PackageType: Image
      ImageUri: ./src
      DurableConfig:
        ExecutionTimeout: 3600
        RetentionPeriodInDays: 7
      Policies:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicDurableExecutionRolePolicy
      AutoPublishAlias: prod
    Metadata:
      DockerTag: latest
      DockerContext: ./src
      Dockerfile: Dockerfile

Outputs:
  FunctionArn:
    Description: Durable function ARN
    Value: !GetAtt DurableFunction.Arn
  AliasArn:
    Description: Function alias ARN (use this for invocations)
    Value: !Ref DurableFunction.Alias
```

**要部署 SAM 模板**

```
sam build
sam deploy --guided
```

## 常见配置模式
<a name="durable-iac-common-patterns"></a>

无论您使用哪种 IaC 工具，都要遵循持久性函数的以下模式：

**启用持久执行**  
对函数设置 `DurableConfig` 属性以启用持久执行。只有当创建函数时，此属性才可用。您无法对现有函数启用持久执行。

**授予检查点权限**  
将 `AWSLambdaBasicDurableExecutionRolePolicy` 托管式策略附加到执行角色。此策略包括必需的 `lambda:CheckpointDurableExecutions` 和 `lambda:GetDurableExecutionState` 权限。

**使用限定的 ARN**  
为您的函数创建版本或别名。持久性函数需要使用限定的 ARN（带有版本或别名）才能调用。在 AWS SAM 中使用 `AutoPublishAlias` 或者在 CloudFormation、AWS CDK 和 Terraform 中创建显式版本。

**软件包依赖项**  
在您的部署包中包含持久执行 SDK。对于 Node.js，请安装 `@aws/durable-execution-sdk-js`。对于 Python，请安装 `aws-durable-execution-sdk-python`。

## 后续步骤
<a name="durable-iac-next-steps"></a>

部署您的持久性函数后：
+ 使用限定的 ARN（版本或别名）测试您的函数
+ 在 Lambda 控制台的“持久执行”选项卡下监控执行进度
+ 查看 AWS CloudTrail 数据事件中的检查点操作
+ 查看 CloudWatch Logs 以了解函数输出和重放行为

有关使用 IaC 工具部署 Lambda 函数的更多信息，请参阅：
+ [CloudFormation AWS::Lambda::Function 参考](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-function.html)
+ [AWS CDK Lambda 模块文档](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_lambda-readme.html)
+ [AWS SAM 开发人员指南](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/what-is-sam.html)

# 持久性函数或 Step Functions
<a name="durable-step-functions"></a>

Lambda 持久性函数和 AWS Step Functions 都能够实现可靠的工作流编排，并具有自动状态管理和故障恢复功能。它们服务于不同的开发人员偏好和架构模式。持久性函数针对 Lambda 中的应用程序开发进行了优化，而 Step Functions 则专为跨 AWS 服务的工作流编排而构建。

## 何时使用持久性函数
<a name="durable-sfn-when-durable"></a>

在以下情况下使用持久性函数：
+ 您的团队更喜欢使用标准编程语言和熟悉的开发工具
+ 您的应用程序逻辑主要在 Lambda 函数中
+ 您想要精细控制代码中的执行状态
+ 您正在构建以 Lambda 为中心的应用程序，工作流和业务逻辑紧密耦合
+ 您想要快速迭代，而不在代码和可视化/JSON 设计器之间切换

## 何时使用 Step Functions
<a name="durable-sfn-when-step"></a>

在以下情况下使用 Step Functions：
+ 您需要可视化工作流表示来实现跨团队可见性
+ 您正在编排多个 AWS 服务，想要在没有自定义 SDK 代码的情况下进行本机集成
+ 您需要零维护基础设施（无修补、运行时更新）
+ 非技术利益相关者需要了解和验证工作流逻辑

## 决策框架
<a name="durable-sfn-decision-framework"></a>

使用下面的问题确定哪项服务适合您的使用案例：
+ **您的主要关注点是什么？** Lambda 中的应用程序开发 → 持久性函数。跨 AWS 的工作流编排 → Step Functions。
+ **您的首选编程模型是什么？** 标准编程语言 → 持久性函数。基于图形的 DSL 或可视化设计器 → Step Functions。
+ **涉及多少项 AWS 服务？** 主要是 Lambda → 持久性函数。多项 AWS 服务 → Step Functions。
+ **您使用什么开发工具？** Lambda 开发人员体验、带有 LLM 代理的 IDE、编程语言特定的单元测试框架、AWS SAM、AWS CDK、AWS Toolkit → 持久性函数。可视化工作流生成器，AWS CDK 用于建模工作流 → Step Functions。
+ **谁管理基础设施？** 想要在 Lambda 中实现灵活性 → 持久性函数。想要完全托管、零维护 → Step Functions。

## 功能对比
<a name="durable-sfn-comparison"></a>

下表比较了 Step Functions 和 Lambda 持久性函数之间的主要功能：


| 功能 | AWS Step Functions | Lambda 持久性函数 | 
| --- | --- | --- | 
| 主要关注点 | 跨 AWS 的工作流编排 | Lambda 中的应用程序开发 | 
| 服务类型 | 独立的专用工作流服务 | 在 Lambda 内运行 | 
| 编程模型 | 基于图形的 Amazon States Language DSL 或 AWS CDK | 标准编程语言（JavaScript/TypeScript、Python） | 
| 开发工具 | 控制台/AWS Toolkit IDE 扩展中的可视化生成器，AWS CDK | IDE 和 LLM 代理中的 Lambda DX、单元测试框架、AWS SAM、AWS Toolkit IDE 扩展 | 
| 集成 | 220 多项 AWS 服务，16000 个 API | Lambda 事件驱动编程模型扩展（事件源） | 
| 管理 | 完全托管、运行时不可知、零维护（无修补、运行时更新） | 在 Lambda 环境中进行管理 | 
| 适用于 | 业务流程和 IT 自动化、数据处理、AI 工作流 | 分布式事务、有状态应用程序逻辑、函数编排、数据处理、AI 工作流 | 

## 混合架构
<a name="durable-sfn-hybrid"></a>

许多应用程序都能从同时使用这两项服务中受益。一种常见模式是在 Lambda 中对应用程序级逻辑使用持久性函数，而 Step Functions 在 Lambda 函数之外的多个 AWS 服务之间协调高级工作流。

## 迁移注意事项
<a name="durable-sfn-migration"></a>

**从简单入手，日益复杂：**对于以 Lambda 为中心的工作流从持久性函数开始。当您需要多服务编排或可视化工作流设计时，请添加 Step Functions。

**现有 Step Functions 用户：**对已建立的跨服务工作流保留 Step Functions。对于需要可靠性的新 Lambda 应用程序逻辑，请考虑使用持久性函数。

## 相关资源
<a name="durable-sfn-related"></a>
+ [Lambda 持久性函数](durable-functions.md)
+ [使用 Step Functions 编排 Lambda 函数](with-step-functions.md)
+ [开始使用持久性函数](durable-getting-started.md)

# 示例和使用案例
<a name="durable-examples"></a>

Lambda 持久性函数使您能够使用诸如步骤和等待之类的持久操作来构建容错的多步骤应用程序。借助自动检查点机制以及检查点重放模型（在此模型中，如果出现故障，程序会从头开始重新执行，但会跳过已完成的检查点），这样您的函数就能从故障中恢复并恢复执行，而不会丢失之前的进度。

## 短暂的容错进程
<a name="durable-examples-short-lived"></a>

使用持久性函数来构建通常在几分钟内完成的可靠操作。虽然这些流程的执行时间比长时间运行的工作流程短，但它们仍能从分布式系统中的自动检查点机制和容错能力中获益。持久性函数可帮助确保多步骤进程即便在个别服务调用失败的情况下也能顺利完成，而无需复杂的错误处理或状态管理代码。

常见场景包括酒店预订系统、餐厅预订平台、拼车出行申请、活动门票购买和 SaaS 订阅升级。这些场景具有共同特征：多个服务调用必须一起完成，需要在瞬态故障时自动重试，以及需要在分布式系统之间保持一致的状态。

### 跨微服务的分布式事务
<a name="durable-examples-distributed-transactions"></a>

在多个服务之间协调支付、库存和发货事宜，并在出现故障时自动进行回滚操作。每个服务操作都被封装在一个步骤中，这样即使服务出现故障，整个事务也能从任何环节进行恢复。

------
#### [ TypeScript ]

```
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";

export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    const { orderId, amount, items } = event;
    
    // Reserve inventory across multiple warehouses
    const inventory = await context.step("reserve-inventory", async () => {
      return await inventoryService.reserve(items);
    });
    
    // Process payment
    const payment = await context.step("process-payment", async () => {
      return await paymentService.charge(amount);
    });
    
    // Create shipment
    const shipment = await context.step("create-shipment", async () => {
      return await shippingService.createShipment(orderId, inventory);
    });
    
    return { orderId, status: 'completed', shipment };
  }
);
```

------
#### [ Python ]

```
from aws_durable_execution_sdk_python import DurableContext, durable_execution

@durable_execution
def lambda_handler(event, context: DurableContext):
    order_id = event['orderId']
    amount = event['amount']
    items = event['items']
    
    # Reserve inventory across multiple warehouses
    inventory = context.step(
        lambda _: inventory_service.reserve(items),
        name='reserve-inventory'
    )
    
    # Process payment
    payment = context.step(
        lambda _: payment_service.charge(amount),
        name='process-payment'
    )
    
    # Create shipment
    shipment = context.step(
        lambda _: shipping_service.create_shipment(order_id, inventory),
        name='create-shipment'
    )
    
    return {'orderId': order_id, 'status': 'completed', 'shipment': shipment}
```

------

如果任何步骤失败，该函数将自动从上次成功的检查点开始重试。即使支付处理暂时出现故障，库存预留仍会继续有效。当该函数重试时，它会跳过已完成的库存步骤，直接进入付款处理。这能够消除重复的预留，并确保整个分布式系统中的状态保持一致。

### 多步骤订单处理
<a name="durable-examples-order-processing"></a>

通过验证、付款授权、库存分配和履行等环节来处理订单，并具备自动重试和恢复功能。每一个步骤都会进行检查点操作，这样即便个别步骤失败并重试，订单也能继续推进。

------
#### [ TypeScript ]

```
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";

export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    const { orderId, customerId, items } = event;
    
    // Validate order details
    const validation = await context.step("validate-order", async () => {
      const customer = await customerService.validate(customerId);
      const itemsValid = await inventoryService.validateItems(items);
      return { customer, itemsValid };
    });
    
    if (!validation.itemsValid) {
      return { orderId, status: 'rejected', reason: 'invalid_items' };
    }
    
    // Authorize payment
    const authorization = await context.step("authorize-payment", async () => {
      return await paymentService.authorize(
        validation.customer.paymentMethod,
        calculateTotal(items)
      );
    });
    
    // Allocate inventory
    const allocation = await context.step("allocate-inventory", async () => {
      return await inventoryService.allocate(items);
    });
    
    // Fulfill order
    const fulfillment = await context.step("fulfill-order", async () => {
      return await fulfillmentService.createShipment({
        orderId,
        items: allocation.allocatedItems,
        address: validation.customer.shippingAddress
      });
    });
    
    return {
      orderId,
      status: 'completed',
      trackingNumber: fulfillment.trackingNumber
    };
  }
);
```

------
#### [ Python ]

```
from aws_durable_execution_sdk_python import DurableContext, durable_execution

@durable_execution
def lambda_handler(event, context: DurableContext):
    order_id = event['orderId']
    customer_id = event['customerId']
    items = event['items']
    
    # Validate order details
    def validate_order(_):
        customer = customer_service.validate(customer_id)
        items_valid = inventory_service.validate_items(items)
        return {'customer': customer, 'itemsValid': items_valid}
    
    validation = context.step(validate_order, name='validate-order')
    
    if not validation['itemsValid']:
        return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_items'}
    
    # Authorize payment
    authorization = context.step(
        lambda _: payment_service.authorize(
            validation['customer']['paymentMethod'],
            calculate_total(items)
        ),
        name='authorize-payment'
    )
    
    # Allocate inventory
    allocation = context.step(
        lambda _: inventory_service.allocate(items),
        name='allocate-inventory'
    )
    
    # Fulfill order
    fulfillment = context.step(
        lambda _: fulfillment_service.create_shipment({
            'orderId': order_id,
            'items': allocation['allocatedItems'],
            'address': validation['customer']['shippingAddress']
        }),
        name='fulfill-order'
    )
    
    return {
        'orderId': order_id,
        'status': 'completed',
        'trackingNumber': fulfillment['trackingNumber']
    }
```

------

这种模式可确保订单永远不会停留在中间状态。如果验证失败，订单将在付款授权之前被拒绝。如果付款授权失败，则不会分配库存。每个步骤建立在前一个步骤的基础之上，且具有自动重试和恢复功能。

**注意**  
条件检查 `if (!validation.itemsValid)` 在步骤之外，将在重放期间重新执行。这是安全的，因为它是确定性的，只要输入相同的验证对象，它就会始终产生相同的结果。

## 长期运行的进程
<a name="durable-examples-long-running"></a>

对于持续数小时、数天或数周的流程，使用持久性函数。等待操作会暂停执行但不会产生计算费用，这使得长时间运行的进程具有成本效益。在等待期间，您的函数会停止运行，Lambda 会回收执行环境。当需要恢复时，Lambda 会再次调用您的函数，并从上一次的检查点处重放。

这种执行模型使持久性函数非常适合需要长时间暂停的进程，无论是等待人工决策、外部系统响应、计划的处理窗口还是基于时间的延迟。您只需为实际使用的计算时间付费，而无需为等待时间付费。

常见场景包括文件批准流程、计划的批处理、多日入驻流程、订阅试用流程以及延迟的通知系统。这些场景具有共同的特征：以小时或天为单位计算的延长等待时间，需要在这些等待期间保持执行状态，以及成本敏感型要求，即无法为空闲计算时间付费。

### 人工干预批准
<a name="durable-examples-human-in-loop"></a>

在保持执行状态的同时，暂停执行文档审核、批准或决策。该函数会等待外部回调请求，且不会消耗资源，一旦收到批准便会自动恢复运行。

这种模式对于需要人工判断或外部验证的流程至关重要。该函数在回调点处暂停执行，在等待期间不会产生计算费用。当有人通过 API 提交其决策时，Lambda 会再次调用您的函数，并从上次保存的检查点重放，以继续处理批准结果。

------
#### [ TypeScript ]

```
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";

export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    const { documentId, reviewers } = event;
    
    // Step 1: Prepare document for review
    const prepared = await context.step("prepare-document", async () => {
      return await documentService.prepare(documentId);
    });
    
    // Step 2: Request approval with callback
    const approval = await context.waitForCallback(
      "approval-callback",
      async (callbackId) => {
        await notificationService.sendApprovalRequest({
          documentId,
          reviewers,
          callbackId,
          expiresIn: 86400
        });
      },
      {
        timeout: { seconds: 86400 }
      }
    );
    
    // Function resumes here when approval is received
    if (approval?.approved) {
      const finalized = await context.step("finalize-document", async () => {
        return await documentService.finalize(documentId, approval.comments);
      });
      
      return {
        status: 'approved',
        documentId,
        finalizedAt: finalized.timestamp
      };
    }
    
    // Handle rejection
    await context.step("archive-rejected", async () => {
      await documentService.archive(documentId, approval?.reason);
    });
    
    return {
      status: 'rejected',
      documentId,
      reason: approval?.reason
    };
  }
);
```

------
#### [ Python ]

```
from aws_durable_execution_sdk_python import DurableContext, durable_execution, WaitConfig

@durable_execution
def lambda_handler(event, context: DurableContext):
    document_id = event['documentId']
    reviewers = event['reviewers']
    
    # Step 1: Prepare document for review
    prepared = context.step(
        lambda _: document_service.prepare(document_id),
        name='prepare-document'
    )
    
    # Step 2: Request approval with callback
    def send_approval_request(callback_id):
        notification_service.send_approval_request({
            'documentId': document_id,
            'reviewers': reviewers,
            'callbackId': callback_id,
            'expiresIn': 86400
        })
    
    approval = context.wait_for_callback(
        send_approval_request,
        name='approval-callback',
        config=WaitConfig(timeout=86400)
    )
    
    # Function resumes here when approval is received
    if approval and approval.get('approved'):
        finalized = context.step(
            lambda _: document_service.finalize(document_id, approval.get('comments')),
            name='finalize-document'
        )
        
        return {
            'status': 'approved',
            'documentId': document_id,
            'finalizedAt': finalized['timestamp']
        }
    
    # Handle rejection
    context.step(
        lambda _: document_service.archive(document_id, approval.get('reason') if approval else None),
        name='archive-rejected'
    )
    
    return {
        'status': 'rejected',
        'documentId': document_id,
        'reason': approval.get('reason') if approval else None
    }
```

------

当收到回调且您的函数恢复时，它会从头开始重放。准备文档的步骤会立即返回其检查点结果。waitForCallback 操作还会立即返回存储的批准结果，而不会再次等待。然后，执行过程会继续进行到最后的完善或归档步骤。

### 多阶段数据管线
<a name="durable-examples-data-pipelines"></a>

通过提取、转换和加载等阶段对大型数据集进行处理，并在各阶段之间设置检查点。每个阶段可能需要数小时才能完成，检查点使管线能够在中断时从任何阶段恢复。

这种模式非常适合 ETL 工作流程、数据迁移或批处理作业，在这些作业中，您需要分阶段处理数据，并在各阶段之间设置恢复点。如果某个阶段失败，管线将从最后一个已完成的阶段恢复，而不是从头开始重新启动。您还可以使用等待操作在各阶段之间进行暂停；例如，在遵守速率限制的前提下等待下游系统准备好，或者在非高峰时段调度处理工作。

------
#### [ TypeScript ]

```
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";

export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    const { datasetId, batchSize } = event;
    
    // Stage 1: Extract data from source
    const extracted = await context.step("extract-data", async () => {
      const records = await sourceDatabase.extractRecords(datasetId);
      return { recordCount: records.length, records };
    });
    
    // Wait 5 minutes to respect source system rate limits
    await context.wait({ seconds: 300 });
    
    // Stage 2: Transform data in batches
    const transformed = await context.step("transform-data", async () => {
      const batches = chunkArray(extracted.records, batchSize);
      const results = [];
      
      for (const batch of batches) {
        const transformed = await transformService.processBatch(batch);
        results.push(transformed);
      }
      
      return { batchCount: batches.length, results };
    });
    
    // Wait until off-peak hours (e.g., 2 AM)
    const now = new Date();
    const targetHour = 2;
    const msUntilTarget = calculateMsUntilHour(now, targetHour);
    await context.wait({ seconds: Math.floor(msUntilTarget / 1000) });
    
    // Stage 3: Load data to destination
    const loaded = await context.step("load-data", async () => {
      let loadedCount = 0;
      
      for (const result of transformed.results) {
        await destinationDatabase.loadBatch(result);
        loadedCount += result.length;
      }
      
      return { loadedCount };
    });
    
    // Stage 4: Verify and finalize
    const verified = await context.step("verify-pipeline", async () => {
      const verification = await destinationDatabase.verifyRecords(datasetId);
      await pipelineService.markComplete(datasetId, verification);
      return verification;
    });
    
    return {
      datasetId,
      recordsProcessed: extracted.recordCount,
      batchesProcessed: transformed.batchCount,
      recordsLoaded: loaded.loadedCount,
      verified: verified.success
    };
  }
);
```

------
#### [ Python ]

```
from aws_durable_execution_sdk_python import DurableContext, durable_execution
from datetime import datetime

@durable_execution
def lambda_handler(event, context: DurableContext):
    dataset_id = event['datasetId']
    batch_size = event['batchSize']
    
    # Stage 1: Extract data from source
    def extract_data(_):
        records = source_database.extract_records(dataset_id)
        return {'recordCount': len(records), 'records': records}
    
    extracted = context.step(extract_data, name='extract-data')
    
    # Wait 5 minutes to respect source system rate limits
    context.wait(300)
    
    # Stage 2: Transform data in batches
    def transform_data(_):
        batches = chunk_array(extracted['records'], batch_size)
        results = []
        
        for batch in batches:
            transformed = transform_service.process_batch(batch)
            results.append(transformed)
        
        return {'batchCount': len(batches), 'results': results}
    
    transformed = context.step(transform_data, name='transform-data')
    
    # Wait until off-peak hours (e.g., 2 AM)
    now = datetime.now()
    target_hour = 2
    ms_until_target = calculate_ms_until_hour(now, target_hour)
    context.wait(ms_until_target // 1000)
    
    # Stage 3: Load data to destination
    def load_data(_):
        loaded_count = 0
        
        for result in transformed['results']:
            destination_database.load_batch(result)
            loaded_count += len(result)
        
        return {'loadedCount': loaded_count}
    
    loaded = context.step(load_data, name='load-data')
    
    # Stage 4: Verify and finalize
    def verify_pipeline(_):
        verification = destination_database.verify_records(dataset_id)
        pipeline_service.mark_complete(dataset_id, verification)
        return verification
    
    verified = context.step(verify_pipeline, name='verify-pipeline')
    
    return {
        'datasetId': dataset_id,
        'recordsProcessed': extracted['recordCount'],
        'batchesProcessed': transformed['batchCount'],
        'recordsLoaded': loaded['loadedCount'],
        'verified': verified['success']
    }
```

------

每个阶段都被封装在一个步骤中，从而形成一个检查点，这样，如果流程被打断，管线仍可以从任何阶段恢复。提取和转换之间的 5 分钟等待时间既遵守了源系统的速率限制，又不会消耗计算资源，而等待到凌晨 2 点则将昂贵的加载操作安排在了非高峰时段。

**注意**  
`new Date()` 调用和 `calculateMsUntilHour()` 函数在步骤之外，将在重放期间重新执行。对于必须在重放间保持一致的时间相关操作，请在步骤内部计算时间戳，或者仅将其用于等待持续时间（等待持续时间会被检查点处理）。

## 高级模式
<a name="durable-examples-advanced"></a>

使用持久性函数构建复杂的多步骤应用程序，这些应用程序能够整合多个持久操作、并行执行、数组处理、条件逻辑和轮询。这些模式使您能够构建复杂的应用程序，而这些应用程序能够协调多项任务，并同时具备容错能力和自动恢复功能。

高级模式并非只是简单的按顺序进行的步骤。您可以与 `parallel()` 并行运行操作，使用 `map()` 处理数组，通过 `waitForCondition()` 等待外部条件，然后将这些基元组合起来以构建可靠的应用程序。每个持久操作都会创建自己的检查点，因此如果应用程序出现中断情况，仍能从任何一点恢复操作。

### 用户入驻流程
<a name="durable-examples-user-onboarding"></a>

引导用户完成注册、电子邮件验证、个人资料设置以及初始配置流程，并提供重试处理功能。此示例结合了顺序步骤、回调和条件逻辑，从而构建了一个完整的入驻流程。

------
#### [ TypeScript ]

```
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";

export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    const { userId, email } = event;
    
    // Step 1: Create user account
    const user = await context.step("create-account", async () => {
      return await userService.createAccount(userId, email);
    });
    
    // Step 2: Send verification email
    await context.step("send-verification", async () => {
      return await emailService.sendVerification(email);
    });
    
    // Step 3: Wait for email verification (up to 48 hours)
    const verified = await context.waitForCallback(
      "email-verification",
      async (callbackId) => {
        await notificationService.sendVerificationLink({
          email,
          callbackId,
          expiresIn: 172800
        });
      },
      {
        timeout: { seconds: 172800 }
      }
    );
    
    if (!verified) {
      await context.step("send-reminder", async () => {
        await emailService.sendReminder(email);
      });
      
      return {
        status: "verification_timeout",
        userId,
        message: "Email verification not completed within 48 hours"
      };
    }
    
    // Step 4: Initialize user profile in parallel
    const setupResults = await context.parallel("profile-setup", [
      async (ctx: DurableContext) => {
        return await ctx.step("create-preferences", async () => {
          return await preferencesService.createDefaults(userId);
        });
      },
      
      async (ctx: DurableContext) => {
        return await ctx.step("setup-notifications", async () => {
          return await notificationService.setupDefaults(userId);
        });
      },
      
      async (ctx: DurableContext) => {
        return await ctx.step("create-welcome-content", async () => {
          return await contentService.createWelcome(userId);
        });
      }
    ]);
    
    // Step 5: Send welcome email
    await context.step("send-welcome", async () => {
      const [preferences, notifications, content] = setupResults.getResults();
      return await emailService.sendWelcome({
        email,
        preferences,
        notifications,
        content
      });
    });
    
    return {
      status: "onboarding_complete",
      userId,
      completedAt: new Date().toISOString()
    };
  }
);
```

------
#### [ Python ]

```
from aws_durable_execution_sdk_python import DurableContext, durable_execution, WaitConfig
from datetime import datetime

@durable_execution
def lambda_handler(event, context: DurableContext):
    user_id = event['userId']
    email = event['email']
    
    # Step 1: Create user account
    user = context.step(
        lambda _: user_service.create_account(user_id, email),
        name='create-account'
    )
    
    # Step 2: Send verification email
    context.step(
        lambda _: email_service.send_verification(email),
        name='send-verification'
    )
    
    # Step 3: Wait for email verification (up to 48 hours)
    def send_verification_link(callback_id):
        notification_service.send_verification_link({
            'email': email,
            'callbackId': callback_id,
            'expiresIn': 172800
        })
    
    verified = context.wait_for_callback(
        send_verification_link,
        name='email-verification',
        config=WaitConfig(timeout=172800)
    )
    
    if not verified:
        context.step(
            lambda _: email_service.send_reminder(email),
            name='send-reminder'
        )
        
        return {
            'status': 'verification_timeout',
            'userId': user_id,
            'message': 'Email verification not completed within 48 hours'
        }
    
    # Step 4: Initialize user profile in parallel
    def create_preferences(ctx: DurableContext):
        return ctx.step(
            lambda _: preferences_service.create_defaults(user_id),
            name='create-preferences'
        )
    
    def setup_notifications(ctx: DurableContext):
        return ctx.step(
            lambda _: notification_service.setup_defaults(user_id),
            name='setup-notifications'
        )
    
    def create_welcome_content(ctx: DurableContext):
        return ctx.step(
            lambda _: content_service.create_welcome(user_id),
            name='create-welcome-content'
        )
    
    setup_results = context.parallel(
        [create_preferences, setup_notifications, create_welcome_content],
        name='profile-setup'
    )
    
    # Step 5: Send welcome email
    def send_welcome(_):
        results = setup_results.get_results()
        preferences, notifications, content = results[0], results[1], results[2]
        return email_service.send_welcome({
            'email': email,
            'preferences': preferences,
            'notifications': notifications,
            'content': content
        })
    
    context.step(send_welcome, name='send-welcome')
    
    return {
        'status': 'onboarding_complete',
        'userId': user_id,
        'completedAt': datetime.now().isoformat()
    }
```

------

该流程将顺序步骤与创建账户和发送电子邮件的检查点相结合，然后暂停运行最长 48 小时，以等待电子邮件验证，且在此期间不会消耗资源。条件逻辑根据验证是否完成或超时的情况来处理不同的路径。配置文件设置任务通过并行操作同时运行，以缩短总执行时间，并且每个步骤都会在出现暂时性故障时自动重试，以帮助确保载入过程能够可靠完成。

### 跨函数的链式调用
<a name="durable-examples-chained-invocations"></a>

使用 `context.invoke()` 从持久性函数内调用其他 Lambda 函数。调用函数在等待被调用函数完成期间会暂停，从而形成一个检查点，以保存执行结果。如果调用函数在被调用函数执行完毕后中断，它会以存储的结果继续执行，而无需重新调用函数。

当您拥有处理特定领域（客户验证、支付处理、库存管理）的专用函数，并且需要在工作流中协调它们时，请使用此模式。每个函数都拥有独立的逻辑，并且可以被多个编排工具函数调用，从而避免了代码的重复。

------
#### [ TypeScript ]

```
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";

// Main orchestrator function
export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    const { orderId, customerId } = event;
    
    // Step 1: Validate customer by invoking customer service function
    const customer = await context.invoke(
      "validate-customer",
      "arn:aws:lambda:us-east-1:123456789012:function:customer-service:1",
      { customerId }
    );
    
    if (!customer.isValid) {
      return { orderId, status: "rejected", reason: "invalid_customer" };
    }
    
    // Step 2: Check inventory by invoking inventory service function
    const inventory = await context.invoke(
      "check-inventory",
      "arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1",
      { orderId, items: event.items }
    );
    
    if (!inventory.available) {
      return { orderId, status: "rejected", reason: "insufficient_inventory" };
    }
    
    // Step 3: Process payment by invoking payment service function
    const payment = await context.invoke(
      "process-payment",
      "arn:aws:lambda:us-east-1:123456789012:function:payment-service:1",
      {
        customerId,
        amount: inventory.totalAmount,
        paymentMethod: customer.paymentMethod
      }
    );
    
    // Step 4: Create shipment by invoking fulfillment service function
    const shipment = await context.invoke(
      "create-shipment",
      "arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1",
      {
        orderId,
        items: inventory.allocatedItems,
        address: customer.shippingAddress
      }
    );
    
    return {
      orderId,
      status: "completed",
      trackingNumber: shipment.trackingNumber,
      estimatedDelivery: shipment.estimatedDelivery
    };
  }
);
```

------
#### [ Python ]

```
from aws_durable_execution_sdk_python import DurableContext, durable_execution

# Main orchestrator function
@durable_execution
def lambda_handler(event, context: DurableContext):
    order_id = event['orderId']
    customer_id = event['customerId']
    
    # Step 1: Validate customer by invoking customer service function
    customer = context.invoke(
        'arn:aws:lambda:us-east-1:123456789012:function:customer-service:1',
        {'customerId': customer_id},
        name='validate-customer'
    )
    
    if not customer['isValid']:
        return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_customer'}
    
    # Step 2: Check inventory by invoking inventory service function
    inventory = context.invoke(
        'arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1',
        {'orderId': order_id, 'items': event['items']},
        name='check-inventory'
    )
    
    if not inventory['available']:
        return {'orderId': order_id, 'status': 'rejected', 'reason': 'insufficient_inventory'}
    
    # Step 3: Process payment by invoking payment service function
    payment = context.invoke(
        'arn:aws:lambda:us-east-1:123456789012:function:payment-service:1',
        {
            'customerId': customer_id,
            'amount': inventory['totalAmount'],
            'paymentMethod': customer['paymentMethod']
        },
        name='process-payment'
    )
    
    # Step 4: Create shipment by invoking fulfillment service function
    shipment = context.invoke(
        'arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1',
        {
            'orderId': order_id,
            'items': inventory['allocatedItems'],
            'address': customer['shippingAddress']
        },
        name='create-shipment'
    )
    
    return {
        'orderId': order_id,
        'status': 'completed',
        'trackingNumber': shipment['trackingNumber'],
        'estimatedDelivery': shipment['estimatedDelivery']
    }
```

------

每次调用都会在编排工具函数中创建一个检查点。如果编排工具在客户验证完成后中断，它将从该检查点恢复，使用存储的客户数据，并跳过验证调用过程。这能够避免对下游服务的重复调用，并确保在中断情况下也能保持执行的一致性。

被调用函数可以是持久性 Lambda 函数，也可以是标准的 Lambda 函数。如果您调用一个持久性函数，它可以有自己的多步骤工作流程，其中包括等待和检查点。编排工具只需等待整个持久执行过程完成，然后接收最终结果即可。

**注意**  
不支持跨账户调用。所有被调用函数必须与调用函数在同一 AWS 账户中。

### 使用检查点进行批处理
<a name="durable-examples-batch-processing"></a>

在出现故障后，通过自动恢复上一次成功的检查点的方式处理数百万条记录。此示例演示了持久性函数如何将 `map()` 操作与分块和速率限制相结合，以进行大规模数据处理。

------
#### [ TypeScript ]

```
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";

interface Batch {
  batchIndex: number;
  recordIds: string[];
}

export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    const { datasetId, batchSize = 1000 } = event;
    
    // Step 1: Get all record IDs to process
    const recordIds = await context.step("fetch-record-ids", async () => {
      return await dataService.getRecordIds(datasetId);
    });
    
    // Step 2: Split into batches
    const batches: Batch[] = [];
    for (let i = 0; i < recordIds.length; i += batchSize) {
      batches.push({
        batchIndex: Math.floor(i / batchSize),
        recordIds: recordIds.slice(i, i + batchSize)
      });
    }
    
    // Step 3: Process batches with controlled concurrency
    const batchResults = await context.map(
      "process-batches",
      batches,
      async (ctx: DurableContext, batch: Batch, index: number) => {
        const processed = await ctx.step(`batch-${batch.batchIndex}`, async () => {
          const results = [];
          for (const recordId of batch.recordIds) {
            const result = await recordService.process(recordId);
            results.push(result);
          }
          return results;
        });
        
        const validated = await ctx.step(`validate-${batch.batchIndex}`, async () => {
          return await validationService.validateBatch(processed);
        });
        
        return {
          batchIndex: batch.batchIndex,
          recordCount: batch.recordIds.length,
          successCount: validated.successCount,
          failureCount: validated.failureCount
        };
      },
      {
        maxConcurrency: 5
      }
    );
    
    // Step 4: Aggregate results
    const summary = await context.step("aggregate-results", async () => {
      const results = batchResults.getResults();
      const totalSuccess = results.reduce((sum, r) => sum + r.successCount, 0);
      const totalFailure = results.reduce((sum, r) => sum + r.failureCount, 0);
      
      return {
        datasetId,
        totalRecords: recordIds.length,
        batchesProcessed: batches.length,
        successCount: totalSuccess,
        failureCount: totalFailure,
        completedAt: new Date().toISOString()
      };
    });
    
    return summary;
  }
);
```

------
#### [ Python ]

```
from aws_durable_execution_sdk_python import DurableContext, durable_execution, MapConfig
from datetime import datetime
from typing import List, Dict

@durable_execution
def lambda_handler(event, context: DurableContext):
    dataset_id = event['datasetId']
    batch_size = event.get('batchSize', 1000)
    
    # Step 1: Get all record IDs to process
    record_ids = context.step(
        lambda _: data_service.get_record_ids(dataset_id),
        name='fetch-record-ids'
    )
    
    # Step 2: Split into batches
    batches = []
    for i in range(0, len(record_ids), batch_size):
        batches.append({
            'batchIndex': i // batch_size,
            'recordIds': record_ids[i:i + batch_size]
        })
    
    # Step 3: Process batches with controlled concurrency
    def process_batch(ctx: DurableContext, batch: Dict, index: int):
        batch_index = batch['batchIndex']
        
        def process_records(_):
            results = []
            for record_id in batch['recordIds']:
                result = record_service.process(record_id)
                results.append(result)
            return results
        
        processed = ctx.step(process_records, name=f'batch-{batch_index}')
        
        validated = ctx.step(
            lambda _: validation_service.validate_batch(processed),
            name=f'validate-{batch_index}'
        )
        
        return {
            'batchIndex': batch_index,
            'recordCount': len(batch['recordIds']),
            'successCount': validated['successCount'],
            'failureCount': validated['failureCount']
        }
    
    batch_results = context.map(
        process_batch,
        batches,
        name='process-batches',
        config=MapConfig(max_concurrency=5)
    )
    
    # Step 4: Aggregate results
    def aggregate_results(_):
        results = batch_results.get_results()
        total_success = sum(r['successCount'] for r in results)
        total_failure = sum(r['failureCount'] for r in results)
        
        return {
            'datasetId': dataset_id,
            'totalRecords': len(record_ids),
            'batchesProcessed': len(batches),
            'successCount': total_success,
            'failureCount': total_failure,
            'completedAt': datetime.now().isoformat()
        }
    
    summary = context.step(aggregate_results, name='aggregate-results')
    
    return summary
```

------

将记录按可管理的批次进行拆分，以避免占用过多内存或影响下游服务，随后多个批次并行处理，由 `maxConcurrency` 控制并行度。每个批次都有其独立的检查点，因此出现故障时只会重新处理失败的批次，而不会重新处理所有记录。这种模式非常适合用于 ETL 作业、数据迁移或批量操作，因为在这些操作中，处理过程可能需要数小时。

## 后续步骤
<a name="durable-examples-next-steps"></a>
+ 探索[基本概念](durable-basic-concepts.md)以了解 DurableContext、步骤和等待
+ 查看编写确定性代码和优化性能的[最佳实践](durable-best-practices.md)
+ 了解如何在本地和云端[测试持久性函数](durable-testing.md)
+ 比较持久性函数与 Step Functions，了解每种方法何时最有效。请参阅[持久性函数或 Step Functions](durable-step-functions.md)。

# Lambda 持久性函数的安全性和权限
<a name="durable-security"></a>

Lambda 持久性函数需要特定的 IAM 权限来管理检查点操作。遵循‌最低权限原则‌，仅授予您的函数所需的权限。

## 执行角色权限
<a name="durable-execution-role"></a>

您的持久性函数的执行角色需要具备创建检查点和检索执行状态的权限。以下策略显示了所需的最低权限：

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "lambda:CheckpointDurableExecution",
                "lambda:GetDurableExecutionState"
            ],
            "Resource": "arn:aws:lambda:region:account-id:function:function-name:*"
        }
    ]
}
```

使用控制台创建持久性函数时，Lambda 会自动将这些权限添加到执行角色中。如果您使用 AWS CLI 或 AWS CloudFormation 创建函数，请将这些权限添加到您的执行角色中。

**最低权限原则**  
将 `Resource` 元素范围限定为特定的函数 ARN，而不是使用通配符。这使得执行角色仅限于对那些需要进行检查点操作的函数执行此类操作。

**示例：多个函数的限定范围权限**

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "lambda:CheckpointDurableExecution",
                "lambda:GetDurableExecutionState"
            ],
            "Resource": [
                "arn:aws:lambda:us-east-1:123456789012:function:orderProcessor:*",
                "arn:aws:lambda:us-east-1:123456789012:function:paymentHandler:*"
            ]
        }
    ]
}
```

或者，您可以使用 AWS 托管策略 `AWSLambdaBasicDurableExecutionRolePolicy`，其中包括所需的持久执行权限以及 Amazon CloudWatch Logs 的基本 Lambda 执行权限。

## 状态加密
<a name="durable-state-encryption"></a>

Lambda 持久性函数自动使用 AWS 拥有的密钥启用静态加密，且不收取任何费用。每个函数执行都会保持独立的状态，其他执行无法访问该状态。客户自主管理型密钥（CMK）不受支持。

检查点数据包括：
+ 步骤结果和返回值
+ 执行进度和时间表
+ 等待状态信息

当 Lambda 读取或写入检查点数据时，所有数据在传输过程中都会使用 TLS 进行加密。

### 使用自定义序列化器和反序列化器进行自定义加密
<a name="durable-custom-encryption"></a>

对于关键的安全要求，您可以使用持久的 SDK 通过自定义序列化器和反序列化器（SerDer）来实施自己的加密和解密机制。这种方法能让您完全控制用于保护检查点数据的加密密钥和算法。

**重要**  
当您使用自定义加密时，您将无法在 Lambda 控制台和 API 响应中查看操作结果。检查点数据在执行历史记录中是以加密形式存在的，若不进行解密则无法进行查看。

您的函数的执行角色对自定义 SerDer 实施中所使用的 AWS KMS 密钥，需要 `kms:Encrypt` 和 `kms:Decrypt` 权限。

## CloudTrail 日志
<a name="durable-cloudtrail-logging"></a>

Lambda 将检查点操作记录为 AWS CloudTrail 中的数据事件。您可以使用 CloudTrail 来审计检查点创建的时间、跟踪执行状态的变化以及监控对持久执行数据的访问情况。

检查点操作显示在 CloudTrail 日志中，其事件名称如下：
+ `CheckpointDurableExecution`：在步骤完成并创建检查点时记录
+ `GetDurableExecutionState`：当 Lambda 在重放过程中检索执行状态时记录

要为持久性函数启用数据事件日志记录，请配置 CloudTrail 跟踪以记录 Lambda 数据事件。有关更多信息，请参阅《CloudTrail 用户指南》中的[记录数据事件](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/logging-data-events-with-cloudtrail.html)。

**示例：用于检查点操作的 CloudTrail 日志条目**

```
{
    "eventVersion": "1.08",
    "eventTime": "2024-11-16T10:30:45Z",
    "eventName": "CheckpointDurableExecution",
    "eventSource": "lambda.amazonaws.com",
    "requestParameters": {
        "functionName": "myDurableFunction",
        "executionId": "exec-abc123",
        "stepId": "step-1"
    },
    "responseElements": null,
    "eventType": "AwsApiCall"
}
```

## 跨账户注意事项
<a name="durable-cross-account-access"></a>

如果您跨 AWS 账户调用持久性函数，则调用账户需要 `lambda:InvokeFunction` 权限，但检查点操作始终会使用函数账户中的执行角色。调用账户无法直接访问检查点数据或执行状态。

这种隔离可确保检查点数据在函数的账户内始终处于安全状态，即便是在从外部账户调用时也是如此。

## 继承的 Lambda 安全功能
<a name="durable-inherited-security"></a>

持久性函数继承了 Lambda 的所有安全、治理和合规特性，包括 VPC 连接、环境变量加密、死信队列、预留并发、函数 URL、代码签名以及合规认证（如 SOC、PCI DSS、HIPAA 等）。

有关 Lambda 安全功能的详细信息，请参阅《Lambda 开发人员指南》中的 [AWS Lambda 中的安全性](https://docs.aws.amazon.com/lambda/latest/dg/lambda-security.html)。对于持久性函数而言，唯一的额外安全考虑因素就是本指南中所提及的检查点权限。

# 持久执行 SDK
<a name="durable-execution-sdk"></a>

持久执行 SDK 是构建持久性函数的基础。它提供了为进度创建检查点、处理重试和管理执行流所需的基元。该 SDK 简化了检查点管理与回放的复杂性，使您能够编写具有自动容错功能的顺序代码。

该 SDK 适用于 JavaScript、TypeScript、Python 和 Java（预览版）。有关完整的 API 文档和示例，请参阅 GitHub 上的 [JavaScript/TypeScript SDK](https://github.com/aws/aws-durable-execution-sdk-js)、[Python SDK](https://github.com/aws/aws-durable-execution-sdk-python) 和 [Java SDK](https://github.com/aws/aws-durable-execution-sdk-java)。

## DurableContext
<a name="durable-sdk-context"></a>

该 SDK 为您的函数提供了一个可公开所有持久操作的 `DurableContext` 对象。此上下文取代了标准的 Lambda 上下文，并提供了创建检查点、管理执行流以及与外部系统进行协调的方法。

要使用该 SDK，请使用持久执行封装器封装您的 Lambda 处理程序：

------
#### [ TypeScript ]

```
import { withDurableExecution, DurableContext } from '@aws/durable-execution-sdk-js';

export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    // Your function receives DurableContext instead of Lambda context
    // Use context.step(), context.wait(), etc.
    return result;
  }
);
```

------
#### [ Python ]

```
from aws_durable_execution_sdk_python import durable_execution, DurableContext

@durable_execution
def handler(event: dict, context: DurableContext):
    # Your function receives DurableContext
    # Use context.step(), context.wait(), etc.
    return result
```

------
#### [ Java (Preview) ]

```
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;

public class Handler extends DurableHandler<Object, String> {
    @Override
    public String handleRequest(Object input, DurableContext context) {
        // Your function receives DurableContext
        // Use context.step(), context.wait(), etc.
        return result;
    }
}
```

------

封装器会拦截您的函数调用，加载任何现有的检查点日志，并提供管理重放和检查点的 `DurableContext`。

## 该 SDK 的用途
<a name="durable-sdk-what-it-does"></a>

该 SDK 承担三项关键责任，以实现持久执行：

**检查点管理：**当您的函数执行持久操作时，该 SDK 会自动创建检查点。每个检查点都记录操作类型、输入和结果。当您的函数完成一个步骤时，该 SDK 会保留检查点，然后再继续操作。这样可以确保您的函数在发生中断后可以从任何已完成的操作中恢复。

**重放协调：**当您的函数在暂停或中断后恢复时，该 SDK 会执行重放操作。它会从头开始运行您的代码，但会跳过已完成的操作，使用存储的检查点结果而不是重新执行它们。该 SDK 可确保重放操作具有确定性，如果输入和检查点日志相同，您的函数会生成相同的结果。

**状态隔离：**该 SDK 将执行状态与您的业务逻辑分开维护。每个持久执行都有自己的检查点日志，其他执行无法访问该日志。该 SDK 对静态检查点数据进行加密，并确保重放期间的状态保持一致。

## 检查点机制的工作原理
<a name="durable-sdk-how-checkpointing-works"></a>

当您调用持久操作时，该 SDK 将遵循以下顺序：

1. **检查现有检查点：**该 SDK 会检查此操作是否已在之前的调用中完成。如果已存在检查点，该 SDK 将在不重新执行操作的情况下返回存储的结果。

1. **执行操作：**如果不存在检查点，该 SDK 将执行您的操作代码。对于步骤而言，这意味着要调用您的函数。对于等待而言，这意味着要调度恢复。

1. **创建检查点：**操作完成后，该 SDK 会将结果序列化并创建检查点。检查点包括操作类型、名称、输入、结果和时间戳。

1. **保留检查点：**该 SDK 会调用 Lambda 检查点 API 来保留检查点。这确保了在继续执行之前，检查点是持久的。

1. **返回结果：**该 SDK 将操作结果返回到您的代码，然后继续执行下一个操作。

此顺序确保了每当一项操作完成时，其结果都会被安全地存储下来。如果您的函数在任何时候被中断，该 SDK 能够重放至上一个已完成的检查点。

## 重放行为
<a name="durable-sdk-replay-behavior"></a>

当您的函数在暂停或中断后恢复时，该 SDK 会执行重放操作：

1. **加载检查点日志：**该 SDK 会从 Lambda 中检索此次执行的检查点日志。

1. **从头开始运行：**该 SDK 会从头开始调用您的处理程序函数，而不是从它暂停的地方调用。

1. **跳过已完成的持久操作：**当您的代码调用持久操作时，该 SDK 会对照检查点日志检查每个操作。对于已完成的持久操作，该 SDK 会直接返回已存储的结果，而不会执行操作代码。
**注意**  
如果子上下文的结果大于最大检查点大小（256 KB），则该上下文的代码将在重放期间再次执行。这使得您能够根据在该上下文中运行的持久操作构建出较大的结果，这些结果将从检查点日志中进行查找。因此，必须确保只在该上下文中运行确定性代码。在使用包含较大结果的子上下文时，最佳做法是在步骤中执行长时间运行或非确定性的工作，而仅在上下文中执行整合结果的短时运行任务。

1. **在中断点恢复：**当该 SDK 执行到一个没有检查点的操作时，它会正常运行，并在完成持久操作后创建新的检查点。

这种重放机制要求您的代码必须具有确定性。在输入和检查点日志相同的情况下，您的函数必须按相同顺序执行持久操作调用。该 SDK 通过在重放过程中验证操作名称和类型是否与检查点日志匹配来强制执行此规则。

## 可用的持久操作
<a name="durable-sdk-operations"></a>

`DurableContext` 为不同的协调模式提供操作。每次持久操作都会自动创建检查点，从而确保您的函数能够从任何位置恢复运行。

### Steps
<a name="durable-sdk-op-step"></a>

使用自动检查点机制和重试功能执行业务逻辑。对于调用外部服务、进行计算或执行任何需要进行检查点记录的逻辑的操作，请遵循相应的步骤。该 SDK 会在步骤之前和之后创建一个检查点，存储结果以供重放。

------
#### [ TypeScript ]

```
const result = await context.step('process-payment', async () => {
  return await paymentService.charge(amount);
});
```

------
#### [ Python ]

```
result = context.step(
    lambda _: payment_service.charge(amount),
    name='process-payment'
)
```

------
#### [ Java (Preview) ]

```
var result = context.step("process-payment", Payment.class, 
    () -> paymentService.charge(amount)
);
```

------

步骤支持可配置的重试策略、执行语义（最多一次或至少一次）和自定义序列化。

### 等待
<a name="durable-sdk-op-wait"></a>

在指定的持续时间内暂停执行，且不消耗计算资源。SDK 会创建检查点、终止函数调用并调度恢复。当等待过程结束时，Lambda 会再次调用您的函数，并且 SDK 会重放到等待点，然后继续执行。

------
#### [ TypeScript ]

```
// Wait 1 hour without charges
await context.wait({ seconds: 3600 });
```

------
#### [ Python ]

```
# Wait 1 hour without charges
context.wait(3600)
```

------
#### [ Java (Preview) ]

```
// Wait 1 hour without charges
context.wait(Duration.ofHours(1));
```

------

### 回拨
<a name="durable-sdk-op-callback"></a>

回调使您的函数能够暂停并等待外部系统提供输入。创建回调时，该 SDK 会生成一个唯一的回调 ID 并创建一个检查点。然后，您的函数将暂停（终止调用），且不会产生计算费用。外部系统使用 `SendDurableExecutionCallbackSuccess` 或 `SendDurableExecutionCallbackFailure` Lambda API 提交回调结果。提交回调后，Lambda 会再次调用您的函数，SDK 会重放至回调点，然后您的函数会继续使用回调结果执行。

SDK 提供了两种处理回调的方法：

**createCallback：**创建回调并同时返回 Promise 和回调 ID。您将回调 ID 发送到外部系统，该系统随后使用 Lambda API 提交结果。

------
#### [ TypeScript ]

```
const [promise, callbackId] = await context.createCallback('approval', {
  timeout: { hours: 24 }
});

await sendApprovalRequest(callbackId, requestData);
const approval = await promise;
```

------
#### [ Python ]

```
callback = context.create_callback(
    name='approval',
    config=CallbackConfig(timeout_seconds=86400)
)

context.step(
    lambda _: send_approval_request(callback.callback_id),
    name='send_request'
)

approval = callback.result()
```

------
#### [ Java (Preview) ]

```
var config = CallbackConfig.builder(Duration.ofHours(24)).timeout()

var callback = context.createCallback("approval", String.class, config);

context.step("send-request", String.class, () -> {
    notificationService.sendApprovalRequest(callback.callbackId(), requestData);
    return "request-sent";
});

// Blocks until the callback finishes or times out
String approval = callback.get();
```

------

**waitForCallback：**通过将回调的创建与提交合二为一，简化了回调处理过程。SDK 创建回调，使用回调 ID 执行您的提交函数，然后等待结果。

------
#### [ TypeScript ]

```
const result = await context.waitForCallback(
  'external-api',
  async (callbackId, ctx) => {
    await submitToExternalAPI(callbackId, requestData);
  },
  { timeout: { minutes: 30 } }
);
```

------
#### [ Python ]

```
result = context.wait_for_callback(
    lambda callback_id: submit_to_external_api(callback_id, request_data),
    name='external-api',
    config=WaitForCallbackConfig(timeout_seconds=1800)
)
```

------
#### [ Java (Preview) ]

waitForCallback 仍在为 Java 开发中。

------

配置超时，以防止函数无限期等待。如果回调超时，SDK 会抛出 `CallbackError`，此时您的函数可以处理超时的情况。对长时间运行的回调使用心跳超时，以检测外部系统停止响应的时间。

对于需要人工干预的工作流程、与外部系统的集成、webhook 的响应，或者任何必须暂停等待外部输入才能继续执行的场景，都应使用回调机制。

### 并行执行
<a name="durable-sdk-op-parallel"></a>

使用可选的并发控制同时执行多个操作。该 SDK 负责管理并行执行过程，为每个操作创建检查点，并根据您的完成策略处理故障情况。

------
#### [ TypeScript ]

```
const results = await context.parallel([
  async (ctx) => ctx.step('task1', async () => processTask1()),
  async (ctx) => ctx.step('task2', async () => processTask2()),
  async (ctx) => ctx.step('task3', async () => processTask3())
]);
```

------
#### [ Python ]

```
results = context.parallel(
    lambda ctx: ctx.step(lambda _: process_task1(), name='task1'),
    lambda ctx: ctx.step(lambda _: process_task2(), name='task2'),
    lambda ctx: ctx.step(lambda _: process_task3(), name='task3')
)
```

------
#### [ Java (Preview) ]

Parallel 仍在为 Java 开发中。

------

使用 `parallel` 来同时执行独立的操作。

### Map
<a name="durable-sdk-op-map"></a>

使用可选的并发控制对数组中的每个项目同时执行操作。该 SDK 负责管理并发执行过程，为每个操作创建检查点，并根据您的完成策略处理故障情况。

------
#### [ TypeScript ]

```
const results = await context.map(itemArray, async (ctx, item, index) =>
  ctx.step('task', async () => processItem(item, index))
);
```

------
#### [ Python ]

```
results = context.map(
    item_array,
    lambda ctx, item, index: ctx.step(
        lambda _: process_item(item, index),
        name='task'
    )
)
```

------
#### [ Java (Preview) ]

Map 仍在为 Java 开发中。

------

使用 `map` 处理具有并发控制的数组。

### 子上下文
<a name="durable-sdk-op-child-context"></a>

为分组操作创建隔离的执行上下文。子上下文具有自己的检查点日志，并且可以包含多个步骤、等待操作以及其他操作。该 SDK 将整个子上下文视为一个整体来进行重试和恢复操作。

利用子上下文来组织复杂的工作流程、实施子工作流程，或者将那些应该一同重试的操作进行隔离。

------
#### [ TypeScript ]

```
const result = await context.runInChildContext(
  'batch-processing',
  async (childCtx) => {
    return await processBatch(childCtx, items);
  }
);
```

------
#### [ Python ]

```
result = context.run_in_child_context(
    lambda child_ctx: process_batch(child_ctx, items),
    name='batch-processing'
)
```

------
#### [ Java (Preview) ]

```
var result = context.runInChildContext(
    "batch-processing", 
    String.class, 
    childCtx -> process_batch(childCtx, items)
);
```

------

重放机制要求持久操作按照确定的顺序进行。通过使用多个子上下文，您可以实现多条工作流的并行执行，而确定性则在每个上下文中分别适用。这使您能够构建高性能的功能，而这些函数能够高效地利用多个 CPU 核心。

例如，假设我们启动了两个子上下文，分别称为 A 和 B。在初始调用时，这两个上下文内的步骤会按照这样的顺序执行，其中“A”步骤与“B”步骤同时进行：A1、B1、B2、A2、A3。重放时，时间会大大缩短，因为结果是从检查点日志中获取的，而且步骤的执行顺序与之前有所不同，即：B1、A1、A2、B2、A3。因为“A”步骤是按照正确的顺序（A1、A2、A3）被处理的，而“B”步骤也是按照正确的顺序（B1、B2）被处理的，所以确定性的需求得到了正确满足。

### 有条件等待
<a name="durable-sdk-op-wait-condition"></a>

在两次尝试之间使用自动检查点机制对条件进行轮询。该 SDK 将执行您的检查函数，用结果创建一个检查点，根据您的策略等待，然后重复直到条件得到满足。

------
#### [ TypeScript ]

```
const result = await context.waitForCondition(
  async (state, ctx) => {
    const status = await checkJobStatus(state.jobId);
    return { ...state, status };
  },
  {
    initialState: { jobId: 'job-123', status: 'pending' },
    waitStrategy: (state) => 
      state.status === 'completed' 
        ? { shouldContinue: false }
        : { shouldContinue: true, delay: { seconds: 30 } }
  }
);
```

------
#### [ Python ]

```
result = context.wait_for_condition(
    lambda state, ctx: check_job_status(state['jobId']),
    config=WaitForConditionConfig(
        initial_state={'jobId': 'job-123', 'status': 'pending'},
        wait_strategy=lambda state, attempt: 
            {'should_continue': False} if state['status'] == 'completed'
            else {'should_continue': True, 'delay': 30}
    )
)
```

------
#### [ Java (Preview) ]

waitForCondition 仍在为 Java 开发中。

------

使用 `waitForCondition` 进行外部系统轮询、等待资源准备就绪或使用回退实现重试。

### 函数调用
<a name="durable-sdk-op-invoke"></a>

调用另一个 Lambda 函数并等待其结果。该 SDK 会创建检查点，调用目标函数，并在调用完成后恢复您的函数。这使得函数组合和工作流程分解成为可能。

------
#### [ TypeScript ]

```
const result = await context.invoke(
  'invoke-processor',
  'arn:aws:lambda:us-east-1:123456789012:function:processor:1',
  { data: inputData }
);
```

------
#### [ Python ]

```
result = context.invoke(
    'arn:aws:lambda:us-east-1:123456789012:function:processor:1',
    {'data': input_data},
    name='invoke-processor'
)
```

------
#### [ Java (Preview) ]

```
var result = context.invoke(
    "invoke-processor", 
    "arn:aws:lambda:us-east-1:123456789012:function:processor:1",
    inputData,
    Result.class, 
    InvokeConfig.builder().build()
);
```

------

## 如何计量持久操作
<a name="durable-operations-checkpoint-consumption"></a>

您通过 `DurableContext` 调用的每个持久操作都会创建检查点来跟踪执行进度并存储状态数据。这些操作会根据其使用情况产生费用，而这些检查点中可能包含产生您的数据写入和保留成本的数据。存储的数据包括调用事件数据、从步骤中返回的有效载荷以及完成回调时传递的数据。了解持久操作的计量方式有助于您估算执行成本并优化工作流程。有关定价详情，请参阅 [Lambda 定价页面](https://aws.amazon.com/lambda/pricing/)。

有效载荷大小是指持久操作所保存的已序列化数据的大小。数据以字节为单位计量，大小可能会因操作使用的序列化器而异。一次操作的有效载荷可以是操作成功完成后的实际结果，也可以是在操作失败时所生成的序列化错误对象。

### 基本操作
<a name="durable-operations-basic"></a>

基本操作是持久性函数的基本构建块：


| 操作 | 检查点时间 | 操作次数 | 保存的数据 | 
| --- | --- | --- | --- | 
| Execution | 已启动 | 1 | 输入有效载荷大小 | 
| Execution | 已完成（成功/失败/已停止） | 0 | 输出有效载荷大小 | 
| 步骤 | 重试/成功/失败 | 1 \$1 N 次重试 | 每次尝试所返回的有效载荷大小 | 
| Wait | 已启动 | 1 | 不适用 | 
| WaitForCondition | 每次轮询尝试 | 1 \$1 N 次轮询 | 每次轮询尝试所返回的有效载荷大小 | 
| 调用级重试 | 已启动 | 1 | 错误对象的有效载荷 | 

### 回调操作
<a name="durable-operations-callbacks"></a>

回调操作使您的函数能够暂停并等待外部系统提供输入。这些操作会在创建回调和完成回调时创建检查点：


| 操作 | 检查点时间 | 操作次数 | 保存的数据 | 
| --- | --- | --- | --- | 
| CreateCallback | 已启动 | 1 | 不适用 | 
| 通过 API 调用完成回调 | Completed | 0 | 回调有效载荷 | 
| WaitForCallback | 已启动 | 3 \$1 N 次重试（上下文 \$1 回调 \$1 步骤） | 提交步骤尝试所返回的有效载荷，再加上两个回调有效载荷的副本 | 

### 复合操作
<a name="durable-operations-compound"></a>

复合操作将多个持久操作组合在一起，以处理诸如并行执行、数组处理和嵌套上下文等复杂的协调模式：


| 操作 | 检查点时间 | 操作次数 | 保存的数据 | 
| --- | --- | --- | --- | 
| Parallel | 已启动 | 1 \$1 N 个分支（1 个父上下文 \$1 N 个子上下文） | 每个分支返回的有效载荷大小的最多两份副本，外加每个分支的状态 | 
| Map | 已启动 | 1 \$1 N 个分支（1 个父上下文 \$1 N 个子上下文） | 每次迭代返回的有效载荷大小的最多两份副本，外加每次迭代的状态 | 
| Promise 助手 | Completed | 1 | Promise 所返回的有效载荷大小 | 
| RunInChildContext | 成功/失败 | 1 | 从子上下文中返回的有效载荷大小 | 

对于上下文，例如来自 `runInChildContext` 或由复合操作在内部使用的上下文，将直接对小于 256KB 的结果进行检查点检查。较大的结果并不会被存储下来——相反，在重放时，它们会通过重新处理上下文的操作而被重新构建。

# 持久性函数支持的运行时
<a name="durable-supported-runtimes"></a>

持久性函数适用于选定的托管运行时和 OCI 容器映像，可提高运行时版本的灵活性。您可以直接在控制台中使用托管运行时为 Node.js 和 Python 创建持久性函数，也可以通过基础架构即代码以编程方式创建。Java（预览版）中的持久性函数目前只能通过容器映像部署。

## Lambda 托管运行时
<a name="durable-managed-runtimes"></a>

当您在 Lambda 控制台中或使用具有 `--durable-config '{"ExecutionTimeout": 3600, "RetentionPeriodInDays": 7}'` 参数的 AWS CLI 创建函数时，以下托管运行时将支持持久性函数。有关 Lambda 运行时的完整信息，请参阅 [Lambda 运行时](lambda-runtimes.md)。


| 语言 | 运行时 | 
| --- | --- | 
| Node.js | nodejs22.x | 
| Node.js | nodejs24.x | 
| Python | python3.13 | 
| Python | python3.14 | 

**注意**  
Lambda 运行时包括用于测试和开发的持久执行 SDK。但是，我们建议将 SDK 包含在您的生产部署包中。这样可以确保版本一致性，并且可以避免可能影响函数行为的潜在运行时更新。

### Node.js
<a name="durable-runtime-nodejs"></a>

在 Node.js 项目中安装 SDK：

```
npm install @aws/durable-execution-sdk-js
```

SDK 支持 JavaScript 和 TypeScript。对于 TypeScript 项目，SDK 包含类型定义。

### Python
<a name="durable-runtime-python"></a>

在 Python 项目中安装 SDK：

```
pip install aws-durable-execution-sdk-python
```

Python SDK 使用同步方法，并且不需要 `async/await`。

### Java（预览版）
<a name="durable-runtime-java"></a>

将依赖项添加到 `pom.xml`：

```
<dependency>
    <groupId>software.amazon.lambda.durable</groupId>
    <artifactId>aws-durable-execution-sdk-java</artifactId>
    <version>VERSION</version>
</dependency>
```

在 Java 项目中安装 SDK：

```
mvn install
```

Java SDK 的预览版现已推出。waitForCondition、waitForCallback、parallel 和 map 操作仍在开发中。

## 容器映像
<a name="durable-container-images"></a>

您可以将持久性函数与容器映像一起使用，以支持其他运行时版本或自定义运行时配置。容器映像使您能够使用并非作为托管运行时提供的运行时版本，或对您的运行时环境进行自定义。

要使用容器映像创建持久性函数：

1. 基于 Lambda 基础映像创建 Dockerfile

1. 在您的容器中安装持久执行 SDK

1. 构建容器映像并将其推送到 Amazon Elastic Container Registry

1. 在启用持久执行的情况下，从容器映像创建 Lambda 函数

### 包含示例
<a name="durable-container-python"></a>

创建 Dockerfile：

------
#### [ Python ]

为 Python 3.11 创建 Dockerfile：

```
FROM public.ecr.aws/lambda/python:3.11

# Copy requirements file
COPY requirements.txt ${LAMBDA_TASK_ROOT}/

# Install dependencies including durable SDK
RUN pip install -r requirements.txt

# Copy function code
COPY lambda_function.py ${LAMBDA_TASK_ROOT}/

# Set the handler
CMD [ "lambda_function.handler" ]
```

创建 `requirements.txt` 文件：

```
aws-durable-execution-sdk-python
```

------
#### [ Java (Preview) ]

为 Java 25 创建 Dockerfile：

```
FROM --platform=linux/amd64 public.ecr.aws/lambda/java:25

# Install Maven
RUN dnf install -y maven

WORKDIR /var/task

# Copy Maven configuration and source code
COPY pom.xml .
COPY src ./src

# Build
RUN mvn clean package -DskipTests

# Move JAR to lib directory
RUN mv target/*.jar lib/

# Set the handler
CMD ["src.path.to.lambdaFunction::handler"]
```

------

构建并推送映像。

```
# Build the image
docker build -t my-durable-function .

# Tag for ECR
docker tag my-durable-function:latest 123456789012.dkr.ecr.us-east-1.amazonaws.com/my-durable-function:latest

# Push to ECR
docker push 123456789012.dkr.ecr.us-east-1.amazonaws.com/my-durable-function:latest
```

创建启用了持久执行的函数：

```
aws lambda create-function \
  --function-name myDurableFunction \
  --package-type Image \
  --code ImageUri=123456789012.dkr.ecr.us-east-1.amazonaws.com/my-durable-function:latest \
  --role arn:aws:iam::123456789012:role/lambda-execution-role \
  --durable-config '{"ExecutionTimeout": 3600, "RetentionPeriodInDays": 7}'
```

有关将容器映像与 Lambda 结合使用的更多信息，请参阅《Lambda 开发人员指南》中的[创建 Lambda 容器映像](https://docs.aws.amazon.com/lambda/latest/dg/images-create.html)。

## 运行时系统注意事项
<a name="durable-runtime-considerations"></a>

**SDK 版本管理：**在您的部署包或容器映像中包含持久执行 SDK。这样可以确保您的函数使用的是特定的 SDK 版本，并且不会受到运行时更新的影响。将 SDK 版本固定在您的 `package.json` 或 `requirements.txt` 中以控制升级时间。

**运行时更新：**AWS 将更新托管运行时，以包括安全补丁和错误修复。这些更新可能包括新的 SDK 版本。为避免发生意外行为，请将 SDK 包含在部署包中，并在部署到生产环境之前进行彻底测试。

**容器映像大小：**容器映像的最大未压缩大小为 10GB。持久执行 SDK 为映像添加了最小大小。通过采用多阶段构建方式并移除不必要的依赖项来优化您的容器。

**冷启动性能：**容器映像的冷启动时间可能比托管运行时长。持久执行 SDK 对冷启动性能的影响极小。如果冷启动延迟对您的应用程序至关重要，请使用预置并发。

# 调用持久性 Lambda 函数
<a name="durable-invoking"></a>

持久性 Lambda 函数支持与标准 Lambda 函数相同的调用方法。您可以通过同步、异步或事件源映射的方式调用持久性函数。调用过程与标准函数相同，但持久性函数为长时间运行的执行和自动状态管理提供了额外的功能。

## 调用方法
<a name="durable-invoking-methods"></a>

**同步调用：**调用持久性函数并等待响应。Lambda 将同步调用限制为 15 分钟（或更短，具体取决于配置的函数和执行超时）。当您需要立即获取结果，或与需要响应的 API 和服务集成时，请使用同步调用。您可以使用等待操作进行高效计算，而不会中断调用方——调用会等待整个持久执行完成。对于幂等执行的启动，请使用[幂等性](durable-execution-idempotency.md)中所述的执行名称参数。

```
aws lambda invoke \
  --function-name my-durable-function:1 \
  --cli-binary-format raw-in-base64-out \
  --payload '{"orderId": "12345"}' \
  response.json
```

**异步调用：**将事件排入队列以进行处理，而不必等待响应。Lambda 将事件置于队列中并立即返回。异步调用最多支持 1 年的执行时间。在需要即发即弃的场景，或当处理过程可以在后台发生时，请使用异步调用。对于幂等执行的启动，请使用[幂等性](durable-execution-idempotency.md)中所述的执行名称参数。

```
aws lambda invoke \
  --function-name my-durable-function:1 \
  --invocation-type Event \
  --cli-binary-format raw-in-base64-out \
  --payload '{"orderId": "12345"}' \
  response.json
```

**事件源映射：**配置 Lambda 以在记录从流或基于队列的服务（如 Amazon SQS、Kinesis 或 DynamoDB）可用时自动调用持久性函数。事件源映射会轮询事件源并使用批量记录调用函数。有关将事件源映射与持久性函数结合使用的详细信息（包括执行持续时间限制），请参阅[结合使用事件源映射与持久性函数](durable-invoking-esm.md)。

有关每种调用方法的完整详细信息，请参阅[同步调用](invocation-sync.md)和[异步调用](invocation-async.md)。

**注意**  
持久性函数支持死信队列（DLQ）用于错误处理，但不支持 Lambda 目标。配置 DLQ 以从失败的调用中捕获记录。

## 限定 ARN 要求
<a name="durable-invoking-qualified-arns"></a>

持久性函数需要使用限定的标识符才能调用。必须使用版本号、别名或 `$LATEST` 来调用持久性函数。您可以使用完全限定的 ARN，也可以使用带有版本/别名后缀的函数名称。不能使用非限定标识符（没有版本或别名后缀）。

**有效调用：**

```
# Using full ARN with version number
arn:aws:lambda:us-east-1:123456789012:function:my-durable-function:1

# Using full ARN with alias
arn:aws:lambda:us-east-1:123456789012:function:my-durable-function:prod

# Using full ARN with $LATEST
arn:aws:lambda:us-east-1:123456789012:function:my-durable-function:$LATEST

# Using function name with version number
my-durable-function:1

# Using function name with alias
my-durable-function:prod
```

**无效调用：**

```
# Unqualified ARN (not allowed)
arn:aws:lambda:us-east-1:123456789012:function:my-durable-function

# Unqualified function name (not allowed)
my-durable-function
```

此要求可确保持久执行在其整个生命周期中保持一致。当持久执行启动时，它会固定到特定的函数版本。如果函数暂停并在数小时或数天后恢复，Lambda 将调用开始执行的相同版本，从而确保整个工作流中的代码一致性。

**最佳实践**  
对于生产环境中的持久性函数，请使用编号版本或别名，而非 `$LATEST`。编号版本是不可变的，支持确定性重放。或者，别名提供稳定的引用，您可以更新该引用以指向新版本，而无需更改调用代码。更新别名时，新执行将使用新版本，而正在进行的执行将继续使用其原始版本。在开发过程中，您可以将 `$LATEST` 用于原型设计或缩短部署时间，但需理解，如果在运行中的执行期间底层代码发生更改，则执行可能无法正确重放（甚至可能失败）。

## 了解执行生命周期
<a name="durable-invoking-execution-lifecycle"></a>

当您调用持久性函数时，Lambda 会创建一个可以跨越多个函数调用的持久执行：

1. **初始调用：**您的调用请求会创建新的持久执行。Lambda 分配一个唯一的执行 ID 并开始处理。

1. **执行和检查点：**当您的函数执行持久操作时，SDK 会创建跟踪进度的检查点。

1. **暂停（如果需要）：**如果您的函数使用持久等待（例如 `wait` 或 `waitForCallback`）或自动步骤重试，Lambda 会暂停执行并停止对计算时间计费。

1. **恢复：**需要恢复时（包括重试之后），Lambda 会再次调用您的函数。SDK 会重放检查点日志，并从暂停执行的地方继续执行。

1. **完成：**当您的函数返回最终结果或抛出未处理的错误时，持久执行完成。

对于同步调用，调用方等待整个持久执行完成，包括任何等待操作。如果执行超过调用超时（15 分钟或更短），则调用超时。对于异步调用，Lambda 会立即返回，并独立地继续执行。使用持久执行 API 来跟踪执行状态并检索最终结果。

## 从应用程序代码调用
<a name="durable-invoking-with-sdk"></a>

使用 AWS SDK 从应用程序代码中调用持久性函数。调用过程与标准函数相同：

------
#### [ TypeScript ]

```
import { LambdaClient, InvokeCommand } from '@aws-sdk/client-lambda';

const client = new LambdaClient({});

// Synchronous invocation
const response = await client.send(new InvokeCommand({
  FunctionName: 'arn:aws:lambda:us-east-1:123456789012:function:my-durable-function:1',
  Payload: JSON.stringify({ orderId: '12345' })
}));

const result = JSON.parse(Buffer.from(response.Payload!).toString());

// Asynchronous invocation
await client.send(new InvokeCommand({
  FunctionName: 'arn:aws:lambda:us-east-1:123456789012:function:my-durable-function:1',
  InvocationType: 'Event',
  Payload: JSON.stringify({ orderId: '12345' })
}));
```

------
#### [ Python ]

```
import boto3
import json

client = boto3.client('lambda')

# Synchronous invocation
response = client.invoke(
    FunctionName='arn:aws:lambda:us-east-1:123456789012:function:my-durable-function:1',
    Payload=json.dumps({'orderId': '12345'})
)

result = json.loads(response['Payload'].read())

# Asynchronous invocation
client.invoke(
    FunctionName='arn:aws:lambda:us-east-1:123456789012:function:my-durable-function:1',
    InvocationType='Event',
    Payload=json.dumps({'orderId': '12345'})
)
```

------

## 链式调用
<a name="durable-invoking-chained"></a>

持久性函数可以使用 `DurableContext` 中的 `invoke` 操作调用其他持久性函数和非持久性函数。这会创建一个链式调用，其中调用函数等待（暂停）被调用函数完成：

------
#### [ TypeScript ]

```
export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    // Invoke another durable function and wait for result
    const result = await context.invoke(
      'process-order',
      'arn:aws:lambda:us-east-1:123456789012:function:order-processor:1',
      { orderId: event.orderId }
    );
    
    return { statusCode: 200, body: JSON.stringify(result) };
  }
);
```

------
#### [ Python ]

```
@durable_execution
def handler(event, context: DurableContext):
    # Invoke another durable function and wait for result
    result = context.invoke(
        'arn:aws:lambda:us-east-1:123456789012:function:order-processor:1',
        {'orderId': event['orderId']},
        name='process-order'
    )
    
    return {'statusCode': 200, 'body': json.dumps(result)}
```

------

链式调用会在调用函数中创建一个检查点。如果调用函数被中断，它将使用被调用函数的结果从检查点恢复，而无需重新调用该函数。

**注意**  
不支持跨账户链式调用。被调用的函数必须与调用函数在同一 AWS 账户中。

# 采用持久性函数的事件源映射
<a name="durable-invoking-esm"></a>

持久性函数适用于所有的 Lambda 事件源映射。为持久性函数配置事件源映射，方法与为标准函数配置事件源映射相同。事件源映射会自动轮询事件源（例如 Amazon SQS、Kinesis 和 DynamoDB Streams），并使用批量记录调用函数。

事件源映射对于那些处理具有复杂多步骤工作流程的流或队列的持久性函数来说非常有用。例如，您可以创建一个持久性函数，用于通过重试、外部 API 调用以及人工审批等功能处理 Amazon SQS 消息。

## 事件源映射如何调用持久性函数
<a name="durable-esm-invocation-behavior"></a>

事件源映射同步调用持久性函数，以等待完整的持久执行完成，然后再处理下一批或将记录标记为已处理。如果持久执行的时间超过 15 分钟，则执行超时并失败。事件源映射收到超时异常，并根据其重试配置处理该异常。

## 15 分钟执行限制
<a name="durable-esm-duration-limit"></a>

当通过事件源映射调用持久性函数时，整个持久执行时间不能超过 15 分钟。此限制适用于从开始到完成的整个持久执行，而不仅仅是单个函数调用。

这个 15 分钟的限制与 Lambda 函数的超时（同样为最长 15 分钟）是相互独立的。函数超时用于控制每次单独调用的运行时长，而持久执行超时则控制从执行开始到完成的总耗时。

**应用场景示例：**
+ **有效：**持久性函数通过三个步骤来处理 Amazon SQS 消息，每个步骤都需要 2 分钟，然后会再等待 5 分钟，之后才会完成最后一个步骤。总执行时间：11 分钟。这之所以能奏效，是因为整个过程不超过 15 分钟。
+ **无效：**持久性函数处理 Amazon SQS 消息，在 2 分钟内完成初始处理，然后会等待 20 分钟以接收外部回调，之后再完成整个流程。总执行时间：22 分钟。这超出了 15 分钟的限制并且会失败。
+ **无效：**持久性函数处理一个 Kinesis 记录，其在各步骤之间的多个等待操作总时长为 30 分钟。尽管每次单独的调用都能迅速完成，但整个执行时间却超过了 15 分钟。

**重要**  
在使用事件源映射时，请将持久执行超时时间设置为 15 分钟或更短，否则创建事件源映射的操作将会失败。如果您的工作流程需要较长的执行时间，请使用下面所描述的中介函数模式。

## 配置事件源映射
<a name="durable-esm-configuration"></a>

使用 Lambda 控制台、AWS CLI 或 AWS SDK 为持久性函数配置事件源映射。所有标准事件源映射属性都适用于持久性函数：

```
aws lambda create-event-source-mapping \
  --function-name arn:aws:lambda:us-east-1:123456789012:function:my-durable-function:1 \
  --event-source-arn arn:aws:sqs:us-east-1:123456789012:my-queue \
  --batch-size 10 \
  --maximum-batching-window-in-seconds 5
```

为持久性函数配置事件源映射时，请记住使用限定的 ARN（带有版本号或别名）。

## 使用事件源映射进行错误处理
<a name="durable-esm-error-handling"></a>

事件源映射提供内置的错误处理功能，可与持久性函数配合使用：
+ **重试行为：**如果初始调用失败，则事件源映射将根据其重试配置进行重试操作。根据您的要求配置最大重试尝试次数和重试间隔时间。
+ **死信队列：**配置死信队列，以捕获所有重试尝试后仍失败的记录。这能够防止信息丢失，并允许对出现故障的记录进行人工检查。
+ **部分批处理失败：**对于 Amazon SQS 和 Kinesis，使用部分批处理故障报告来单独处理记录，并且仅重试失败的记录。
+ **出错时二分：**对于 Kinesis 和 DynamoDB Streams，启用错误时二分以拆分失败的批次并隔离有问题的记录。

**注意**  
持久性函数支持死信队列（DLQ）用于错误处理，但不支持 Lambda 目标。配置 DLQ 以从失败的调用中捕获记录。

有关事件源映射错误处理的完整信息，请参阅[事件源映射](invocation-eventsourcemapping.md)。

## 为长时间运行的工作流程使用中介函数
<a name="durable-esm-intermediary-function"></a>

如果您的工作流程需要超过 15 分钟才能完成，那么可以在事件源映射和持久性函数之间使用一个中介标准 Lambda 函数。中介函数接收来自事件源映射的事件并异步调用持久性函数，从而消除 15 分钟的执行限制。

这种模式将事件源映射的同步调用模式与持久性函数的长时间运行执行模型分离开来。事件源映射调用中介函数，该函数在开始持久执行后会迅速返回。然后，持久性函数可以根据需要独立运行（最长 1 年）。

### 架构
<a name="durable-esm-intermediary-architecture"></a>

中介函数模式使用三个组件：

1. **事件源映射：**轮询事件源（Amazon SQS、Kinesis、DynamoDB Streams），并与批量记录同步调用中介函数。

1. **中介函数：**一种标准的 Lambda 函数，它会从事件源映射接收事件，对数据进行验证和转换（如果需要的话），然后异步调用持久性函数。此函数执行迅速（通常在 1 秒以内完成），并会将控制权交还给事件源映射。

1. **持久性函数：**以复杂、多步骤的逻辑来处理事件，该逻辑能够持续运行较长时间。异步调用，因此不受 15 分钟限制的约束。

### 实施
<a name="durable-esm-intermediary-implementation"></a>

中介函数接收来自事件源映射的整个事件并异步调用持久性函数。使用执行名称参数以确保幂等性执行开始，从而避免在事件源映射重试时出现重复处理的情况：

------
#### [ TypeScript ]

```
import { LambdaClient, InvokeCommand } from '@aws-sdk/client-lambda';
import { SQSEvent } from 'aws-lambda';
import { createHash } from 'crypto';

const lambda = new LambdaClient({});

export const handler = async (event: SQSEvent) => {
  // Invoke durable function asynchronously with execution name
  await lambda.send(new InvokeCommand({
    FunctionName: 'arn:aws:lambda:us-east-1:123456789012:function:my-durable-function:1',
    InvocationType: 'Event',
    Payload: JSON.stringify({
      executionName: event.Name,
      event: event
    })
  }));
  
  return { statusCode: 200 };
};
```

------
#### [ Python ]

```
import boto3
import json
import hashlib

lambda_client = boto3.client('lambda')

def handler(event, context):  
    # Invoke durable function asynchronously with execution name
    lambda_client.invoke(
        FunctionName='arn:aws:lambda:us-east-1:123456789012:function:my-durable-function:1',
        InvocationType='Event',
        Payload=json.dumps({
            'executionName': execution_name,
            'event': event["name"]
        })
    )
    
    return {'statusCode': 200}
```

------

对于中介函数本身的幂等性，请使用 [Powertools for AWS Lambda](https://docs.aws.amazon.com//powertools/) 来防止在事件源映射重试中介函数时重复调用持久性函数。

持久性函数会接收带有执行名称的有效载荷，并对所有涉及长时间运行逻辑的记录进行处理：

------
#### [ TypeScript ]

```
import { withDurableExecution, DurableContext } from '@aws/durable-execution-sdk-js';

export const handler = withDurableExecution(
  async (payload: any, context: DurableContext) => {
    const sqsEvent = payload.event;
    
    // Process each record with complex, multi-step logic
    const results = await context.map(
      sqsEvent.Records,
      async (ctx, record) => {
        const validated = await ctx.step('validate', async () => {
          return validateOrder(JSON.parse(record.body));
        });
        
        // Wait for external approval (could take hours or days)
        const approval = await ctx.waitForCallback(
          'approval',
          async (callbackId) => {
            await requestApproval(callbackId, validated);
          },
          { timeout: { hours: 48 } }
        );
        
        // Complete processing
        return await ctx.step('complete', async () => {
          return completeOrder(validated, approval);
        });
      }
    );
    
    return { statusCode: 200, processed: results.getResults().length };
  }
);
```

------
#### [ Python ]

```
from aws_durable_execution_sdk_python import durable_execution, DurableContext
from aws_durable_execution_sdk_python.config import Duration, WaitForCallbackConfig
from collections.abc import Sequence
import json

def validate_order(order_data: dict) -> dict:
    """Validate order data - always passes."""
    return order_data

def request_approval(callback_id: str, validated_order: dict) -> None:
    """Request approval for the order - always passes."""
    pass

def complete_order(validated_order: dict, approval_result: str) -> dict:
    """Complete the order processing - always passes."""
    return validated_order

@durable_execution
def lambda_handler(payload, context: DurableContext):
    sqs_event = payload['event']

    def process_record(
        ctx: DurableContext, 
        record: dict, 
        index: int, 
        items: Sequence[dict]
    ) -> dict:
        validated = ctx.step(
            lambda _: validate_order(json.loads(record['body'])),
            name=f'validate-{index}'
        )

        approval = ctx.wait_for_callback(
            submitter=lambda callback_id, wait_ctx: request_approval(callback_id, validated),
            name=f'approval-{index}',
            config=WaitForCallbackConfig(timeout=Duration.from_seconds(172800))
        )

        return ctx.step(
            lambda _: complete_order(validated, approval),
            name=f'complete-{index}'
        )

    results = context.map(
        inputs=sqs_event['Records'],
        func=process_record,
        name='process-records'
    )

    return {
        'statusCode': 200, 
        'started': results.started_count,
        'completed': results.success_count,
        'failed': results.failure_count,
        'total': results.total_count
    }
```

------

### 重要注意事项
<a name="durable-esm-intermediary-tradeoffs"></a>

这种模式通过将事件源映射与持久执行相分离，消除了 15 分钟的执行限制。中介函数在启动持久执行后会立即返回，从而使得事件源映射能够继续进行处理。然后，持久性函数可以根据需要长时间独立运行。

中介函数在调用持久性函数时才视为成功，而非在持久执行完成时才算成功。如果持久性执行后续失败，事件源映射将不会进行重试，因为其已经成功处理了该批次。在持久性函数中实施错误处理，并为失败的执行配置死信队列。

使用执行名称参数确保幂等性执行开始。如果事件源映射重试中介函数，则持久性函数将不会开始重复执行，因为执行名称已经存在。

## 支持的事件源
<a name="durable-esm-supported-sources"></a>

持久性函数支持使用事件源映射的所有 Lambda 事件源：
+ Amazon SQS 队列（标准和 FIFO）
+ Kinesis Streams
+ DynamoDB Streams
+ Amazon Managed Streaming for Apache Kafka (Amazon MSK)
+ 自行管理的 Apache Kafka
+ Amazon MQ（ActiveMQ 和 RabbitMQ）
+ Amazon DocumentDB 更改流

在调用持久性函数时，所有事件源类型都受到 15 分钟的持久执行时间限制的约束。

# Lambda 持久性函数的重试次数
<a name="durable-execution-sdk-retries"></a>

持久性函数提供自动重试功能，这使您的应用程序能够抵御暂时性的故障。SDK 在两个层面处理重试：一是针对业务逻辑故障的步骤重试；二是针对基础设施故障的后端重试。

## 步骤重试
<a name="durable-step-retries"></a>

当某个步骤中出现未捕获的异常时，SDK 会根据所配置的重试策略自动重试该步骤。步骤重试是检查点操作，它使 SDK 能够暂停执行并稍后恢复，从而不会丢失进度。

### 步骤重试行为
<a name="durable-step-retry-behavior"></a>

下表介绍了 SDK 如何在步骤中处理异常情况：


| 场景 | 发生了什么 | 计量影响 | 
| --- | --- | --- | 
| 剩余重试次数的步骤中出现异常 | SDK 会为重试创建检查点并暂停该函数。下次调用时，该步骤按照所配置回退延迟重试。 | 1 个操作 \$1 错误有效载荷大小 | 
| 没有剩余重试次数的步骤中出现异常 | 步骤失败并引发异常。如果您的处理程序代码未能捕获此异常，则整个执行过程将会失败。 | 1 个操作 \$1 错误有效载荷大小 | 

当某个步骤需要重试时，SDK 会对重试状态进行检查点处理，并在没有其他工作正在运行的情况下退出 Lambda 调用。这样，SDK 就可以在不消耗计算资源的情况下实施回退延迟。回退期过后，函数将自动恢复。

### 配置步骤重试策略
<a name="durable-step-retry-configuration"></a>

配置重试策略以控制步骤处理失败的方式。您可以指定最大尝试次数、回退间隔和重试条件。

**具有最大尝试次数的指数回退：**

------
#### [ TypeScript ]

```
const result = await context.step('call-api', async () => {
  const response = await fetch('https://api.example.com/data');
  if (!response.ok) throw new Error(`API error: ${response.status}`);
  return await response.json();
}, {
  retryStrategy: (error, attemptCount) => {
    if (attemptCount >= 5) {
      return { shouldRetry: false };
    }
    // Exponential backoff: 2s, 4s, 8s, 16s, 32s (capped at 300s)
    const delay = Math.min(2 * Math.pow(2, attemptCount - 1), 300);
    return { shouldRetry: true, delay: { seconds: delay } };
  }
});
```

------
#### [ Python ]

```
def retry_strategy(error, attempt_count):
    if attempt_count >= 5:
        return {'should_retry': False}
    # Exponential backoff: 2s, 4s, 8s, 16s, 32s (capped at 300s)
    delay = min(2 * (2 ** (attempt_count - 1)), 300)
    return {'should_retry': True, 'delay': delay}

result = context.step(
    lambda _: call_external_api(),
    name='call-api',
    config=StepConfig(retry_strategy=retry_strategy)
)
```

------

**固定间隔回退：**

------
#### [ TypeScript ]

```
const orders = await context.step('query-orders', async () => {
  return await queryDatabase(event.userId);
}, {
  retryStrategy: (error, attemptCount) => {
    if (attemptCount >= 3) {
      return { shouldRetry: false };
    }
    return { shouldRetry: true, delay: { seconds: 5 } };
  }
});
```

------
#### [ Python ]

```
def retry_strategy(error, attempt_count):
    if attempt_count >= 3:
        return {'should_retry': False}
    return {'should_retry': True, 'delay': 5}

orders = context.step(
    lambda _: query_database(event['userId']),
    name='query-orders',
    config=StepConfig(retry_strategy=retry_strategy)
)
```

------

**有条件重试（仅重试特定错误）：**

------
#### [ TypeScript ]

```
const result = await context.step('call-rate-limited-api', async () => {
  const response = await fetch('https://api.example.com/data');
  
  if (response.status === 429) throw new Error('RATE_LIMIT');
  if (response.status === 504) throw new Error('TIMEOUT');
  if (!response.ok) throw new Error(`API_ERROR_${response.status}`);
  
  return await response.json();
}, {
  retryStrategy: (error, attemptCount) => {
    // Only retry rate limits and timeouts
    const isRetryable = error.message === 'RATE_LIMIT' || error.message === 'TIMEOUT';
    
    if (!isRetryable || attemptCount >= 3) {
      return { shouldRetry: false };
    }
    
    // Exponential backoff: 1s, 2s, 4s (capped at 30s)
    const delay = Math.min(Math.pow(2, attemptCount - 1), 30);
    return { shouldRetry: true, delay: { seconds: delay } };
  }
});
```

------
#### [ Python ]

```
def retry_strategy(error, attempt_count):
    # Only retry rate limits and timeouts
    is_retryable = str(error) in ['RATE_LIMIT', 'TIMEOUT']
    
    if not is_retryable or attempt_count >= 3:
        return {'should_retry': False}
    
    # Exponential backoff: 1s, 2s, 4s (capped at 30s)
    delay = min(2 ** (attempt_count - 1), 30)
    return {'should_retry': True, 'delay': delay}

result = context.step(
    lambda _: call_rate_limited_api(),
    name='call-rate-limited-api',
    config=StepConfig(retry_strategy=retry_strategy)
)
```

------

**禁用重试：**

------
#### [ TypeScript ]

```
const isDuplicate = await context.step('check-duplicate', async () => {
  return await checkIfOrderExists(event.orderId);
}, {
  retryStrategy: () => ({ shouldRetry: false })
});
```

------
#### [ Python ]

```
is_duplicate = context.step(
    lambda _: check_if_order_exists(event['orderId']),
    name='check-duplicate',
    config=StepConfig(
        retry_strategy=lambda error, attempt: {'should_retry': False}
    )
)
```

------

当重试策略返回 `shouldRetry: false` 时，步骤会立即失败，无需重试。将此用于那些不应被重试的操作，例如幂等性检查或具有副作用且无法安全重复的操作。

## 步骤之外的异常
<a name="durable-handler-exceptions"></a>

当在处理程序代码中出现未捕获的异常，且该异常未发生在任何步骤之内时，SDK 会将此次执行标记为失败。这能确保您应用程序逻辑中的错误能够被准确地捕获并报告出来。


| 场景 | 发生了什么 | 计量影响 | 
| --- | --- | --- | 
| 在任何步骤之外的处理程序代码中出现异常 | SDK 将执行标记为 FAILED 并返回错误。异常不会被自动重试。 | 错误有效载荷大小 | 

为了使易出错的代码能够自动重试，可以将该代码封装在一个带有重试策略的步骤中。步骤提供自动重试和可配置的回退功能，而步骤之外的代码会立即失败。

## 后端重试
<a name="durable-backend-retries"></a>

当 Lambda 遇到基础设施故障、运行时错误或者 SDK 无法与持久执行服务进行通信时，就会出现后端重试。Lambda 会自动重试这些故障，以帮助持久性函数能够从暂时基础设施问题中恢复。

### 后端重试场景
<a name="durable-backend-retry-scenarios"></a>

当 Lambda 遇到以下情况时，会自动重试您的函数：
+ **内部服务错误**：当 Lambda 或持久执行服务返回 5xx 错误时，表示存在暂时性的服务问题。
+ **节流**：当您的函数因并发限制或服务配额而被限制时。
+ **超时**：当 SDK 在超时时间内无法连接到持久执行服务时。
+ **沙盒初始化失败**：当 Lambda 无法初始化执行环境时。
+ **运行时错误**：当 Lambda 运行时遇到超出您函数代码范围的错误时，例如内存不足错误或进程崩溃等。
+ **无效的检查点令牌错误**：当检查点令牌不再有效时，通常是因为服务端状态发生变化所致。

下表描述了 SDK 如何处理这些场景：


| 场景 | 发生了什么 | 计量影响 | 
| --- | --- | --- | 
| 持久性处理程序之外的运行时错误（OOM、超时、崩溃） | Lambda 会自动重试调用。SDK 会从上一个检查点开始重放，以跳过已完成的步骤。 | 错误有效载荷大小 \$1 每次重试 1 次操作 | 
| 调用 CheckpointDurableExecution / GetDurableExecutionState API 时出现服务错误（5xx）或超时 | Lambda 会自动重试调用。SDK 会从上一个检查点开始重放。 | 错误有效载荷大小 \$1 每次重试 1 次操作 | 
| 调用 CheckpointDurableExecution / GetDurableExecutionState API时出现节流（429）或无效的检查点令牌 | Lambda 会自动通过指数回退重试调用。SDK 会从上一个检查点开始重放。 | 错误有效载荷大小 \$1 每次重试 1 次操作 | 
| 调用 CheckpointDurableExecution / GetDurableExecutionState API 时出现客户端错误（4xx，429 和无效令牌除外） | SDK 将执行标记为 FAILED。由于该错误表明存在永久性问题，所以不会自动重试。 | 错误有效载荷大小 | 

后端重试采用指数回退策略，并持续进行直至函数成功执行或达到执行超时时间。在重放过程中，SDK 跳过已完成的检查点，并从上一次成功的操作处继续执行，从而确保您的函数不会重复执行已完成的工作。

## 重试最佳实践
<a name="durable-retry-best-practices"></a>

在配置重试策略时，请遵循以下最佳实践：
+ **配置明确的重试策略**：不要依赖生产环境中的默认重试行为。为您的使用案例配置明确的重试策略，包括适当的最大重试次数和回退间隔。
+ **使用条件重试**：实施 `shouldRetry` 逻辑，以仅对暂时出现的错误（如速率限制、超时）进行重试，而对于永久性的错误（如验证失败、未找到）则启动快速失效机制。
+ **设置适当的最大尝试次数**：在韧性与执行时间之间取得平衡。尝试次数过多会延缓故障的检测，而尝试次数过少则可能导致不必要的故障发生。
+ **使用指数回退**：指数回退能够减轻下游服务的负载，并提高从暂时性故障中恢复的可能性。
+ **在步骤中封装容易出错的代码**：步骤之外的代码无法自动重试。将外部 API 调用、数据库查询以及其他容易出错的操作封装在具有重试策略的步骤中。
+ **监控重试指标**：在 Amazon CloudWatch 中跟踪步骤重试操作及执行失败情况，以识别模式并优化重试策略。

# 幂等性
<a name="durable-execution-idempotency"></a>

持久性函数通过执行名称为执行启动提供内置的幂等性。当您提供执行名称时，Lambda 会利用该名称来防止重复执行，并确保调用请求能够安全地进行重试。步骤默认具有至少一次执行的语义——在回放过程中，SDK 会返回已检查点保存的结果，而不会重新执行已完成的步骤，但您的业务逻辑必须是幂等性的，以便在完成之前处理可能的重试情况。

**注意**  
Lambda 事件源映射（ESM）在启动时不支持幂等性。因此，每次调用（包括重试）都会启动一个新的持久执行。为确保事件源映射下的执行具有幂等性，可以在您的函数代码中实施幂等性逻辑，例如使用 [Powertools for AWS Lambda](https://docs.aws.amazon.com//powertools/) 来实施，或者使用常规的 Lambda 函数作为代理（调度程序）来调用具有幂等键（执行名称参数）的持久性函数。

## 执行名称
<a name="durable-idempotency-execution-names"></a>

在调用持久性函数时，您可以提供一个执行名称。执行名称充当幂等键，使您能够安全地重试调用请求，而无需创建重复的执行。如果您不提供名称，Lambda 会自动生成一个唯一的执行 ID。

执行名称必须在您的账户和区域内唯一。当您使用已存在的执行名称调用函数时，Lambda 的行为将取决于现有执行的状态以及有效载荷是否与之匹配。

## 幂等性行为
<a name="durable-idempotency-behavior"></a>

以下表格说明了 Lambda 如何根据您是否提供了执行名称、现有执行状态以及有效载荷是否匹配等因素来处理调用请求：


| 场景 | 是否提供了名称？ | 现有执行状态 | 有效载荷相同？ | 行为 | 
| --- | --- | --- | --- | --- | 
| 1 | 否 | 不适用 | 不适用 | 新执行已开始：Lambda 生成一个唯一的执行 ID 并开始新的执行 | 
| 2 | 可以 | 从未存在或保留期已过期 | 不适用 | 新执行已开始：Lambda 使用提供的名称开始新执行 | 
| 3 | 是 | 运行 | 是 | 幂等性启动：Lambda 返回现有的执行信息，而不会启动重复的执行过程。对于同步调用而言，这相当于是重新附加到正在运行的执行 | 
| 4 | 是 | 运行 | 否 | 错误：Lambda 返回 DurableExecutionAlreadyExists 错误，因为具有此名称的执行已经在运行，且具有不同的有效载荷 | 
| 5 | 是 | 已关闭（成功、失败、停止或超时） | 是 | 幂等性启动：Lambda 返回现有的执行信息，而不会启动新执行。将返回已关闭执行的结果 | 
| 6 | 是 | 已关闭（成功、失败、停止或超时） | 否 | 错误：Lambda 返回 DurableExecutionAlreadyExists 错误，因为具有此名称的执行已经在运行，且具有不同的有效载荷 | 

**注意**  
场景 3 和方案 5 展示了幂等性行为，其中 Lambda 能够安全地处理重复的调用请求，它会返回现有的执行信息，而非创建重复项。

## 步骤幂等性
<a name="durable-idempotency-steps"></a>

步骤默认具有至少一次的执行语义。当您的函数在等待、回调或出现故障后重放时，SDK 会将每个步骤与检查点日志进行比对。对于已经完成的步骤，SDK 会返回已进行检查点保存的结果，但不会重新执行该步骤逻辑。但是，如果某个步骤失败或者在该步骤完成之前函数被中断，则该步骤可能会执行多次。

封装在步骤中的业务逻辑必须是幂等性的，以便能够处理可能出现的重试情况。使用幂等键来确保诸如支付或数据库写入等操作仅执行一次，即便该步骤出现重试情况也是如此。

**示例：在步骤中使用幂等键**

------
#### [ TypeScript ]

```
import { withDurableExecution, DurableContext } from '@aws/durable-execution-sdk-js';
import { randomUUID } from 'crypto';

export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    // Generate idempotency key once
    const idempotencyKey = await context.step('generate-key', async () => {
      return randomUUID();
    });
    
    // Use idempotency key in payment API to prevent duplicate charges
    const payment = await context.step('process-payment', async () => {
      return paymentAPI.charge({
        amount: event.amount,
        idempotencyKey: idempotencyKey
      });
    });
    
    return { statusCode: 200, payment };
  }
);
```

------
#### [ Python ]

```
from aws_durable_execution_sdk_python import durable_execution, DurableContext
import uuid

@durable_execution
def handler(event, context: DurableContext):
    # Generate idempotency key once
    idempotency_key = context.step(
        lambda _: str(uuid.uuid4()),
        name='generate-key'
    )
    
    # Use idempotency key in payment API to prevent duplicate charges
    payment = context.step(
        lambda _: payment_api.charge(
            amount=event['amount'],
            idempotency_key=idempotency_key
        ),
        name='process-payment'
    )
    
    return {'statusCode': 200, 'payment': payment}
```

------

您可以通过将执行模式设置为 `AT_MOST_ONCE_PER_RETRY` 来配置采用最多一次执行语义的步骤。这确保了该步骤在每次重试尝试中最多执行一次，但如果在步骤完成之前函数被中断，则该步骤可能根本不会执行。

SDK 通过验证该步骤名称和订单是否与回放期间的检查点日志相匹配，来实施确定性重放。如果您的代码试图按照不同的顺序或使用不同的名称来执行这些步骤，则 SDK 会抛出 `NonDeterministicExecutionError`。

**重放如何与完成的步骤结合使用：**

1. 第一次调用：函数执行步骤 A，创建检查点，然后等待

1. 第二次调用（等待后）：函数从头开始重放，步骤 A 立即返回检查点结果，但不会重新执行，然后继续执行步骤 B

1. 第二次调用（等待后）：函数从头开始重放，步骤 A 和 B 立即返回检查点结果，然后继续执行步骤 C

这种回放机制确保已完成的步骤不会再次执行，但您的业务逻辑仍必须是“幂等性”的，以便在完成之前处理重试情况。

# 测试 Lambda 持久性函数
<a name="durable-testing"></a>

AWS 为持久性函数提供专门的测试 SDK，使您能够在本地和云端运行和检查执行情况。安装适用于您的语言的测试 SDK：

------
#### [ TypeScript ]

```
npm install --save-dev @aws/aws-durable-execution-sdk-js-testing
```

有关完整的 API 文档和示例，请参阅 GitHub 上的 [TypeScript 测试 SDK](https://github.com/aws/aws-durable-execution-sdk-js/tree/development/packages/aws-durable-execution-sdk-js-testing)。

------
#### [ Python ]

```
pip install aws-durable-execution-sdk-python-testing
```

有关完整的 API 文档和示例，请参阅 GitHub 上的 [Python 测试 SDK](https://github.com/aws/aws-durable-execution-sdk-python-testing)。

------

测试 SDK 提供两种测试模式：一种是用于快速单元测试的本地测试模式，另一种是用于针对已部署函数进行集成测试的云测试模式。

## 本地测试
<a name="durable-local-testing"></a>

本地测试可在您的开发环境中运行您的持久性函数，无需使用已部署的资源。测试运行器会直接运行您的函数代码，并记录所有操作以便进行检查。

将本地测试用于单元测试、测试驱动型开发和 CI/CD 管道。测试在本地运行，没有网络延迟，不会产生额外费用。

**示例测试：**

------
#### [ TypeScript ]

```
import { withDurableExecution } from '@aws/aws-durable-execution-sdk-js';
import { DurableFunctionTestRunner } from '@aws/aws-durable-execution-sdk-js-testing';

const handler = withDurableExecution(async (event, context) => {
  const result = await context.step('calculate', async () => {
    return event.a + event.b;
  });
  return result;
});

test('addition works correctly', async () => {
  const runner = new DurableFunctionTestRunner({ handler });
  const result = await runner.run({ a: 5, b: 3 });
  
  expect(result.status).toBe('SUCCEEDED');
  expect(result.result).toBe(8);
  
  const step = result.getStep('calculate');
  expect(step.result).toBe(8);
});
```

------
#### [ Python ]

```
from aws_durable_execution_sdk_python import durable_execution, DurableContext
from aws_durable_execution_sdk_python_testing import DurableFunctionTestRunner
from aws_durable_execution_sdk_python.execution import InvocationStatus

@durable_execution
def handler(event: dict, context: DurableContext) -> int:
    result = context.step(lambda _: event["a"] + event["b"], name="calculate")
    return result

def test_addition():
    runner = DurableFunctionTestRunner(handler=handler)
    with runner:
        result = runner.run(input={"a": 5, "b": 3}, timeout=10)
    
    assert result.status is InvocationStatus.SUCCEEDED
    assert result.result == 8
    
    step = result.get_step("calculate")
    assert step.result == 8
```

------

测试运行器会捕获执行状态，包括最终结果、各个步骤的结果、等待操作、回调以及任何错误信息。您可以按名称来检查操作，也可以遍历所有操作以验证执行行为。

### 执行存储
<a name="durable-execution-stores"></a>

测试 SDK 使用执行存储来保存测试执行数据。默认情况下，测试使用快速且无需清理操作的内存存储。为了调试或分析执行历史记录，您可以使用将执行保存为 JSON 文件的文件系统存储。

**内存存储（默认）：**

内存存储在测试运行期间将执行数据保存在内存中。测试完成后数据就会丢失，因此它非常适合用于标准单元测试以及 CI/CD 管道，在这些流程中，您无需在测试结束后检查执行结果。

**文件系统存储：**

文件系统存储会将执行数据以 JSON 文件的形式保存到磁盘上。每次执行都会被保存在单独的文件中，这样在测试完成后便于查看执行历史记录。在调试复杂的测试失败情况或分析长期的执行模式时，请使用文件系统存储功能。

使用环境变量配置存储：

```
# Use filesystem store
export AWS_DEX_STORE_TYPE=filesystem
export AWS_DEX_STORE_PATH=./test-executions

# Run tests
pytest tests/
```

执行文件以经过清理处理的名称进行存储，并包含完整的执行状态，包括操作、检查点和结果。文件系统存储会自动创建存储目录（如果该目录不存在）。

## 云端测试
<a name="durable-cloud-testing"></a>

云测试会调用 AWS 中的已部署持久性函数，并通过 Lambda API 获取其执行历史记录。利用云测试来在类似生产环境的条件下，通过真实的 AWS 服务和配置来验证其行为。

云测试需要已部署的函数以及具有调用函数和读取执行历史记录权限的 AWS 凭证：

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction",
                "lambda:GetDurableExecution",
                "lambda:GetDurableExecutionHistory"
            ],
            "Resource": "arn:aws:lambda:region:account-id:function:function-name"
        }
    ]
}
```

**云测试示例：**

------
#### [ TypeScript ]

```
import { DurableFunctionCloudTestRunner } from '@aws/aws-durable-execution-sdk-js-testing';

test('deployed function processes orders', async () => {
  const runner = new DurableFunctionCloudTestRunner({
    functionName: 'order-processor',
    region: 'us-east-1'
  });
  
  const result = await runner.run({ orderId: 'order-123' });
  
  expect(result.status).toBe('SUCCEEDED');
  expect(result.result.status).toBe('completed');
});
```

------
#### [ Python ]

```
from aws_durable_execution_sdk_python_testing import (
    DurableFunctionCloudTestRunner,
    DurableFunctionCloudTestRunnerConfig
)

def test_deployed_function():
    config = DurableFunctionCloudTestRunnerConfig(
        function_name="order-processor",
        region="us-east-1"
    )
    runner = DurableFunctionCloudTestRunner(config=config)
    
    result = runner.run(input={"orderId": "order-123"})
    
    assert result.status is InvocationStatus.SUCCEEDED
    assert result.result["status"] == "completed"
```

------

云测试调用实际部署的函数并从 AWS 中检索执行历史记录。这使您可以验证与其他 AWS 服务的集成，检验性能特征，并使用与生产环境类似的数据和配置进行测试。

## 测试内容
<a name="durable-testing-patterns"></a>

通过验证执行结果、操作行为和错误处理来测试持久性函数。要注重业务逻辑的正确性，而非实施细节。

**验证执行结果：**检查函数是否返回给定输入的预期值。同时测试成功执行和错误情况，以确保函数能够正确处理无效输入。

**检查操作执行情况：**验证步骤、等待和回调是否按预期执行。检查步骤结果，以确保中间操作能得出正确的值。验证等待操作已按照适当的超时设置进行配置，并且回调函数是按照正确的设置创建的。

**测试错误处理：**当输入无效数据时，验证函数会正确地返回描述性的错误信息。通过模拟暂时性故障并确认操作能够正确地进行重试来测试重试行为。检查并确认永久性故障不会引发不必要的重试操作。

**验证工作流程：**对于多步骤工作流程，请确认操作按正确的顺序执行。测试条件分支以确保不同的执行路径能够正确运行。验证并行操作能够同时执行，并产生预期的结果。

SDK 文档存储库中包含大量关于测试模式的示例，包括多步骤工作流程、错误场景、超时处理以及轮询模式等。

## 测试策略
<a name="durable-testing-strategy"></a>

在开发过程以及 CI/CD 管道中，使用本地测试来执行单元测试。本地测试运行速度快，不需要 AWS 凭证，并且能即时反馈代码变更情况。编写本地测试以验证业务逻辑、错误处理以及操作行为。

在部署到生产环境之前，请使用云测试进行集成测试。云测试通过真实的 AWS 服务和配置验证其行为，验证性能特征，并测试端到端的工作流程。在暂存环境中运行云测试，以便在问题影响到生产环境之前就捕获集成问题。

在本地测试中模拟外部依赖项，以隔离函数逻辑并保持测试速度。使用云测试来验证与外部服务（如数据库、API 和其他 AWS 服务）的实际集成。

编写有针对性的测试，以验证单一特定行为。使用描述性的测试名称，以说明正在测试的内容。将相关测试组合在一起，并使用测试夹具来编写通用的设置代码。保持测试简单，避免使用难以理解的复杂测试逻辑。

## 调试失败
<a name="durable-testing-debugging"></a>

当测试失败时，检查执行结果以了解问题出在哪里。查看执行状态，以确定函数是成功、失败还是超时。阅读错误消息以了解失败原因。

检查各项操作结果，以找出行为与预期不符的地方。检查步骤结果，以查看产生了哪些值。验证操作顺序，以确认操作按照预期的顺序进行。对操作进行计数，以确保创建了适当数量的步骤、等待和回调。

常见的问题包括：非确定性代码在重放时会产生不同的结果；通过全局变量实现的共享状态在重放时会失效；以及由于条件逻辑错误而导致的操作缺失。使用标准的调试器和日志记录功能逐行执行函数代码并追踪执行流程。

对于云端测试，可在 CloudWatch Logs 中检查执行历史记录，以查看详细的操作日志。利用跟踪功能来跟踪跨服务的执行流程，并找出瓶颈所在。

# 监控持久性函数
<a name="durable-monitoring"></a>

您可以使用 CloudWatch 指标、CloudWatch Logs 和跟踪来监控您的持久性函数。由于持久性函数可以长时间运行并跨越多个函数调用，因此监控它们需要了解它们独特的执行模式，包括检查点、状态转换和重放行为。

## CloudWatch 指标
<a name="durable-monitoring-metrics"></a>

Lambda 自动向 CloudWatch 发布指标，无需额外收费。持久性函数提供标准 Lambda 指标之外的额外指标，以帮助您监控长时间运行的工作流、状态管理和资源利用率。

### 持久执行指标
<a name="durable-monitoring-execution-metrics"></a>

Lambda 为持久执行发出以下指标：


| 指标 | 说明 | 
| --- | --- | 
| ApproximateRunningDurableExecutions | 处于 RUNNING 状态的持久执行次数 | 
| ApproximateRunningDurableExecutionsUtilization | 您的账户当前正在使用的最大运行持久执行数配额百分比。 | 
| DurableExecutionDuration | 持久执行在 RUNNING 状态所经历的总耗时（以毫秒为单位）。 | 
| DurableExecutionStarted | 启动的持久执行数 | 
| DurableExecutionStopped | 使用 StopDurableExecution API 停止的持久执行次数 | 
| DurableExecutionSucceeded | 成功完成的持久执行次数 | 
| DurableExecutionFailed | 以失败状态完成的持久执行次数 | 
| DurableExecutionTimedOut | 超过其配置执行超时时间的持久执行次数 | 
| DurableExecutionOperations | 持久执行内执行的操作累计数量（最大数：3000） | 
| DurableExecutionStorageWrittenBytes | 持久执行所保留的累积数据量（以字节为单位）（最大值：100 MB） | 

### CloudWatch 指标
<a name="durable-monitoring-standard-metrics"></a>

Lambda 发出持久性函数的标准调用、性能和并发指标。由于持久执行在通过检查点和重放的过程中会跨越多个函数调用，因此这些指标的行为与标准函数不同：
+ **调用次数：**统计每次函数调用，包括重放次数。单次持久执行可以生成多个调用数据点。
+ **持续时间：**分别测量每次函数调用。使用 `DurableExecutionDuration` 计算一次持久执行所花费的总时间。
+ **错误：**跟踪函数调用失败次数。将 `DurableExecutionFailed` 用于执行级别的失败。

有关标准 Lambda 指标的完整列表，请参阅 [Lambda 函数的指标类型](https://docs.aws.amazon.com//lambda/latest/dg/monitoring-metrics-types.html)。

### 创建 CloudWatch 告警
<a name="durable-monitoring-alarms"></a>

创建 CloudWatch 警报，以便在指标超过阈值时通知您。常见的警报包括：
+ `ApproximateRunningDurableExecutionsUtilization` 超过配额的 80%
+ `DurableExecutionFailed` 增加到阈值以上
+ `DurableExecutionTimedOut` 表示执行超时
+ `DurableExecutionStorageWrittenBytes` 接近存储限制

有关更多信息，请参阅[使用 CloudWatch 警报](https://docs.aws.amazon.com//AmazonCloudWatch/latest/monitoring/AlarmThatSendsEmail.html)。

## EventBridge 事件
<a name="durable-monitoring-eventbridge"></a>

Lambda 将持久执行状态变更事件发布到 EventBridge。您可以使用这些事件来触发工作流、发送通知，或跟踪您所有持久性函数的执行生命周期变更。

### 持久执行状态变更事件
<a name="durable-eventbridge-status-changes"></a>

Lambda 在每次持久执行状态发生变化时，都会向 EventBridge 发送一个事件。这些事件具有以下特征：
+ **源**：`aws.lambda`
+ **详细信息类型：**`Durable Execution Status Change`

状态变更事件会在以下执行状态下发布：
+ `RUNNING`：执行已启动
+ `SUCCEEDED`：执行成功完成
+ `STOPPED`：使用 StopDurableExecution API 停止的执行
+ `FAILED`：执行失败，出现错误
+ `TIMED_OUT`：执行超过了配置的超时时间

以下示例显示了持久执行状态变更事件：

```
{
  "version": "0",
  "id": "d019b03c-a8a3-9d58-85de-241e96206538",
  "detail-type": "Durable Execution Status Change",
  "source": "aws.lambda",
  "account": "123456789012",
  "time": "2025-11-20T13:08:22Z",
  "region": "us-east-1",
  "resources": [],
  "detail": {
    "durableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-function:$LATEST/durable-execution/090c4189-b18b-4296-9d0c-cfd01dc3a122/9f7d84c9-ea3d-3ffc-b3e5-5ec51c34ffc9",
    "durableExecutionName": "order-123",
    "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-function:2",
    "status": "RUNNING",
    "startTimestamp": "2025-11-20T13:08:22.345Z"
  }
}
```

对于终端状态（`SUCCEEDED`、`STOPPED`、`FAILED`、`TIMED_OUT`），该事件包括一个指示执行何时完成的 `endTimestamp` 字段。

### 创建 EventBridge 规则
<a name="durable-eventbridge-rules"></a>

创建规则，将持久执行状态变更事件路由到 Amazon Simple Notification Service、Amazon Simple Queue Service 或其他 Lambda 函数等目标。

以下示例创建了一条规则，用于匹配所有持久执行状态变更：

```
{
  "source": ["aws.lambda"],
  "detail-type": ["Durable Execution Status Change"]
}
```

以下示例创建了一条规则，以仅匹配失败的执行：

```
{
  "source": ["aws.lambda"],
  "detail-type": ["Durable Execution Status Change"],
  "detail": {
    "status": ["FAILED"]
  }
}
```

以下示例创建了一条规则，用于匹配特定函数的状态变更：

```
{
  "source": ["aws.lambda"],
  "detail-type": ["Durable Execution Status Change"],
  "detail": {
    "functionArn": [{
      "prefix": "arn:aws:lambda:us-east-1:123456789012:function:my-function"
    }]
  }
}
```

有关创建规则的更多信息，请参阅《EventBridge 用户指南》中的 [Amazon EventBridge 教程](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-tutorial.html)。

## AWS X-Ray 跟踪
<a name="durable-monitoring-xray"></a>

您可以在持久性函数上启用 X-Ray 跟踪。Lambda 会将 X-Ray 跟踪标头传递给持久执行，从而允许您跨工作流跟踪请求。

要使用 Lambda 控制台启用 X-Ray; 跟踪，请选择您的函数，然后选择配置、监控和操作工具，并在 X-Ray 下开启主动跟踪。

要使用 AWS CLI 启用 X-Ray 跟踪：

```
aws lambda update-function-configuration \
    --function-name my-durable-function \
    --tracing-config Mode=Active
```

要使用 AWS SAM 启用 AWS X-Ray 跟踪：

```
Resources:
  MyDurableFunction:
    Type: AWS::Serverless::Function
    Properties:
      Tracing: Active
      DurableConfig:
        ExecutionTimeout: 3600
```

有关 X-Ray 的更多信息，[请参阅《AWS X-Ray 开发人员指南》](https://docs.aws.amazon.com//xray/latest/devguide/aws-xray.html)。

# Lambda 持久性函数的最佳实践
<a name="durable-best-practices"></a>

持久性函数使用基于重放的执行模型，该模型需要的模式与传统 Lambda 函数不同。遵循这些最佳实践，以构建可靠、经济高效的工作流。

## 编写确定性代码
<a name="durable-determinism"></a>

在重放期间，您的函数将从头开始运行，并且必须遵循与原始运行相同的执行路径。持久操作之外的代码必须是确定性的，以在输入相同的情况下产生相同的结果。

**将非确定性操作封装在多个步骤中：**
+ 随机数生成和 UUID
+ 当前时间或时间戳
+ 外部 API 调用和数据库查询
+ 文件系统操作

------
#### [ TypeScript ]

```
import { withDurableExecution, DurableContext } from '@aws/durable-execution-sdk-js';
import { randomUUID } from 'crypto';

export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    // Generate transaction ID inside a step
    const transactionId = await context.step('generate-transaction-id', async () => {
      return randomUUID();
    });
    
    // Use the same ID throughout execution, even during replay
    const payment = await context.step('process-payment', async () => {
      return processPayment(event.amount, transactionId);
    });
    
    return { statusCode: 200, transactionId, payment };
  }
);
```

------
#### [ Python ]

```
from aws_durable_execution_sdk_python import durable_execution, DurableContext
import uuid

@durable_execution
def handler(event, context: DurableContext):
    # Generate transaction ID inside a step
    transaction_id = context.step(
        lambda _: str(uuid.uuid4()),
        name='generate-transaction-id'
    )
    
    # Use the same ID throughout execution, even during replay
    payment = context.step(
        lambda _: process_payment(event['amount'], transaction_id),
        name='process-payment'
    )
    
    return {'statusCode': 200, 'transactionId': transaction_id, 'payment': payment}
```

------

**重要提示**  
不要使用全局变量或闭包来在各个步骤之间共享状态。通过返回值传递数据。全局状态在重放期间中断，因为步骤返回了缓存的结果，但全局变量重置。

**避免闭包突变：**在闭包中捕获的变量可能会在重放期间丢失突变。步骤会返回缓存的结果，但步骤之外的变量更新不会重放。

------
#### [ TypeScript ]

```
// ❌ WRONG: Mutations lost on replay
export const handler = withDurableExecution(async (event, context) => {
  let total = 0;
  
  for (const item of items) {
    await context.step(async () => {
      total += item.price; // ⚠️ Mutation lost on replay!
      return saveItem(item);
    });
  }
  
  return { total }; // Inconsistent value!
});

// ✅ CORRECT: Accumulate with return values
export const handler = withDurableExecution(async (event, context) => {
  let total = 0;
  
  for (const item of items) {
    total = await context.step(async () => {
      const newTotal = total + item.price;
      await saveItem(item);
      return newTotal; // Return updated value
    });
  }
  
  return { total }; // Consistent!
});

// ✅ EVEN BETTER: Use map for parallel processing
export const handler = withDurableExecution(async (event, context) => {
  const results = await context.map(
    items,
    async (ctx, item) => {
      await ctx.step(async () => saveItem(item));
      return item.price;
    }
  );
  
  const total = results.getResults().reduce((sum, price) => sum + price, 0);
  return { total };
});
```

------
#### [ Python ]

```
# ❌ WRONG: Mutations lost on replay
@durable_execution
def handler(event, context: DurableContext):
    total = 0
    
    for item in items:
        context.step(
            lambda _: save_item_and_mutate(item, total),  # ⚠️ Mutation lost on replay!
            name=f'save-item-{item["id"]}'
        )
    
    return {'total': total}  # Inconsistent value!

# ✅ CORRECT: Accumulate with return values
@durable_execution
def handler(event, context: DurableContext):
    total = 0
    
    for item in items:
        total = context.step(
            lambda _: save_item_and_return_total(item, total),
            name=f'save-item-{item["id"]}'
        )
    
    return {'total': total}  # Consistent!

# ✅ EVEN BETTER: Use map for parallel processing
@durable_execution
def handler(event, context: DurableContext):
    def process_item(ctx, item):
        ctx.step(lambda _: save_item(item))
        return item['price']
    
    results = context.map(items, process_item)
    total = sum(results.get_results())
    
    return {'total': total}
```

------

## 进行幂等性设计
<a name="durable-idempotency"></a>

由于重试或重放，操作可能会执行多次。非幂等性操作会导致重复的副作用，例如向客户收取两次费用或发送多封电子邮件。

**使用幂等性令牌：**在步骤中生成令牌，并将其与外部 API 调用结合使用，以防止重复操作。

------
#### [ TypeScript ]

```
import { withDurableExecution, DurableContext } from '@aws/durable-execution-sdk-js';

export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    // Generate idempotency token once
    const idempotencyToken = await context.step('generate-idempotency-token', async () => {
      return crypto.randomUUID();
    });
    
    // Use token to prevent duplicate charges
    const charge = await context.step('charge-payment', async () => {
      return paymentService.charge({
        amount: event.amount,
        cardToken: event.cardToken,
        idempotencyKey: idempotencyToken
      });
    });
    
    return { statusCode: 200, charge };
  }
);
```

------
#### [ Python ]

```
from aws_durable_execution_sdk_python import durable_execution, DurableContext
import uuid

@durable_execution
def handler(event, context: DurableContext):
    # Generate idempotency token once
    idempotency_token = context.step(
        lambda _: str(uuid.uuid4()),
        name='generate-idempotency-token'
    )
    
    # Use token to prevent duplicate charges
    def charge_payment(_):
        return payment_service.charge(
            amount=event['amount'],
            card_token=event['cardToken'],
            idempotency_key=idempotency_token
        )
    
    charge = context.step(charge_payment, name='charge-payment')
    
    return {'statusCode': 200, 'charge': charge}
```

------

**使用最多一次语义：**对于绝不能重复的关键操作（财务交易、库存扣减），请配置最多一次的执行模式。

------
#### [ TypeScript ]

```
// Critical operation that must not duplicate
await context.step('deduct-inventory', async () => {
  return inventoryService.deduct(event.productId, event.quantity);
}, {
  executionMode: 'AT_MOST_ONCE_PER_RETRY'
});
```

------
#### [ Python ]

```
# Critical operation that must not duplicate
context.step(
    lambda _: inventory_service.deduct(event['productId'], event['quantity']),
    name='deduct-inventory',
    config=StepConfig(execution_mode='AT_MOST_ONCE_PER_RETRY')
)
```

------

**数据库幂等性：**使用写入前检查模式、条件更新或更新插入操作来防止出现重复记录。

## 高效管理状态
<a name="durable-state-management"></a>

每个检查点都会将状态保存到持久存储中。大型状态对象会增加成本、减慢检查点创建速度，并影响性能。仅存储重要的工作流协调数据。

**尽量精简状态数据：**
+ 存储 ID 和引用，而不是完整对象
+ 根据需要在步骤内获取详细数据
+ 使用 Amazon S3 或 DynamoDB 处理大型数据，在状态中传递引用
+ 避免在各步骤之间传递大量有效载荷

------
#### [ TypeScript ]

```
import { withDurableExecution, DurableContext } from '@aws/durable-execution-sdk-js';

export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    // Store only the order ID, not the full order object
    const orderId = event.orderId;
    
    // Fetch data within each step as needed
    await context.step('validate-order', async () => {
      const order = await orderService.getOrder(orderId);
      return validateOrder(order);
    });
    
    await context.step('process-payment', async () => {
      const order = await orderService.getOrder(orderId);
      return processPayment(order);
    });
    
    return { statusCode: 200, orderId };
  }
);
```

------
#### [ Python ]

```
from aws_durable_execution_sdk_python import durable_execution, DurableContext

@durable_execution
def handler(event, context: DurableContext):
    # Store only the order ID, not the full order object
    order_id = event['orderId']
    
    # Fetch data within each step as needed
    context.step(
        lambda _: validate_order(order_service.get_order(order_id)),
        name='validate-order'
    )
    
    context.step(
        lambda _: process_payment(order_service.get_order(order_id)),
        name='process-payment'
    )
    
    return {'statusCode': 200, 'orderId': order_id}
```

------

## 设计有效的步骤
<a name="durable-step-design"></a>

步骤是持久性函数中的基本工作单位。精心设计的步骤使工作流程更易于理解、调试和维护。

**步骤设计原则：**
+ **使用描述性名称**：使用类似 `validate-order` 的名称而非 `step1`，会让日志和错误更容易理解
+ **保持名称固定不变**：不要使用带有时间戳或随机值的动态名称。步骤名称必须是确定性的，以便进行重放
+ **合理控制步骤粒度**：将复杂的操作分解为重点明确的步骤，但要避免过细的小步骤，因为这些小步骤会增加检查点的开销
+ **对相关操作进行分组**：那些应该同时成功或失败的操作应归入同一步骤中

## 高效使用等待操作
<a name="durable-wait-operations"></a>

等待操作会暂停执行，但不会消耗资源或产生费用。使用它们而不是让 Lambda 保持运行。

**基于时间的等待：**将 `context.wait()` 用于延迟，而不是 `setTimeout` 或 `sleep`。

**外部回调：**在等待外部系统时使用 `context.waitForCallback()`。请务必设置超时，以避免出现无限制的等待情况。

**轮询：**将 `context.waitForCondition()` 与指数回退一起使用，在不给外部服务造成过大负担的情况下对其进行轮询。

------
#### [ TypeScript ]

```
// Wait 24 hours without cost
await context.wait({ seconds: 86400 });

// Wait for external callback with timeout
const result = await context.waitForCallback(
  'external-job',
  async (callbackId) => {
    await externalService.submitJob({
      data: event.data,
      webhookUrl: `https://api.example.com/callbacks/${callbackId}`
    });
  },
  { timeout: { seconds: 3600 } }
);
```

------
#### [ Python ]

```
# Wait 24 hours without cost
context.wait(86400)

# Wait for external callback with timeout
result = context.wait_for_callback(
    lambda callback_id: external_service.submit_job(
        data=event['data'],
        webhook_url=f'https://api.example.com/callbacks/{callback_id}'
    ),
    name='external-job',
    config=WaitForCallbackConfig(timeout_seconds=3600)
)
```

------

## 其他注意事项
<a name="durable-additional-considerations"></a>

**错误处理：**重试网络超时和速率限制这类暂时出现的故障。不要对永久性故障进行重试操作，例如输入无效或身份验证错误。使用适当的最大尝试次数和回退率配置重试策略。有关详细示例，请参阅[错误处理和重试](durable-execution-sdk-retries.md)。

**性能：**通过存储引用而不是完整有效载荷，最大限度地减少检查点的大小。使用 `context.parallel()` 和 `context.map()` 来同时执行独立的操作。批量执行相关操作以减少检查点开销。

**版本控制：**通过使用版本号或别名来调用函数，以便将执行操作固定在特定的代码版本上。确保新的代码版本可以处理旧版本的状态。不要以中断重放的方式重命名步骤或更改其行为。

**序列化：**使用与 JSON 兼容的类型来处理操作的输入和结果。将日期转换为 ISO 字符串，将自定义对象转换为普通对象，然后再将其传递给持久操作。

**监控：**启用带有执行 ID 和步骤名称的结构化日志记录。设置 CloudWatch 警报，以了解错误率和执行持续时间。使用跟踪来识别瓶颈。有关详细指导，请参阅[监控和调试](durable-monitoring.md)。

**测试：**测试成功路径、错误处理和重放行为。测试回调和等待的超时场景。使用本地测试来缩短迭代时间。有关详细指导，请参阅[测试持久性函数](durable-testing.md)。

**需要避免的常见错误：**不要嵌套 `context.step()` 调用，使用子上下文替代之。将非确定性操作封装在多个步骤中。始终为回调设置超时。在保持步骤粒度的同时减少检查点开销。将引用信息而非大型对象存储在状态中。

## 其他资源
<a name="durable-additional-resources"></a>
+ [Python SDK 文档](https://github.com/aws/aws-durable-execution-sdk-python/tree/main/docs)：完整的 API 参考、测试模式和高级示例
+ [TypeScript SDK 文档](https://github.com/aws/aws-durable-execution-sdk-js/tree/main/docs)：完整的 API 参考、测试模式和高级示例