重试失败的活动 - AWS Flow Framework 适用于 Java

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

重试失败的活动

活动有时会由于临时原因而失败,例如,连接临时中断。在其他时间,活动可能会成功,因此,处理活动失败的相应方法通常是重试活动,可能会重试很多次。

可以使用几种不同的策略重试活动;最佳的策略取决于您的工作流程详细信息。这些策略分为三种基本类别:

  • 重试直到成功策略就是不断重试活动,直到完成为止。

  • 指数重试策略以指数方式增加重试尝试的时间间隔,直到活动完成或该过程达到指定的停止点,如最大尝试次数。

  • 自定义重试策略确定在每个失败尝试后是否或如何重试活动。

以下几节介绍了如何实施这些策略。示例工作流程工作线程均使用单个活动 (unreliableActivity),它随机执行以下操作之一:

  • 立即完成

  • 有意超过超时值而失败

  • 有意引发 IllegalStateException 而失败

重试直到成功策略

最简单的重试策略是在每次失败时不断重试活动,直到最终成功。基本模式如下:

  1. 在工作流程的入口点方法中实现施嵌套的 TryCatchTryCatchFinally 类。

  2. doTry 中执行活动

  3. 如果活动失败,该框架调用 doCatch,这会再次运行入口点方法。

  4. 重复步骤 2-3,直到活动成功完成。

以下工作流程实施重试直到成功策略。工作流程接口是在 RetryActivityRecipeWorkflow 中实现的,并具有一个方法 (runUnreliableActivityTillSuccess),它是工作流程的入口点。工作流程工作线程是在 RetryActivityRecipeWorkflowImpl 中实现的,如下所示:

public class RetryActivityRecipeWorkflowImpl implements RetryActivityRecipeWorkflow { @Override public void runUnreliableActivityTillSuccess() { final Settable<Boolean> retryActivity = new Settable<Boolean>(); new TryCatch() { @Override protected void doTry() throws Throwable { Promise<Void> activityRanSuccessfully = client.unreliableActivity(); setRetryActivityToFalse(activityRanSuccessfully, retryActivity); } @Override protected void doCatch(Throwable e) throws Throwable { retryActivity.set(true); } }; restartRunUnreliableActivityTillSuccess(retryActivity); } @Asynchronous private void setRetryActivityToFalse( Promise<Void> activityRanSuccessfully, @NoWait Settable<Boolean> retryActivity) { retryActivity.set(false); } @Asynchronous private void restartRunUnreliableActivityTillSuccess( Settable<Boolean> retryActivity) { if (retryActivity.get()) { runUnreliableActivityTillSuccess(); } } }

工作流程的工作方式如下所示:

  1. runUnreliableActivityTillSuccess 创建一个名为 retryActivitySettable<Boolean> 对象,它用于指示活动是否失败并应进行重试。Settable<T> 派生自 Promise<T>,它的工作方式基本相同,但您手动设置 Settable<T> 对象的值。

  2. runUnreliableActivityTillSuccess 实现一个匿名的嵌套 TryCatch 类以处理 unreliableActivity 活动引发的任何异常。有关如何处理异步代码引发的异常的详细讨论,请参阅错误处理

  3. doTry 执行 unreliableActivity 活动,它返回一个名为 activityRanSuccessfullyPromise<Void> 对象。

  4. doTry 调用异步 setRetryActivityToFalse 方法,它使用两个参数:

    • activityRanSuccessfully 使用 unreliableActivity 活动返回的 Promise<Void> 对象。

    • retryActivity 使用 retryActivity 对象。

    如果 unreliableActivity 完成,activityRanSuccessfully 将变为就绪状态 setRetryActivityToFalse 并将 retryActivity 设置为 false。否则,activityRanSuccessfully 从不变为就绪状态并且 setRetryActivityToFalse 不执行。

  5. 如果 unreliableActivity 引发异常,该框架将调用 doCatch 并为其传递异常对象。doCatchretryActivity 设置为 true。

  6. runUnreliableActivityTillSuccess 调用异步 restartRunUnreliableActivityTillSuccess 方法并为其传递 retryActivity 对象。由于 retryActivity 具有 Promise<T> 类型,restartRunUnreliableActivityTillSuccess 将推迟执行,直到 retryActivity 准备就绪,在 TryCatch 完成后将变为该状态。

  7. retryActivity 准备就绪时,restartRunUnreliableActivityTillSuccess 提取该值。

    • 如果值为 false,则重试成功。restartRunUnreliableActivityTillSuccess 不执行任何操作,并终止重试序列。

    • 如果值为 true,则重试失败。restartRunUnreliableActivityTillSuccess 调用 runUnreliableActivityTillSuccess 以再次执行活动。

  8. 重复步骤 1-7,直到 unreliableActivity 完成。

注意

doCatch 不处理异常;它仅将 retryActivity 对象设置为 true 以指示活动失败。重试是由异步 restartRunUnreliableActivityTillSuccess 方法处理的,它推迟执行,直到 TryCatch 完成。使用这种方法的原因是,如果在 doCatch 中重试活动,则无法取消该活动。如果在 restartRunUnreliableActivityTillSuccess 中重试活动,则可以执行可取消的活动。

指数重试策略

在使用指数重试策略时,该框架在指定的时间段 (N 秒) 后再次执行失败的活动。如果该尝试失败,该框架在 2N 秒后再次执行该活动,然后在 4N 秒后再次执行该活动,依此类推。由于等待时间可能会变得很长,您通常会在某个时间点停止重试尝试,而不是无限期继续执行。

该框架提供三种方法以实施指数重试策略:

  • @ExponentialRetry 注释是最简单的方法,但您必须在编译时设置重试配置选项。

  • RetryDecorator 类允许您在运行时设置重试配置,并根据需要对其进行更改。

  • AsyncRetryingExecutor 类允许您在运行时设置重试配置,并根据需要对其进行更改。此外,该框架调用用户实现的 AsyncRunnable.run 方法以运行每个重试尝试。

所有方法都支持以下配置选项,其中时间值以秒为单位:

  • 初始重试等待时间。

  • 退避系数,它用于计算重试间隔,如下所示:

    retryInterval = initialRetryIntervalSeconds * Math.pow(backoffCoefficient, numberOfTries - 2)

    默认值是 2.0。

  • 最大重试次数。默认值为无限制。

  • 最大重试间隔。默认值为无限制。

  • 过期时间。在重试过程的总持续时间超过该值时,重试尝试将停止。默认值为无限制。

  • 将触发重试过程的异常。默认情况下,每个异常都会触发重试过程。

  • 不会触发重试尝试的异常。默认情况下,不会排除任何异常。

以下几节介绍了可实施指数重试策略的各种方法。

使用 @ExponentialRetry 的指数重试

为活动实施指数重试策略的最简单方法是,在接口定义中将 @ExponentialRetry 注释应用于活动。如果活动失败,该框架根据指定的选项值自动处理重试过程。基本模式如下:

  1. @ExponentialRetry 应用于相应的活动并指定重试配置。

  2. 如果注释的活动失败,该框架根据注释的参数指定的配置自动重试活动。

ExponentialRetryAnnotationWorkflow 工作流程工作线程使用 @ExponentialRetry 注释实施指数重试策略。它使用在 ExponentialRetryAnnotationActivities 中实现接口定义的 unreliableActivity 活动,如下所示:

@Activities(version = "1.0") @ActivityRegistrationOptions( defaultTaskScheduleToStartTimeoutSeconds = 30, defaultTaskStartToCloseTimeoutSeconds = 30) public interface ExponentialRetryAnnotationActivities { @ExponentialRetry( initialRetryIntervalSeconds = 5, maximumAttempts = 5, exceptionsToRetry = IllegalStateException.class) public void unreliableActivity(); }

@ExponentialRetry 选项指定以下策略:

  • 仅在活动引发 IllegalStateException 时重试。

  • 使用 5 秒初始等待时间。

  • 不超过 5 次重试尝试。

工作流程接口是在 RetryWorkflow 中实现的,并具有一个方法 (process),它是工作流程的入口点。工作流程工作线程是在 ExponentialRetryAnnotationWorkflowImpl 中实现的,如下所示:

public class ExponentialRetryAnnotationWorkflowImpl implements RetryWorkflow { public void process() { handleUnreliableActivity(); } public void handleUnreliableActivity() { client.unreliableActivity(); } }

工作流程的工作方式如下所示:

  1. process 运行同步 handleUnreliableActivity 方法。

  2. handleUnreliableActivity 执行 unreliableActivity 活动。

如果活动引发 IllegalStateException 而失败,该框架自动运行在 ExponentialRetryAnnotationActivities 中指定的重试策略。

使用 RetryDecorator 类的指数重试

@ExponentialRetry 易于使用。不过,配置是静态的,并且是在编译时设置的,因此,每次活动失败时,该框架使用相同的重试策略。您可以使用 RetryDecorator 类实施更灵活的指数重试策略,它允许在运行时指定配置并根据需要进行更改。基本模式如下:

  1. 创建并配置一个 ExponentialRetryPolicy 对象以指定重试配置。

  2. 创建一个 RetryDecorator 对象,并将步骤 1 中的 ExponentialRetryPolicy 对象传递给构造函数。

  3. 将活动客户端的类名传递给 RetryDecorator 对象的 decorate 方法,以将装饰器对象应用于活动。

  4. 执行活动。

如果活动失败,该框架根据 ExponentialRetryPolicy 对象的配置重试活动。您可以根据需要修改该对象以更改重试配置。

注意

@ExponentialRetry 注释和 RetryDecorator 类相互排斥。您无法使用 RetryDecorator 动态覆盖 @ExponentialRetry 注释指定的重试策略。

以下工作流程实现说明了如何使用 RetryDecorator 类实施指数重试策略。它使用一个没有 @ExponentialRetry 注释的 unreliableActivity 活动。工作流程接口是在 RetryWorkflow 中实现的,并具有一个方法 (process),它是工作流程的入口点。工作流程工作线程是在 DecoratorRetryWorkflowImpl 中实现的,如下所示:

public class DecoratorRetryWorkflowImpl implements RetryWorkflow { ... public void process() { long initialRetryIntervalSeconds = 5; int maximumAttempts = 5; ExponentialRetryPolicy retryPolicy = new ExponentialRetryPolicy( initialRetryIntervalSeconds).withMaximumAttempts(maximumAttempts); Decorator retryDecorator = new RetryDecorator(retryPolicy); client = retryDecorator.decorate(RetryActivitiesClient.class, client); handleUnreliableActivity(); } public void handleUnreliableActivity() { client.unreliableActivity(); } }

工作流程的工作方式如下所示:

  1. process 使用以下方法创建并配置一个 ExponentialRetryPolicy 对象:

    • 将初始重试间隔传递给构造函数。

    • 调用对象的 withMaximumAttempts 方法以将最大尝试次数设置为 5。ExponentialRetryPolicy 公开了其他 with 对象,您可以使用这些对象指定其他配置选项。

  2. process 创建一个名为 retryDecoratorRetryDecorator 对象,并将步骤 1 中的 ExponentialRetryPolicy 对象传递给构造函数。

  3. process 调用 retryDecorator.decorate 方法并为其传递活动客户端的类名,以将装饰器应用于活动。

  4. handleUnreliableActivity 执行活动。

如果活动失败,该框架根据步骤 1 中指定的配置重试活动。

注意

ExponentialRetryPolicy 类的几个 with 方法具有相应的 set 方法,您可以随时调用该方法以修改相应的配置选项:setBackoffCoefficientsetMaximumAttemptssetMaximumRetryIntervalSecondssetMaximumRetryExpirationIntervalSeconds

使用 AsyncRetryingExecutor 类的指数重试

RetryDecorator 类在配置重试过程方面提供比 @ExponentialRetry 更大的灵活性,但该框架仍会根据 ExponentialRetryPolicy 对象的当前配置自动运行重试尝试。更灵活的方法是使用 AsyncRetryingExecutor 类。除了允许您在运行时配置重试过程以外,该框架还会调用用户实现的 AsyncRunnable.run 方法以运行每个重试尝试,而不是直接执行活动。

基本模式如下:

  1. 创建并配置一个 ExponentialRetryPolicy 对象以指定重试配置。

  2. 创建一个 AsyncRetryingExecutor 对象,并为其传递 ExponentialRetryPolicy 对象以及一个工作流程时钟实例。

  3. 实现一个匿名的嵌套 TryCatchFinallyTryCatch 类。

  4. 实现一个匿名的 AsyncRunnable 类,并覆盖 run 方法以实现用于运行活动的自定义代码。

  5. 覆盖 doTry 以调用 AsyncRetryingExecutor 对象的 execute 方法,并为其传递步骤 4 中的 AsyncRunnable 类。AsyncRetryingExecutor 对象调用 AsyncRunnable.run 以运行活动。

  6. 如果活动失败,则 AsyncRetryingExecutor 对象根据步骤 1 中指定的重试策略再次调用 AsyncRunnable.run 方法。

以下工作流程说明了如何使用 AsyncRetryingExecutor 类实施指数重试策略。它使用与前面讨论的 DecoratorRetryWorkflow 工作流程相同的 unreliableActivity 活动。工作流程接口是在 RetryWorkflow 中实现的,并具有一个方法 (process),它是工作流程的入口点。工作流程工作线程是在 AsyncExecutorRetryWorkflowImpl 中实现的,如下所示:

public class AsyncExecutorRetryWorkflowImpl implements RetryWorkflow { private final RetryActivitiesClient client = new RetryActivitiesClientImpl(); private final DecisionContextProvider contextProvider = new DecisionContextProviderImpl(); private final WorkflowClock clock = contextProvider.getDecisionContext().getWorkflowClock(); public void process() { long initialRetryIntervalSeconds = 5; int maximumAttempts = 5; handleUnreliableActivity(initialRetryIntervalSeconds, maximumAttempts); } public void handleUnreliableActivity(long initialRetryIntervalSeconds, int maximumAttempts) { ExponentialRetryPolicy retryPolicy = new ExponentialRetryPolicy(initialRetryIntervalSeconds).withMaximumAttempts(maximumAttempts); final AsyncExecutor executor = new AsyncRetryingExecutor(retryPolicy, clock); new TryCatch() { @Override protected void doTry() throws Throwable { executor.execute(new AsyncRunnable() { @Override public void run() throws Throwable { client.unreliableActivity(); } }); } @Override protected void doCatch(Throwable e) throws Throwable { } }; } }

工作流程的工作方式如下所示:

  1. process 调用 handleUnreliableActivity 方法并为其传递配置设置。

  2. handleUnreliableActivity 使用步骤 1 中的配置设置创建 ExponentialRetryPolicy 对象 retryPolicy

  3. handleUnreliableActivity 创建 AsyncRetryExecutor 对象 executor,并将步骤 2 中的 ExponentialRetryPolicy 对象以及一个工作流程时钟实例传递给构造函数。

  4. handleUnreliableActivity 实现一个匿名的嵌套 TryCatch 类,并覆盖 doTrydoCatch 方法以运行重试尝试并处理任何异常。

  5. doTry 创建一个匿名的 AsyncRunnable 类,并覆盖 run 方法以实现自定义代码以执行 unreliableActivity。为简单起见,run 仅执行活动,但您可以根据需要实现更复杂的方法。

  6. doTry 调用 executor.execute 并将其传递到 AsyncRunnable 对象。execute 调用 AsyncRunnable 对象的 run 方法以运行活动。

  7. 如果活动失败,执行程序根据 retryPolicy 对象配置再次调用 run

有关如何使用 TryCatch 类处理错误的详细讨论,请参阅 适用于 Java 的 AWS Flow Framework 异常

自定义重试策略

重试失败活动的最灵活方法是自定义策略,它递归调用一个异步方法以运行重试尝试,这与重试直到成功策略非常相似。不过,您实现自定义逻辑以确定是否以及如何运行每个连续重试尝试,而不是直接再次运行活动。基本模式如下:

  1. 创建一个 Settable<T> 状态对象,它用于指示活动是否失败。

  2. 实现一个嵌套 TryCatchTryCatchFinally 类。

  3. doTry 执行活动。

  4. 如果活动失败,doCatch 设置该状态对象以指示活动失败。

  5. 调用一个异步失败处理方法,并为其传递该状态对象。该方法推迟执行,直到 TryCatchTryCatchFinally 完成。

  6. 该失败处理方法确定是否重试活动,如果重试,确定何时重试活动。

以下工作流程说明如何实现自定义重试策略。它使用与 DecoratorRetryWorkflowAsyncExecutorRetryWorkflow 工作流程相同的 unreliableActivity 活动。工作流程接口是在 RetryWorkflow 中实现的,并具有一个方法 (process),它是工作流程的入口点。工作流程工作线程是在 CustomLogicRetryWorkflowImpl 中实现的,如下所示:

public class CustomLogicRetryWorkflowImpl implements RetryWorkflow { ... public void process() { callActivityWithRetry(); } @Asynchronous public void callActivityWithRetry() { final Settable<Throwable> failure = new Settable<Throwable>(); new TryCatchFinally() { protected void doTry() throws Throwable { client.unreliableActivity(); } protected void doCatch(Throwable e) { failure.set(e); } protected void doFinally() throws Throwable { if (!failure.isReady()) { failure.set(null); } } }; retryOnFailure(failure); } @Asynchronous private void retryOnFailure(Promise<Throwable> failureP) { Throwable failure = failureP.get(); if (failure != null && shouldRetry(failure)) { callActivityWithRetry(); } } protected Boolean shouldRetry(Throwable e) { //custom logic to decide to retry the activity or not return true; } }

工作流程的工作方式如下所示:

  1. process 调用异步 callActivityWithRetry 方法。

  2. callActivityWithRetry 创建一个名为 failure 的 Settable<Throwable> 对象,此对象用于指示活动是否失败。Settable<T> 派生自 Promise<T>,它的工作方式基本相同,但您手动设置 Settable<T> 对象的值。

  3. callActivityWithRetry 实现一个匿名的嵌套 TryCatchFinally 类以处理 unreliableActivity 引发的任何异常。有关如何处理异步代码引发的异常的详细讨论,请参阅适用于 Java 的 AWS Flow Framework 异常

  4. doTry 执行 unreliableActivity

  5. 如果 unreliableActivity 引发异常,则该框架将调用 doCatch 并将其传递给异常对象。doCatchfailure 设置为异常对象(指示活动失败)并使该对象处于就绪状态。

  6. doFinally 检查 failure 是否准备就绪,只有在 doCatch 设置了 failure 时,它才为 true。

    • 如果 failure 准备就绪,则 doFinally 不执行任何操作。

    • 如果 failure 未准备就绪,则活动完成并且 doFinally 将 failure 设置为 null

  7. callActivityWithRetry 调用异步 retryOnFailure 方法并为其传递 failure。由于 failure 具有 Settable<T> 类型,callActivityWithRetry 将推迟执行,直到 failure 准备就绪,在 TryCatchFinally 完成后将变为该状态。

  8. retryOnFailure 从 failure 中获取值。

    • 如果 failure 设置为 null,则重试尝试成功。retryOnFailure 不执行任何操作,这会终止重试过程。

    • 如果 failure 设置为一个异常对象并且 shouldRetry 返回 true,则 retryOnFailure 调用 callActivityWithRetry 以重试活动。

      shouldRetry 实现自定义逻辑以确定是否重试失败的活动。为简单起见,shouldRetry 始终返回 true 并且 retryOnFailure 立即执行活动,但您可以根据需要实现更复杂的逻辑。

  9. 重复步骤 2 到 8,直至 unreliableActivity 完成 shouldRetry 决定停止该过程。

注意

doCatch 不处理重试过程;它仅设置 failure 以指示活动失败。重试过程是由异步 retryOnFailure 方法处理的,它推迟执行,直到 TryCatch 完成。使用这种方法的原因是,如果在 doCatch 中重试活动,则无法取消该活动。如果在 retryOnFailure 中重试活动,则可以执行可取消的活动。