运行使用适用于 Java 的 AWS Flow Framework 编写的程序 - AWS Flow Framework 适用于 Java

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

运行使用适用于 Java 的 AWS Flow Framework 编写的程序

该框架提供工作线程类,用于初始化适用于 Java 的 AWS Flow Framework 运行时并与 Amazon SWF 通信。为了实现工作流或活动工作线程,您必须创建并启动工作线程类的实例。这些工作线程类负责管理持续进行的异步操作、调用取消阻止的异步方法,以及与 Amazon SWF 通信。可通过工作流和活动实现、线程数量、要轮询的任务列表等对其进行配置。

该框架附带两个工作线程类,一个用于活动,一个用于工作流。要运行工作流逻辑,需使用 WorkflowWorker 类。同样,对活动使用 ActivityWorker 类。这些类会自动轮询 Amazon SWF 以获取活动任务,并在实现中调用相应的方法。

以下示例显示如何实例化 WorkflowWorker 和开始轮询任务:

AmazonSimpleWorkflow swfClient = new AmazonSimpleWorkflowClient(awsCredentials); WorkflowWorker worker = new WorkflowWorker(swfClient, "domain1", "tasklist1"); // Add workflow implementation types worker.addWorkflowImplementationType(MyWorkflowImpl.class); // Start worker worker.start();

创建 ActivityWorker 实例和开始轮询任务的基本步骤如下:

AmazonSimpleWorkflow swfClient = new AmazonSimpleWorkflowClient(awsCredentials); ActivityWorker worker = new ActivityWorker(swfClient, "domain1", "tasklist1"); worker.addActivitiesImplementation(new MyActivitiesImpl()); // Start worker worker.start();

当您要关闭活动或决策程序时,您的应用程序应关闭正在使用的工作线程类的实例以及 Amazon SWF Java 客户端实例。这将确保正确释放该工作线程类使用的所有资源。

worker.shutdown(); worker.awaitTermination(1, TimeUnit.MINUTES);

要开始执行,只需创建生成的外部客户端的实例并调用 @Execute 方法。

MyWorkflowClientExternalFactory factory = new MyWorkflowClientExternalFactoryImpl(); MyWorkflowClientExternal client = factory.getClient(); client.start();

WorkflowWorker

顾名思义,此工作线程类供工作流实现使用。它配置了任务列表和工作流实现类型。工作线程类运行循环,在指定的任务列表中轮询决策任务。当收到决策任务时,它会创建工作流实现的实例并调用 @Execute 方法来处理任务。

ActivityWorker

对于实现活动工作线程,您可以使用 ActivityWorker 类来方便地轮询任务列表,以查找活动任务。可为活动工作线程配置活动实现对象。此工作线程类运行循环,在指定的任务列表中轮询活动任务。收到活动任务时,它会查找您提供的相应实现并调用活动方法来处理任务。与调用工厂来为每个决策任务创建新实例的 WorkflowWorker 不同,ActivityWorker 直接使用您提供的对象。

ActivityWorker 类使用适用于 Java 的 AWS Flow Framework 注释来确定注册和执行选项。

工作线程的线程模型

在适用于 Java 的 AWS Flow Framework 中,活动或决策程序都体现为工作线程类的实例。您的应用程序负责在每台机器和每个进程上配置和实例化应作为工作线程的工作线程对象。然后,工作线程对象将自动接收来自 Amazon SWF 的任务,将其分派给您的活动或工作流实现,并向 Amazon SWF 报告结果。单个工作流实例可能跨越许多工作线程。如果 Amazon SWF 有一个或多个待处理的活动任务,它会将任务分配给第一个可用的工作线程,然后再分派给下一个,以此类推。这样便可同时在不同工作线程中处理属于同一工作流实例的任务。

基于适用于 Java 的 AWS Flow Framework 应用程序

此外,还可以将每个工作线程配置为在多个线程上处理任务。这意味着,即使只有一个工作线程,工作流实例的活动任务也可以并行运行。

决策任务的行为类似,但例外情况是 Amazon SWF 保证在执行给定工作流时,一次仅执行一个决策。单个工作流执行通常需要多个决策任务;因此,这也可能导致在多个进程和线程上执行。决策程序配置了工作流实现的类型。当决策程序收到决策任务时,它会创建工作流实现的实例 (对象)。该框架提供了用于创建这些实例的可扩展工厂模式。默认工作流工厂每次创建一个新对象。您可以提供自定义工厂来覆盖此行为。

与配置了工作流实现类型的决策程序不同,活动工作线程配置了活动实现的实例 (对象)。当活动工作线程收到活动任务时,该任务将被分派至适当的活动实现对象。

工作线程类的线程模型

工作流工作线程维护单个线程池,并在用于轮询 Amazon SWF 以获取任务的同一线程中执行工作流。由于活动要长时间运行(至少与工作流逻辑相比),活动工作线程类需要维护两个独立的线程池;一个用于轮询 Amazon SWF 以获取活动任务,另一个通过执行活动实现来处理任务。这样,您便可以独立于用于执行任务的线程数,来配置用于轮询任务的线程数。例如,您可以将少量线程用于轮询,而将大量线程用于执行任务。活动工作线程类仅在有可用的轮询线程和用于处理任务的可用线程时,才会轮询 Amazon SWF 以获取任务。

此线程和实例行为意味着:

  1. 活动实现必须是无状态的。不应使用实例变量在活动对象中存储应用程序状态。但是,您可以使用字段来存储数据库连接等资源。

  2. 活动实现必须是线程安全的。由于相同实例可同时用来处理来自不同线程的任务,因此活动代码对共享资源的访问必须同步。

  3. 工作流实现可以是有状态的,实例变量可用于存储状态。即使创建工作流实现的新实例来处理每个决策任务,架构也将确保可正确地重新创建状态。但是,工作流实现必须是确定性的。有关更多详细信息,请参阅深入剖析 一节。

  4. 在使用默认工厂时,工作流实现不需要是线程安全的。默认实现确保,一次只有一个线程使用工作流实现的一个实例。

工作线程扩展性

适用于 Java 的 AWS Flow Framework 还包含一些低级别的工作线程类,可为您提供精细控制和扩展性。使用它们,您可以完全自定义工作流和活动类型注册,并设置用于创建实现对象的工厂。这些工作线程为 GenericWorkflowWorkerGenericActivityWorker

GenericWorkflowWorker 可以配置一个工厂,以创建工作流定义工厂。工作流定义工厂负责创建工作流实现的实例以及提供注册选项等配置设置。在正常情况下,您应直接使用 WorkflowWorker 类。它会自动创建和配置框架中所提供的工厂 POJOWorkflowDefinitionFactoryFactoryPOJOWorkflowDefinitionFactory 的实现。该工厂要求工作流实现类必须具有无参数的构造函数。该构造函数用于在运行时创建工作流对象的实例。该工厂会查看您在工作流接口和实现中使用的注释,以创建相应的注册和执行选项。

您可以通过实现 WorkflowDefinitionFactoryWorkflowDefinitionFactoryFactoryWorkflowDefinition 来提供您自己的工厂实现。WorkflowDefinition 类供工作线程类用来分派决策任务和信号。通过实现这些基类,您可以完全自定义工厂并向工作流实现分派请求。例如,您可以使用这些扩展性点根据您自己的注释提供用于编写工作流的自定义编程模型,或从 WSDL 而不是框架使用的代码优先方法生成该模型。要使用您的自定义工厂,您必须使用 GenericWorkflowWorker 类。有关这些类的详细信息,请参阅AWS SDK for Java 文档。

同样,GenericActivityWorker 允许您提供自定义活动实现工厂。通过实现 ActivityImplementationFactoryActivityImplementation 类,您可以完全控制活动实例化以及自定义注册和执行选项。有关这些类的详细信息,请参阅AWS SDK for Java 文档。