

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# サブスクリプションワークフローのチュートリアルのパート 2: ワークフローの実装
<a name="swf-sns-tutorial-implementing-workflow"></a>

これまでは、コードはかなり汎用的でした。この部分で、ワークフローが何を行うか、またそれを実装するのにどのようなアクティビティが必要になるかを実際に定義し始めます。

**Topics**
+ [ワークフローの設計](#designing-the-workflow)
+ [ワークフローコードの設定](#setting-up-our-workflow-code)
+ [ワークフローの登録](#registering-the-workflow)
+ [決定のポーリング](#polling-for-decisions)
+ [ワークフロー実行の開始](#starting-the-workflow-execution)
+ [次のステップ](#implementing-workflow-next-steps)

## ワークフローの設計
<a name="designing-the-workflow"></a>

思い返して頂くと、このワークフローの最初のアイデアは以下のステップで構成されていました。

1. ユーザーからサブスクリプションアドレス (E メールまたは SMS) を取得します。

1. SNS トピックを作成し、トピックに対して提供されたエンドポイントにサブスクライブします。

1. ユーザーによるサブスクリプションの確認を待機します。

1. ユーザーが確認した場合、トピックに対して成功のメッセージを発行します。

ワークフローの各ステップのことを、実行する必要のある*アクティビティ*と考えることができます。*ワークフロー*は、適切なタイミングで各アクティビティをスケジュールし、アクティビティ間のデータ転送を調整します。

このワークフローでは、これらのステップの各々に別個のアクティビティを作成し、それらに記述的に名前を付けます。

1. get\_contact\_activity

1. subscribe\_topic\_activity

1. wait\_for\_confirmation\_activity

1. send\_result\_activity

これらのアクティビティは順番に実行され、各ステップからのデータは、次のステップで使用されます。

コードすべてが 1 つのソースファイルに存在するようにアプリケーションを設計できますが、これは Amazon SWF の設計された方法に反しています。範囲がインターネット全体に及ぶワークフローのために設計されているので、少なくともアプリケーションを 2 つの別個の実行可能ファイルに分割しましょう。
+ `swf_sns_workflow.rb` - ワークフローおよびワークフロースターターが含まれています。
+ `swf_sns_activities.rb` - アクティビティおよびアクティビティスターターが含まれています。

ワークフローとアクティビティの実装は、別々のウィンドウ、別々のコンピュータ、または世界の異なる場所でも実行できます。Amazon SWF がワークフローとアクティビティの詳細を追跡しているため、ワークフローは、実行中の場所に関係なくアクティビティのスケジューリングとデータ転送を調整できます。

## ワークフローコードの設定
<a name="setting-up-our-workflow-code"></a>

`swf_sns_workflow.rb` というファイルを作成して開始します。このファイルでは、**SampleWorkflow** というクラスを宣言します。クラスの宣言とそのコンストラクタである `initialize` メソッドを次に示します。

```
require_relative 'utils.rb'

# SampleWorkflow - the main workflow for the SWF/SNS Sample
#
# See the file called `README.md` for a description of what this file does.
class SampleWorkflow

  attr_accessor :name

  def initialize(workflowId)

    # the domain to look for decision tasks in.
    @domain = init_domain

    # the task list is used to poll for decision tasks.
    @workflowId = workflowId

    # The list of activities to run, in order. These name/version hashes can be
    # passed directly to AWS::SimpleWorkflow::DecisionTask#schedule_activity_task.
    @activity_list = [
      { :name => 'get_contact_activity', :version => 'v1' },
      { :name => 'subscribe_topic_activity', :version => 'v1' },
      { :name => 'wait_for_confirmation_activity', :version => 'v1' },
      { :name => 'send_result_activity', :version => 'v1' },
    ].reverse! # reverse the order... we're treating this like a stack.

    register_workflow
  end
```

ご覧のとおり、次のクラスインスタンスデータを保持しています。
+ `domain` - `utils.rb` の `init_domain` から取得したドメイン名。
+ `workflowId` - `initialize` に渡されるタスクリスト。
+ `activity_list` - 実行するアクティビティの名前とバージョンを持つアクティビティリスト。

Amazon SWF がアクティビティタイプを確実に識別するには、ドメイン名、アクティビティ名、アクティビティバージョンで十分であるため、スケジュールするためにアクティビティについて保持する必要があるデータはそれだけです。

タスクリストは、決定タスクをポーリングしアクティビティをスケジュールするために、ワークフローの*ディサイダー*コードによって使用されます。

この関数の最後に、まだ定義していないメソッドである `register_workflow` を呼び出します。次にこのメソッドを定義します。

## ワークフローの登録
<a name="registering-the-workflow"></a>

ワークフロータイプを使用するには、まずそれを登録する必要があります。アクティビティタイプと同様、ワークフロータイプは、そのドメイン、名前、およびバージョンによって識別されます。また、ドメインもアクティビティタイプも同様ですが、既存のワークフロータイプを再登録することはできません。ワークフロータイプについて何か変更する必要がある場合は、それに新しいバージョンを提供する必要がありますが、それにより本質的に新しいタイプが作成されます。

`register_workflow` のコードは次のとおりです。以前の実行時に登録した既存のワークフロータイプを取得するため、またはまだ登録されていない場合はそのワークフローを登録するために使用されます。

```
  # Registers the workflow
  def register_workflow
    workflow_name = 'swf-sns-workflow'
    @workflow_type = nil

    # a default value...
    workflow_version = '1'

    # Check to see if this workflow type already exists. If so, use it.
    @domain.workflow_types.each do | a |
      if (a.name == workflow_name) && (a.version == workflow_version)
        @workflow_type = a
      end
    end

    if @workflow_type.nil?
      options =  {
        :default_child_policy => :terminate,
        :default_task_start_to_close_timeout => 3600,
        :default_execution_start_to_close_timeout => 24 * 3600 }

      puts "registering workflow: #{workflow_name}, #{workflow_version}, #{options.inspect}"
      @workflow_type = @domain.workflow_types.register(workflow_name, workflow_version, options)
    end

    puts "** registered workflow: #{workflow_name}"
  end
```

まず、ドメインの [workflow\_types](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/Domain.html#workflow_types-instance_method) コレクションを反復処理することにより、ワークフロー名とバージョンが既に登録されているかどうか確認します。一致が検出されると、既に登録されていたワークフロータイプを使用します。

一致するものが見つからない場合、（ワークフローを検索したのと同じ `workflow_types` コレクションで [register](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/WorkflowTypeCollection.html#register-instance_method) を呼び出すことにより）名前を「swf-sns-workflow」、バージョンを「1」、オプションを次のようにした新しいワークフロータイプが登録されます。

```
      options =  {
        :default_child_policy => :terminate,
        :default_task_start_to_close_timeout => 3600,
        :default_execution_start_to_close_timeout => 24 * 3600 }
```

登録中に渡されるオプションは、ワークフロータイプの*デフォルトの動作*を設定するために使用されるので、新しいワークフロー実行を開始するたびにこれらの値を設定する必要はありません。

ここでは、タスクの開始から終了までの最大時間 (1 時間)、およびワークフロー実行が完了するまでにかかる最大時間 (24 時間) というタイムアウト値だけを設定します。これらの時間のどちらかを超過すると、タスクまたはワークフローがタイムアウトします。

タイムアウト値の詳細については、「[Amazon SWF タイムアウトの種類](swf-timeout-types.md)」を参照してください。

## 決定のポーリング
<a name="polling-for-decisions"></a>

すべてのワークフロー実行の中心には、*ディサイダー*があります。ディサイダーには、ワークフロー自体の実行を管理する責任があります。ディサイダーは*決定タスク*を取得し、新しいアクティビティのスケジューリングやアクティビティのキャンセルや再開を行うか、ワークフロー実行の状態を完了、キャンセル済み、または失敗と設定するかのどちらかでそれに応答します。

ディサイダーはワークフロー実行の*タスクリスト*名を使用して、応答する決定タスクを取得します。決定タスクをポーリングするには、ドメインの [decision\_tasks](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/Domain.html#decision_tasks-instance_method) コレクションで [poll](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/DecisionTaskCollection.html#poll-instance_method) を呼び出して、使用可能な決定タスクをループします。その後、[new\_events](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/DecisionTask.html#new_events-instance_method) コレクションに対して反復処理することにより、その決定タスクに新しいイベントがないか確認できます。

返されるイベントは [AWS::SimpleWorkflow::HistoryEvent](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/HistoryEvent.html) オブジェクトで、その返されたイベントの [event\_type](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/HistoryEvent.html#event_type-instance_method) メンバーを使用することによって、イベントのタイプを取得できます。履歴イベントタイプのリストと説明については、[Amazon Simple Workflow Service API リファレンス](https://docs.aws.amazon.com/amazonswf/latest/apireference/API_HistoryEvent.html) の「*HistoryEvent*」を参照してください。

以下に決定タスクポーラーのロジックの冒頭を示します。`poll_for_decisions` というワークフロークラスの新しいメソッドです。

```
  def poll_for_decisions
    # first, poll for decision tasks...
    @domain.decision_tasks.poll(@workflowId) do | task |
      task.new_events.each do | event |
        case event.event_type
```

次に、取得する `event_type` に基づいてディサイダーの実行を分岐します。取得する可能性が高い最初のものは、**WorkflowExecutionStarted** です。このイベントを取得した場合、それは Amazon SWF がディサイダーにワークフロー実行を開始するよう合図していることを意味します。まず初めに、ポーリング中に取得したタスクで [schedule\_activity\_task](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/DecisionTask.html#schedule_activity_task-instance_method) を呼び出すことで、最初のアクティビティをスケジューリングします。

それにアクティビティリストで宣言した最初のアクティビティを渡します。これは、スタックのように使用できるようリストを逆にしたために、リストの `last` 位置にあります。定義した「アクティビティ」は名前とバージョン番号で構成されたマップにすぎませんが、Amazon SWF がアクティビティは登録済みであると想定してスケジューリング用のアクティビティを識別するために必要とするのはこれだけです。

```
          when 'WorkflowExecutionStarted'
            # schedule the last activity on the (reversed, remember?) list to
            # begin the workflow.
            puts "** scheduling activity task: #{@activity_list.last[:name]}"

            task.schedule_activity_task( @activity_list.last,
              { :workflowId => "#{@workflowId}-activities" } )
```

アクティビティをスケジュールすると、Amazon SWF はスケジューリング中に渡されるアクティビティタスクリストに *アクティビティタスク* を送り、開始すべきタスクを合図します。「[サブスクリプションワークフローのチュートリアルのパート 3: アクティビティの実装](swf-sns-tutorial-implementing-activities.md)」でアクティビティタスクを扱いますが、ここでそのタスクを実行しないことは注目に値します。Amazon SWF にはそれを *scheduled* (スケジュールする) ようにとだけ指示します。

取り組む必要のある次のアクティビティは **ActivityTaskCompleted** イベントで、これは Amazon SWF がアクティビティタスクからアクティビティ完了の応答を取得したときに発生します。

```
          when 'ActivityTaskCompleted'
            # we are running the activities in strict sequential order, and
            # using the results of the previous activity as input for the next
            # activity.
            last_activity = @activity_list.pop

            if(@activity_list.empty?)
              puts "!! All activities complete! Sending complete_workflow_execution..."
              task.complete_workflow_execution
              return true;
            else
              # schedule the next activity, passing any results from the
              # previous activity. Results will be received in the activity
              # task.
              puts "** scheduling activity task: #{@activity_list.last[:name]}"
              if event.attributes.has_key?('result')
                task.schedule_activity_task(
                  @activity_list.last,
                  { :input => event.attributes[:result],
                    :workflowId => "#{@workflowId}-activities" } )
              else
                task.schedule_activity_task(
                  @activity_list.last, { :workflowId => "#{@workflowId}-activities" } )
              end
            end
```

タスクは直線的に実行されており、一度に実行されているアクティビティは 1 つだけであるため、この機会に`activity_list`スタックから完了したタスクをポップします。この結果が空のリストなら、ワークフローが完了していることが分かります。この場合、タスクで [complete\_workflow\_execution](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/DecisionTask.html#complete_workflow_execution-instance_method) を呼び出すことにより、ワークフローが完了したことを Amazon SWF に通知します。

リストにまだエントリがあるイベントでは、リストで (この場合もやはり last 位置にある) 次のアクティビティをスケジュールします。ただし今回は、完了時に前のアクティビティが Amazon SWF に結果データを返したかどうかを確認します。そのデータは、イベントの属性、オプションの `result` キーでワークフローに提供されます。アクティビティが結果を生成した場合は、それを `input` オプションとして、アクティビティタスクリストと共に次のスケジュールされたアクティビティに渡します。

完了済みアクティビティの `result` 値を取得することにより、またスケジュールされたアクティビティの `input` 値を設定することにより、1 つのアクティビティから次のアクティビティへとデータを渡したり、アクティビティから取得したデータを使用して、アクティビティの結果に基づいてディサイダー内の動作を変更することができます。

このチュートリアルでは、これら 2 つのイベントタイプがワークフローの動作を定義するのに最も重要です。ただし、アクティビティは **ActivityTaskCompleted** 以外のイベントを生成することができます。**ActivityTaskTimedOut** イベントと **ActivityTaskFailed** イベント、および実行するアクティビティが不足したときに Amazon SWF が行う `complete_workflow_execution` 呼び出しの処理時に生成される **WorkflowExecutionCompleted** イベントのデモハンドラコードを提供して、ディサイダーコードをまとめます。

```
          when 'ActivityTaskTimedOut'
            puts "!! Failing workflow execution! (timed out activity)"
            task.fail_workflow_execution
            return false

          when 'ActivityTaskFailed'
            puts "!! Failing workflow execution! (failed activity)"
            task.fail_workflow_execution
            return false

          when 'WorkflowExecutionCompleted'
            puts "## Yesss, workflow execution completed!"
            task.workflow_execution.terminate
            return false
        end
      end
    end
  end
```

## ワークフロー実行の開始
<a name="starting-the-workflow-execution"></a>

ワークフローがポーリングするための決定タスクを生成する前に、ワークフロー実行を開始する必要があります。

ワークフローの実行を開始するには、登録済みのワークフロータイプ ([AWS::SimpleWorkflow::WorkflowType](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/WorkflowType.html)) で [start\_execution](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/WorkflowType.html#start_execution-instance_method) を呼び出します。クラスコンストラクタで取得した `workflow_type` インスタンスメンバーを使用するため、これに小さなラッパーを定義します。

```
  def start_execution
    workflow_execution = @workflow_type.start_execution( {
      :workflowId => @workflowId } )
    poll_for_decisions
  end
end
```

ワークフローが実行中になると、決定イベントがワークフローのタスクリスト上に含まれ始め、[start\_execution](https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/WorkflowType.html#start_execution-instance_method) のワークフロー実行オプションとして渡されます。

ワークフロータイプが登録されるときに提供されるオプションとは異なり、`start_execution` に渡されるオプションは、ワークフロータイプの一部とは見なされません。これらは、ワークフローのバージョンを変更せずに、ワークフロー実行ごとに自由に変更できます。

ファイルの実行時にワークフローの実行を開始するため、 クラスをインスタンス化し、先ほど定義した `start_execution`メソッドを呼び出すコードを追加します。

```
if __FILE__ == $0
  require 'securerandom'

  # Use a different task list name every time we start a new workflow execution.
  #
  # This avoids issues if our pollers re-start before SWF considers them closed,
  # causing the pollers to get events from previously-run executions.
  workflowId = SecureRandom.uuid

  # Let the user start the activity worker first...

  puts ""
  puts "Amazon SWF Example"
  puts "------------------"
  puts ""
  puts "Start the activity worker, preferably in a separate command-line window, with"
  puts "the following command:"
  puts ""
  puts "> ruby swf_sns_activities.rb #{workflowId}-activities"
  puts ""
  puts "You can copy & paste it if you like, just don't copy the '>' character."
  puts ""
  puts "Press return when you're ready..."

  i = gets

  # Now, start the workflow.

  puts "Starting workflow execution."
  sample_workflow = SampleWorkflow.new(workflowId)
  sample_workflow.start_execution
end
```

タスクリストの名前の競合を避けるため、`SecureRandom.uuid` を使用してタスクリスト名として使用できるランダムな UUID を生成し、各ワークフロー実行に異なるタスクリスト名が使用されることを保証します。

**注記**  
タスクリストはワークフロー実行に関するイベントを記録するために使用されるので、同じワークフロータイプの複数の実行に同じタスクリストを使用すると、前の実行中に生成されたイベントを取得する可能性があります。新しいコードの試行やテストの実行中によくあるケースですが、特に互いにすぐ連続して実行される場合に、その可能性があります。

前の実行からのアーティファクトに対処しなければならないという問題を回避するには、各実行に新しいタスクリストを使用して、ワークフロー実行の開始時にそれを指定できます。

ここでもまた、それを実行している人 (おそらくお客様) に手順を示し、タスクリストの「アクティビティ」バージョンを提供するコードが少しあります。ディサイダーはこのタスクリスト名を使用してワークフローのアクティビティをスケジュールし、アクティビティ実装は、このタスクリスト名のアクティビティイベントをリッスンして、そのスケジュールされたアクティビティをいつ開始するかを知り、アクティビティ実行に関する更新を提供します。

また、コードはワークフロー実行を開始する*前に*ユーザーがアクティビティスターターの実行を開始するのを待機します。そのため、アクティビティタスクが提供されたタスクリストに表示され始めるときには、アクティビティスターターは応答する準備が整っています。

## 次のステップ
<a name="implementing-workflow-next-steps"></a>

ワークフローは実装しました。次に、「[サブスクリプションワークフローのチュートリアルのパート 3: アクティビティの実装](swf-sns-tutorial-implementing-activities.md)」では、アクティビティとアクティビティスターターを定義します。