

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 訂閱工作流程教學第 2 部分：實作工作流程
<a name="swf-sns-tutorial-implementing-workflow"></a>

到目前為止，我們的程式碼顯得較通用。因此，我們需要開始實際定義工作流程的運作方式，以及實作工作流程所需的活動。

**Topics**
+ [設計工作流程](#designing-the-workflow)
+ [設定工作流程程式碼](#setting-up-our-workflow-code)
+ [註冊工作流程](#registering-the-workflow)
+ [輪詢決策](#polling-for-decisions)
+ [啟動工作流程執行](#starting-the-workflow-execution)
+ [後續步驟](#implementing-workflow-next-steps)

## 設計工作流程
<a name="designing-the-workflow"></a>

回想一下，此工作流程最初的構想包含了下列步驟：

1. 從使用者取得訂閱地址 (電子郵件或簡訊)。

1. 建立 SNS 主題並將提供的端點訂閱到主題。

1. 等待使用者確認訂閱。

1. 如果使用者確認，將發佈賀辭到主題。

我們可以將工作流程中的每個步驟視為必須執行的「活動」**。「工作流程」**負責排程每個活動以於適當的時間執行，以及協調活動之間的資料傳輸。

針對此工作流程，我們將為所有這些步驟建立不同的活動，並以描述性的名稱將活動命名為：

1. get\_contact\_activity

1. subscribe\_topic\_activity

1. wait\_for\_confirmation\_activity

1. send\_result\_activity

這些活動將會依序執行，且每個步驟中的資料將用於後續步驟。

我們可以設計應用程式，讓所有程式碼都存在於一個來源檔案中，但這與 Amazon SWF 的設計方式相反。後者是針對可跨整個網際網路規模的工作流程所設計，因此讓我們將應用程式至少分為兩個不同的執行檔：
+ `swf_sns_workflow.rb` - 包含工作流程和工作流程啟動者。
+ `swf_sns_activities.rb` - 包含活動和活動啟動者。

工作流程和活動實作可以在不同的視窗、不同的電腦，甚至在世界上不同的區域中執行。由於 Amazon SWF 會追蹤工作流程和活動的詳細資訊，因此您的工作流程可以協調活動的排程和資料傳輸，無論活動在何處執行。

## 設定工作流程程式碼
<a name="setting-up-our-workflow-code"></a>

我們將從建立稱為 `swf_sns_workflow.rb` 的檔案開始。在此檔案中，宣告稱為 **SampleWorkflow** 的類別。以下是類別宣告和其建構函數：`initialize` 方法。

```
require_relative 'utils.rb'

# SampleWorkflow - the main workflow for the SWF/SNS Sample
#
# See the file called `README.md` for a description of what this file does.
class SampleWorkflow

  attr_accessor :name

  def initialize(workflowId)

    # the domain to look for decision tasks in.
    @domain = init_domain

    # the task list is used to poll for decision tasks.
    @workflowId = workflowId

    # The list of activities to run, in order. These name/version hashes can be
    # passed directly to AWS::SimpleWorkflow::DecisionTask#schedule_activity_task.
    @activity_list = [
      { :name => 'get_contact_activity', :version => 'v1' },
      { :name => 'subscribe_topic_activity', :version => 'v1' },
      { :name => 'wait_for_confirmation_activity', :version => 'v1' },
      { :name => 'send_result_activity', :version => 'v1' },
    ].reverse! # reverse the order... we're treating this like a stack.

    register_workflow
  end
```

如您所見，我們會保留下列類別執行個體資料：
+ `domain` - `utils.rb` 中擷取自 `init_domain` 的網域名稱。
+ `workflowId` - 傳入 `initialize` 的任務清單。
+ `activity_list` - 活動清單，其具有我們將執行之活動的名稱和版本。

網域名稱、活動名稱和活動版本足以讓 Amazon SWF 正面識別活動類型，因此這是我們為了排定活動而需要保留的所有資料。

工作流程「決策者」**程式碼將會使用任務清單來輪詢決策任務及排程活動。

在此函數結束時，我們呼叫尚未定義的方法：`register_workflow`。我們接下來將定義此方法。

## 註冊工作流程
<a name="registering-the-workflow"></a>

若要使用工作流程類型，我們必須先予以註冊。如同活動類型，工作流程類型是以其網域、名稱和版本進行識別。另外，如同網域和活動類型，您無法重新註冊現有的工作流程類型。如果您需要變更有關工作流程類型的任何資訊，則必須提供新版本，基本上即會建立新的類型。

下列 `register_workflow` 程式碼用來擷取我們在先前執行上註冊的現有工作流程類型，或者註冊尚未註冊的工作流程。

```
  # Registers the workflow
  def register_workflow
    workflow_name = 'swf-sns-workflow'
    @workflow_type = nil

    # a default value...
    workflow_version = '1'

    # Check to see if this workflow type already exists. If so, use it.
    @domain.workflow_types.each do | a |
      if (a.name == workflow_name) && (a.version == workflow_version)
        @workflow_type = a
      end
    end

    if @workflow_type.nil?
      options =  {
        :default_child_policy => :terminate,
        :default_task_start_to_close_timeout => 3600,
        :default_execution_start_to_close_timeout => 24 * 3600 }

      puts "registering workflow: #{workflow_name}, #{workflow_version}, #{options.inspect}"
      @workflow_type = @domain.workflow_types.register(workflow_name, workflow_version, options)
    end

    puts "** registered workflow: #{workflow_name}"
  end
```

首先要藉由逐一查看網域的 [workflow\_types](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/Domain.html#workflow_types-instance_method) 集合，確認是否已註冊工作流程名稱和版本。如果我們找到相符項目，就會使用已註冊的工作流程類型。

如果我們找不到相符項目，則會以名稱 'swf-sns-workflow'、版本 '1' 和下列選項註冊新的工作流程類型 （在搜尋工作流程的相同`workflow_types`集合上呼叫[註冊](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/WorkflowTypeCollection.html#register-instance_method))。

```
      options =  {
        :default_child_policy => :terminate,
        :default_task_start_to_close_timeout => 3600,
        :default_execution_start_to_close_timeout => 24 * 3600 }
```

在註冊期間傳遞的選項會用來設定工作流程類型的「預設行為」**，因此不需要在每次開始新的工作流程執行時設定這些值。

我們在這裡只會設定一些逾時值：從任務開始到結束所需的最長時間 (一小時)，以及工作流程執行完成所需的最長時間 (24 小時)。如果超過這兩個時間中的其中一個，則任務或工作流程將會逾時。

如需逾時值的詳細資訊，請參閱「[Amazon SWF 逾時類型](swf-timeout-types.md)」。

## 輪詢決策
<a name="polling-for-decisions"></a>

每個工作流程執行的核心即為「決策者」**。決策者的責任在於管理工作流程本身的執行。決策者會收到並回應「決策任務」**，方法是排程新活動、取消並重新啟動活動，或是將工作流程執行的狀態設定為完成、已取消或失敗。

決策者會使用工作流程執行的「任務清單」**名稱來接收要回應的決策任務。若要輪詢決策任務，請在網域的 [decision\_tasks](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/Domain.html#decision_tasks-instance_method) 集合上呼叫[輪詢](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/DecisionTaskCollection.html#poll-instance_method)，以循環切換可用的決策任務。您接著可以逐一查看 [new\_events](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/DecisionTask.html#new_events-instance_method) 集合，來檢查決策任務中的新事件。

傳回的事件為 [AWS::SimpleWorkflow::HistoryEvent](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/HistoryEvent.html) 物件，而使用所傳回事件的 [event\_type](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/HistoryEvent.html#event_type-instance_method) 成員即可取得事件類型。如需歷史記錄事件類型的清單和說明，請參閱《*Amazon Simple Workflow Service API 參考*》中的 [HistoryEvent](https://docs.aws.amazon.com/amazonswf/latest/apireference/API_HistoryEvent.html)。

以下是決策任務輪詢器邏輯的開始。工作流程類別中的新方法稱為 `poll_for_decisions`。

```
  def poll_for_decisions
    # first, poll for decision tasks...
    @domain.decision_tasks.poll(@workflowId) do | task |
      task.new_events.each do | event |
        case event.event_type
```

我們現在將根據收到的 `event_type` 來分支處理決策者的執行。我們可能會收到的第一個類型為 **WorkflowExecutionStarted**。收到此事件時，表示 Amazon SWF 會向您的決策者發出訊號，表示應該開始工作流程執行。首先，對輪詢時收到的任務呼叫 [schedule\_activity\_task](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/DecisionTask.html#schedule_activity_task-instance_method) 以排程第一個活動。

我們會將活動清單中宣告的第一個活動傳遞給它，但因為我們會反轉清單如堆疊一般使用，所以第一個活動會佔用清單上的 `last` 位置。我們定義的「活動」只是由名稱和版本編號組成的地圖，但這是 Amazon SWF 識別排程活動所需的一切，假設活動已註冊。

```
          when 'WorkflowExecutionStarted'
            # schedule the last activity on the (reversed, remember?) list to
            # begin the workflow.
            puts "** scheduling activity task: #{@activity_list.last[:name]}"

            task.schedule_activity_task( @activity_list.last,
              { :workflowId => "#{@workflowId}-activities" } )
```

當我們排程活動時，Amazon SWF 會將*活動任務*傳送至我們在排程活動任務時傳入的活動任務清單，發出開始任務的訊號。我們將在「[訂閱工作流程教學第 3 部分：實作活動](swf-sns-tutorial-implementing-activities.md)」中處理活動任務，但需注意的是我們並不會在此執行任務。我們只會告知 Amazon SWF 應該*排程*。

我們需要處理的下一個活動是 **ActivityTaskCompleted** 事件，會在 Amazon SWF 從活動任務收到活動完成的回應時發生。

```
          when 'ActivityTaskCompleted'
            # we are running the activities in strict sequential order, and
            # using the results of the previous activity as input for the next
            # activity.
            last_activity = @activity_list.pop

            if(@activity_list.empty?)
              puts "!! All activities complete! Sending complete_workflow_execution..."
              task.complete_workflow_execution
              return true;
            else
              # schedule the next activity, passing any results from the
              # previous activity. Results will be received in the activity
              # task.
              puts "** scheduling activity task: #{@activity_list.last[:name]}"
              if event.attributes.has_key?('result')
                task.schedule_activity_task(
                  @activity_list.last,
                  { :input => event.attributes[:result],
                    :workflowId => "#{@workflowId}-activities" } )
              else
                task.schedule_activity_task(
                  @activity_list.last, { :workflowId => "#{@workflowId}-activities" } )
              end
            end
```

由於我們以線性方式執行任務，而且一次只執行一個活動，我們將藉此機會從`activity_list`堆疊中彈出已完成的任務。如果此結果為空白清單，則表示我們的工作流程已完成。在此情況下，我們會呼叫任務的 [complete\_workflow\_execution](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/DecisionTask.html#complete_workflow_execution-instance_method)，向 Amazon SWF 發出我們的工作流程已完成的訊號。

如果清單上仍有項目，我們會排程清單上的下一個活動 (一樣會在最後一個位置)。不過，這次我們會查看先前的活動是否在完成時傳回任何結果資料至 Amazon SWF，該資料會在事件的屬性中，以選用的`result`金鑰提供給工作流程。如果活動有結果產生，我們會將之做為 `input` 選項連同活動任務清單一起傳遞給下一個排程的活動。

透過擷取已完成活動的 `result` 值，及設定已排程活動的 `input` 值，我們可以根據活動的結果，將資料從某個活動傳遞給下一個活動，或使用某個活動的資料來變更決策者的行為。

基於本教學的用途，這兩個事件類型在定義工作流程的行為時最為重要。不過，活動可能產生 **ActivityTaskCompleted** 以外的事件。我們會為 **ActivityTaskTimedOut** 和 **ActivityTaskFailed** 事件提供示範處理常式程式碼，並為 **WorkflowExecutionCompleted** 事件提供示範處理常式程式碼，該事件會在 Amazon SWF 處理我們用完要執行的活動時發出的`complete_workflow_execution`呼叫時產生。

```
          when 'ActivityTaskTimedOut'
            puts "!! Failing workflow execution! (timed out activity)"
            task.fail_workflow_execution
            return false

          when 'ActivityTaskFailed'
            puts "!! Failing workflow execution! (failed activity)"
            task.fail_workflow_execution
            return false

          when 'WorkflowExecutionCompleted'
            puts "## Yesss, workflow execution completed!"
            task.workflow_execution.terminate
            return false
        end
      end
    end
  end
```

## 啟動工作流程執行
<a name="starting-the-workflow-execution"></a>

在產生工作流程要輪詢的任何決策任務以前，我們得先啟動工作流程執行。

若要啟動工作流程執行，請呼叫已註冊工作流程類型的 [start\_execution](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/WorkflowType.html#start_execution-instance_method) ([AWS::SimpleWorkflow::WorkflowType](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/WorkflowType.html))。我們將對此定義小型包裝函式，以利用我們在類別建構函數中擷取的 `workflow_type` 執行個體成員。

```
  def start_execution
    workflow_execution = @workflow_type.start_execution( {
      :workflowId => @workflowId } )
    poll_for_decisions
  end
end
```

工作流程執行之時，決策事件會開始出現在工作流程的任務清單上，而任務清單在 [start\_execution](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/WorkflowType.html#start_execution-instance_method) 中會以工作流程執行選項傳遞。

與註冊工作流程類型時提供的選項不同，傳遞給 `start_execution` 的選項不會視為工作流程類型的一部分。您可以自由地針對個別的工作流程執行變更這些項目，而不需要變更工作流程版本。

由於我們希望工作流程在執行檔案時開始執行，請新增一些執行個體化 類別的程式碼，然後呼叫我們剛定義的`start_execution`方法。

```
if __FILE__ == $0
  require 'securerandom'

  # Use a different task list name every time we start a new workflow execution.
  #
  # This avoids issues if our pollers re-start before SWF considers them closed,
  # causing the pollers to get events from previously-run executions.
  workflowId = SecureRandom.uuid

  # Let the user start the activity worker first...

  puts ""
  puts "Amazon SWF Example"
  puts "------------------"
  puts ""
  puts "Start the activity worker, preferably in a separate command-line window, with"
  puts "the following command:"
  puts ""
  puts "> ruby swf_sns_activities.rb #{workflowId}-activities"
  puts ""
  puts "You can copy & paste it if you like, just don't copy the '>' character."
  puts ""
  puts "Press return when you're ready..."

  i = gets

  # Now, start the workflow.

  puts "Starting workflow execution."
  sample_workflow = SampleWorkflow.new(workflowId)
  sample_workflow.start_execution
end
```

為了避免任何任務清單命名衝突，我們將使用 `SecureRandom.uuid` 產生可用做任務清單名稱的隨機 UUID，藉以保證每個工作流程執行用的是不同的任務清單名稱。

**注意**  
任務清單會用來記錄工作流程執行的事件，因此如果您在相同工作流程類型的多次執行當中使用相同的任務清單，則可能會取得上一次執行所產生的事件，尤其會發生在近乎連續執行工作流程的時候，大多在試用新程式碼或執行測試的情況下。

為了避免必須處理先前執行之成品的問題，我們可以針對每次執行都使用新的任務清單，在我們開始工作流程執行時予以指定。

這裡也有一些程式碼可提供說明給執行它的人員 (可能是您)，以及提供任務清單的「活動」版本。決策者使用此任務清單名稱來排程工作流程的活動，而活動實作將會接聽此任務清單名稱的活動事件，知道何時開始排程的活動，以及提供活動執行的更新。

程式碼也會在啟動工作流程執行「之前」**，等待使用者開始執行活動啟動者，因此活動任務開始出現於提供的任務清單時，活動啟動者就已準備好回應。

## 後續步驟
<a name="implementing-workflow-next-steps"></a>

您已實作工作流程。接下來，您會在「[訂閱工作流程教學第 3 部分：實作活動](swf-sns-tutorial-implementing-activities.md)」中定義活動和啟動者程式碼。