

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 订阅工作流程教程第 2 部分：实现工作流程
<a name="swf-sns-tutorial-implementing-workflow"></a>

到目前为止，我们的代码都是比较通用的。在此部分，我们将开始实际定义工作流程执行的操作以及需要什么活动才能实现它。

**Topics**
+ [设计工作流程](#designing-the-workflow)
+ [设置工作流程代码](#setting-up-our-workflow-code)
+ [注册工作流程](#registering-the-workflow)
+ [轮询决策](#polling-for-decisions)
+ [启动工作流程执行](#starting-the-workflow-execution)
+ [后续步骤](#implementing-workflow-next-steps)

## 设计工作流程
<a name="designing-the-workflow"></a>

回想一下，此工作流程的初步构想由以下步骤组成：

1. 向用户获取订阅地址（电子邮件或手机短信）。

1. 创建 SNS 主题，然后让所提供的终端节点订阅该主题。

1. 等待用户确认订阅。

1. 如果用户确认，则向该主题发布一条祝贺消息。

我们可将工作流程中的每步视为工作流程必须执行的一个*活动*。我们的*工作流程* 负责在适当的时间安排每个活动以及协调活动之间的数据传输。

对于此工作流程，我们将为其中每个步骤创建一个单独的活动，并为其提供描述性名称：

1. get\_contact\_activity

1. subscribe\_topic\_activity

1. wait\_for\_confirmation\_activity

1. send\_result\_activity

这些活动将按顺序执行，并且后续步骤中将使用每步中的数据。

我们可以将应用程序设计为将所有代码放在一个源文件中，但这与 Amazon SWF 的设计宗旨相悖。它适合的工作流程可跨越整个 Internet 范围，因此我们至少要将应用程序分为两个单独的执行文件：
+ `swf_sns_workflow.rb` - 包含工作流程和工作流程启动者。
+ `swf_sns_activities.rb` - 包含活动和活动启动者。

可在单独的时段、单独的计算机甚至世界上的不同地点运行工作流程和活动实现。由于 Amazon SWF 会跟踪工作流和活动的细节，因此无论活动在何处运行，工作流都能协调其计划和数据传输。

## 设置工作流程代码
<a name="setting-up-our-workflow-code"></a>

我们首先将创建一个名为 `swf_sns_workflow.rb` 的文件。在此文件中，声明一个名为的类**SampleWorkflow**。以下是类声明及其构造函数 `initialize` 方法。

```
require_relative 'utils.rb'

# SampleWorkflow - the main workflow for the SWF/SNS Sample
#
# See the file called `README.md` for a description of what this file does.
class SampleWorkflow

  attr_accessor :name

  def initialize(workflowId)

    # the domain to look for decision tasks in.
    @domain = init_domain

    # the task list is used to poll for decision tasks.
    @workflowId = workflowId

    # The list of activities to run, in order. These name/version hashes can be
    # passed directly to AWS::SimpleWorkflow::DecisionTask#schedule_activity_task.
    @activity_list = [
      { :name => 'get_contact_activity', :version => 'v1' },
      { :name => 'subscribe_topic_activity', :version => 'v1' },
      { :name => 'wait_for_confirmation_activity', :version => 'v1' },
      { :name => 'send_result_activity', :version => 'v1' },
    ].reverse! # reverse the order... we're treating this like a stack.

    register_workflow
  end
```

如您所见，我们保留以下类实例数据：
+ `domain` - 从 `init_domain` 中的 `utils.rb` 检索的域名。
+ `workflowId` - 任务列表传入到 `initialize`。
+ `activity_list` - 活动列表，其中具有我们将运行的活动的名称和版本。

域名、活动名称和活动版本足以让 Amazon SWF 明确地识别活动类型，因此，这些就是我们要计划活动所需保存的所有数据。

工作流程的 *decider* 代码将使用任务列表轮询决策任务并安排活动。

在此函数的最后，我们调用一个尚未定义的方法：`register_workflow`。接下来，我们将定义此方法。

## 注册工作流程
<a name="registering-the-workflow"></a>

要使用工作流程类型，必须先注册它。如同活动类型一样，工作流程类型按其域、名称和版本进行标识。此外，与域和活动类型一样，无法重新注册现有的工作流程类型。如果需要更改有关工作流程类型的任何内容，则必须为其提供新版本，基本上就是新建一个类型。

以下是 `register_workflow` 的代码，它用于检索我们在上次运行时注册的现有工作流程类型，如果尚未注册该工作流程，则注册它。

```
  # Registers the workflow
  def register_workflow
    workflow_name = 'swf-sns-workflow'
    @workflow_type = nil

    # a default value...
    workflow_version = '1'

    # Check to see if this workflow type already exists. If so, use it.
    @domain.workflow_types.each do | a |
      if (a.name == workflow_name) && (a.version == workflow_version)
        @workflow_type = a
      end
    end

    if @workflow_type.nil?
      options =  {
        :default_child_policy => :terminate,
        :default_task_start_to_close_timeout => 3600,
        :default_execution_start_to_close_timeout => 24 * 3600 }

      puts "registering workflow: #{workflow_name}, #{workflow_version}, #{options.inspect}"
      @workflow_type = @domain.workflow_types.register(workflow_name, workflow_version, options)
    end

    puts "** registered workflow: #{workflow_name}"
  end
```

首先，我们通过循环访问域的 [workflow\_types](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/Domain.html#workflow_types-instance_method) 集合，查看是否已注册工作流名称和版本。如果找到了匹配项，则将使用已注册的工作流程类型。

[如果我们找不到匹配项，则会注册一个新的工作流程类型（通过在我们搜索工作流程的同一个`workflow_types`集合上调用 register），名称为 “swf-sns-workflow”，版本为 “1”，并包含以下选项。](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/WorkflowTypeCollection.html#register-instance_method)

```
      options =  {
        :default_child_policy => :terminate,
        :default_task_start_to_close_timeout => 3600,
        :default_execution_start_to_close_timeout => 24 * 3600 }
```

注册期间传入的选项用于为工作流程类型设置*默认行为*，因此不需要在每次开始执行新工作流程时设置这些值。

在这里，我们只设置了一些超时值：从任务开始到结束时可用的最长时间（1 小时）以及工作流程执行完毕可用的最长时间（24 小时）。如果超出其中任意一个时间，则任务或工作流程将超时。

有关超时值的详细信息，请参阅[Amazon SWF 超时类型](swf-timeout-types.md)。

## 轮询决策
<a name="polling-for-decisions"></a>

在每个工作流程执行的核心部分都有一个*决策程序*。决策程序的职责是管理工作流程自身的执行。决策程序接收*决策任务*，然后通过安排新活动、删除并重新启动活动，或通过将工作流程执行的状态设置为完成、已取消或失败，响应这些任务。

决策程序按照工作流程执行的*任务列表* 名称接收要响应的决策任务。要轮询决策任务，可在域的 [decision\_tasks](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/Domain.html#decision_tasks-instance_method) 集合上调用 [poll](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/DecisionTaskCollection.html#poll-instance_method)，以循环遍历可用的决策任务。然后，可通过循环访问决策任务的 [new\_events](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/DecisionTask.html#new_events-instance_method) 集合，检查其中是否有新事件。

返回的事件是[AWS::SimpleWorkflow::HistoryEvent](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/HistoryEvent.html)对象，您可以使用返回事件的 e [vent\_type 成员来获取事件的类型](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/HistoryEvent.html#event_type-instance_method)。有关历史事件类型的列表和描述，请参阅[HistoryEvent](https://docs.aws.amazon.com/amazonswf/latest/apireference/API_HistoryEvent.html)《*亚马逊简单工作流程服务 API 参考*》。

下面是决策任务轮询器逻辑的开头。我们的工作流程类中一个名为 `poll_for_decisions` 的新方法。

```
  def poll_for_decisions
    # first, poll for decision tasks...
    @domain.decision_tasks.poll(@workflowId) do | task |
      task.new_events.each do | event |
        case event.event_type
```

我们现在将根据收到的 `event_type` 划分决策的执行。我们可能收到的第一个是**WorkflowExecutionStarted**。收到该事件意味着 Amazon SWF 正在向决策程序发出信号，示意它应开始执行工作流。我们首先将通过对轮询时收到的任务调用 [schedule\_activity\_task](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/DecisionTask.html#schedule_activity_task-instance_method)，安排第一个活动。

我们将在活动列表中声明的第一个活动传递给它，由于我们颠倒了列表，因此可将其用作堆栈，而该活动占据列表上的 `last` 位置。我们定义的“活动”只是由名称和版本号组成的映射，但这就是 Amazon SWF 识别活动以进行计划所需的全部信息（假定已注册该活动）。

```
          when 'WorkflowExecutionStarted'
            # schedule the last activity on the (reversed, remember?) list to
            # begin the workflow.
            puts "** scheduling activity task: #{@activity_list.last[:name]}"

            task.schedule_activity_task( @activity_list.last,
              { :workflowId => "#{@workflowId}-activities" } )
```

当我们计划活动时，Amazon SWF 会向我们在计划时传入的活动任务列表发送一个*活动任务*，示意任务开始。我们将在 [订阅工作流教程第 3 部分：实现活动](swf-sns-tutorial-implementing-activities.md) 中处理活动任务，但值得注意的是，我们在此并未执行任务。我们仅需告知 Amazon SWF 应该*计划*该任务。

我们需要解决的下一个活动是**ActivityTaskCompleted**事件，该事件发生在 Amazon SWF 收到来自活动任务的活动已完成响应时。

```
          when 'ActivityTaskCompleted'
            # we are running the activities in strict sequential order, and
            # using the results of the previous activity as input for the next
            # activity.
            last_activity = @activity_list.pop

            if(@activity_list.empty?)
              puts "!! All activities complete! Sending complete_workflow_execution..."
              task.complete_workflow_execution
              return true;
            else
              # schedule the next activity, passing any results from the
              # previous activity. Results will be received in the activity
              # task.
              puts "** scheduling activity task: #{@activity_list.last[:name]}"
              if event.attributes.has_key?('result')
                task.schedule_activity_task(
                  @activity_list.last,
                  { :input => event.attributes[:result],
                    :workflowId => "#{@workflowId}-activities" } )
              else
                task.schedule_activity_task(
                  @activity_list.last, { :workflowId => "#{@workflowId}-activities" } )
              end
            end
```

由于我们以线性方式执行任务，并且一次只有一个活动在执行，所以我们将借此机会从`activity_list`堆栈中弹出已完成的任务。如果这样导致列表变空，则表示我们的工作流程已完成。在这种情况下，我们通过调用任务的 [complete\_workflow\_execution](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/DecisionTask.html#complete_workflow_execution-instance_method) 向 Amazon SWF 发出工作流已完成的信号。

如果列表中仍有条目，则我们将安排列表上的下一活动（仍处于最后一个位置）。但是，这一次我们将查看上一活动在完成后是否向 Amazon SWF 返回了任何结果数据，这些数据将在事件属性中的可选 `result` 键中提供给工作流。如果该活动产生了结果，则我们将其作为 `input` 选项传递给所安排的下一活动以及活动任务列表。

通过检索完成的活动的 `result` 值以及通过设置安排的活动的 `input` 值，我们可将数据从一个活动传递到下一个，或可使用活动中的数据，根据活动得到的结果更改决策程序中的行为。

就本教程而言，在定义工作流程的行为时，这两种事件类型最为重要。但是，活动可以生成除之外的事件**ActivityTaskCompleted**。我们将通过为**ActivityTaskTimedOut**和**ActivityTaskFailed**事件以及事件提供演示处理程序代码来总结决策程序代码，这些代码将在 **WorkflowExecutionCompleted**Amazon SWF 处理`complete_workflow_execution`我们用完要运行的活动时发出的调用时生成。

```
          when 'ActivityTaskTimedOut'
            puts "!! Failing workflow execution! (timed out activity)"
            task.fail_workflow_execution
            return false

          when 'ActivityTaskFailed'
            puts "!! Failing workflow execution! (failed activity)"
            task.fail_workflow_execution
            return false

          when 'WorkflowExecutionCompleted'
            puts "## Yesss, workflow execution completed!"
            task.workflow_execution.terminate
            return false
        end
      end
    end
  end
```

## 启动工作流程执行
<a name="starting-the-workflow-execution"></a>

在将为要轮询的工作流程生成任何决策任务之前，我们需要启动工作流程执行。

要启动工作流程执行，请在注册的工作流程类型 () [AWS::SimpleWorkflow::WorkflowType](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/WorkflowType.html)上调用 [start\_e](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/WorkflowType.html#start_execution-instance_method) xecution。我们将在此周围定义一个小型包装器，以利用在类构造函数中检索的 `workflow_type` 实例成员。

```
  def start_execution
    workflow_execution = @workflow_type.start_execution( {
      :workflowId => @workflowId } )
    poll_for_decisions
  end
end
```

一旦执行工作流，工作流的任务列表（它作为工作流执行选项传入 [start\_execution](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/WorkflowType.html#start_execution-instance_method)）上即开始显示决策事件。

与注册工作流程类型时提供的选项不同，不将传递给 `start_execution` 的选项视为工作流程类型的一部分。无需更改工作流程的版本，即可随意在每次执行工作流程时更改这些选项。

因为我们希望工作流程在运行文件时开始执行，所以添加一些实例化类的代码，然后调用我们刚才定义`start_execution`的方法。

```
if __FILE__ == $0
  require 'securerandom'

  # Use a different task list name every time we start a new workflow execution.
  #
  # This avoids issues if our pollers re-start before SWF considers them closed,
  # causing the pollers to get events from previously-run executions.
  workflowId = SecureRandom.uuid

  # Let the user start the activity worker first...

  puts ""
  puts "Amazon SWF Example"
  puts "------------------"
  puts ""
  puts "Start the activity worker, preferably in a separate command-line window, with"
  puts "the following command:"
  puts ""
  puts "> ruby swf_sns_activities.rb #{workflowId}-activities"
  puts ""
  puts "You can copy & paste it if you like, just don't copy the '>' character."
  puts ""
  puts "Press return when you're ready..."

  i = gets

  # Now, start the workflow.

  puts "Starting workflow execution."
  sample_workflow = SampleWorkflow.new(workflowId)
  sample_workflow.start_execution
end
```

为避免任务列表命名发生任何冲突，我们将使用 `SecureRandom.uuid` 生成可用作任务列表名称的随机 UUID，确保将不同的任务列表名称用于每次工作流程执行。

**注意**  
任务列表用于记录有关工作流程执行的事件，因此，如果将同一任务列表用于多次执行同一工作流程类型，则可能获得在上一次执行期间生成的事件，当多次执行间隔较小（试验新代码或运行测试时经常这样）时尤为如此。

为了避免必须处理以前执行中的项目的问题，可对每次执行使用新任务列表，在开始工作流程执行时指定该列表。

此处还有一些代码，可向运行代码的人员（很可能是您）提供说明以及提供任务列表的“活动”版本。决策程序使用此任务列表名称为工作流程安排活动，而活动实现将对此任务列表名称侦听活动事件以了解何时开始安排的活动并提供有关活动执行的最新消息。

这段代码还等待用户开始运行活动启动程序，*然后再* 开始工作流程执行，因此在所提供的任务列表上开始出现活动任务时，活动启动程序将准备好作出响应。

## 后续步骤
<a name="implementing-workflow-next-steps"></a>

您已实现工作流程。接下来，将在[订阅工作流教程第 3 部分：实现活动](swf-sns-tutorial-implementing-activities.md)中定义活动和活动启动程序。