範例藍本 - AWS Flow Framework 適用於 Java

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

範例藍本

有一個程式碼變更的類別視為回溯不相容。這些變更包含可修改已排程之任務的數目、類型或順序的更新。請考量下列範例:

您可以撰寫決策者程式碼來排定兩個計時器任務。您可以啟動執行,並執行決策。因此,已排定 ID 為 12 的兩個計時器任務。

如果您更新決策者程式碼在執行下個決策之前只排定一個計時器,則在下個決策任務期間,框架將無法重播第二個 TimerFired 事件,因為 ID 2 不符合程式碼產生的任何計時器任務。

藍本大綱

下列大綱顯示此藍本的步驟。此藍本的最後目標為遷移至系統,且此系統只排定一個計時器,但不會導致遷移前啟動的執行發生錯誤。

  1. 初始決策者版本

    1. 撰寫決策者。

    2. 啟動決策者。

    3. 決策者排定兩個計時器。

    4. 決策者啟動五次執行。

    5. 停止決策者。

  2. 回溯不相容決策者變更

    1. 修改決策者。

    2. 啟動決策者。

    3. 決策者排定一個計時器。

    4. 決策者啟動五次執行。

下列各節所包含的 Java 程式碼範例顯示如何實作此藍本。「解決方案」小節中的程式碼範例顯示各種方法來修正回溯不相容變更。

注意

您可以使用 AWS SDK for Java 的最新版本來執行此程式碼。

常見程式碼

下列 Java 程式碼在此藍本的範例之間不會變更。

SampleBase.java

package sample; import java.util.ArrayList; import java.util.List; import java.util.UUID; import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow; import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClientBuilder; import com.amazonaws.services.simpleworkflow.flow.JsonDataConverter; import com.amazonaws.services.simpleworkflow.model.DescribeWorkflowExecutionRequest; import com.amazonaws.services.simpleworkflow.model.DomainAlreadyExistsException; import com.amazonaws.services.simpleworkflow.model.RegisterDomainRequest; import com.amazonaws.services.simpleworkflow.model.Run; import com.amazonaws.services.simpleworkflow.model.StartWorkflowExecutionRequest; import com.amazonaws.services.simpleworkflow.model.TaskList; import com.amazonaws.services.simpleworkflow.model.WorkflowExecution; import com.amazonaws.services.simpleworkflow.model.WorkflowExecutionDetail; import com.amazonaws.services.simpleworkflow.model.WorkflowType; public class SampleBase { protected String domain = "DeciderChangeSample"; protected String taskList = "DeciderChangeSample-" + UUID.randomUUID().toString(); protected AmazonSimpleWorkflow service = AmazonSimpleWorkflowClientBuilder.defaultClient(); { try { AmazonSimpleWorkflowClientBuilder.defaultClient().registerDomain(new RegisterDomainRequest().withName(domain).withDescription("desc").withWorkflowExecutionRetentionPeriodInDays("7")); } catch (DomainAlreadyExistsException e) { } } protected List<WorkflowExecution> workflowExecutions = new ArrayList<>(); protected void startFiveExecutions(String workflow, String version, Object input) { for (int i = 0; i < 5; i++) { String id = UUID.randomUUID().toString(); Run startWorkflowExecution = service.startWorkflowExecution( new StartWorkflowExecutionRequest().withDomain(domain).withTaskList(new TaskList().withName(taskList)).withInput(new JsonDataConverter().toData(new Object[] { input })).withWorkflowId(id).withWorkflowType(new WorkflowType().withName(workflow).withVersion(version))); workflowExecutions.add(new WorkflowExecution().withWorkflowId(id).withRunId(startWorkflowExecution.getRunId())); sleep(1000); } } protected void printExecutionResults() { waitForExecutionsToClose(); System.out.println("\nResults:"); for (WorkflowExecution wid : workflowExecutions) { WorkflowExecutionDetail details = service.describeWorkflowExecution(new DescribeWorkflowExecutionRequest().withDomain(domain).withExecution(wid)); System.out.println(wid.getWorkflowId() + " " + details.getExecutionInfo().getCloseStatus()); } } protected void waitForExecutionsToClose() { loop: while (true) { for (WorkflowExecution wid : workflowExecutions) { WorkflowExecutionDetail details = service.describeWorkflowExecution(new DescribeWorkflowExecutionRequest().withDomain(domain).withExecution(wid)); if ("OPEN".equals(details.getExecutionInfo().getExecutionStatus())) { sleep(1000); continue loop; } } return; } } protected void sleep(int millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }

Input.java

package sample; public class Input { private Boolean skipSecondTimer; public Input() { } public Input(Boolean skipSecondTimer) { this.skipSecondTimer = skipSecondTimer; } public Boolean getSkipSecondTimer() { return skipSecondTimer != null && skipSecondTimer; } public Input setSkipSecondTimer(Boolean skipSecondTimer) { this.skipSecondTimer = skipSecondTimer; return this; } }

撰寫初始決策者程式碼

以下是決策者的初始 Java 程式碼。此程式碼註冊為第 1 版,並且排定兩個五秒計時器任務。

InitialDecider.java

package sample.v1; import com.amazonaws.services.simpleworkflow.flow.DecisionContext; import com.amazonaws.services.simpleworkflow.flow.DecisionContextProviderImpl; import com.amazonaws.services.simpleworkflow.flow.WorkflowClock; import com.amazonaws.services.simpleworkflow.flow.annotations.Execute; import com.amazonaws.services.simpleworkflow.flow.annotations.Workflow; import com.amazonaws.services.simpleworkflow.flow.annotations.WorkflowRegistrationOptions; import sample.Input; @Workflow @WorkflowRegistrationOptions(defaultExecutionStartToCloseTimeoutSeconds = 60, defaultTaskStartToCloseTimeoutSeconds = 5) public interface Foo { @Execute(version = "1") public void sample(Input input); public static class Impl implements Foo { private DecisionContext decisionContext = new DecisionContextProviderImpl().getDecisionContext(); private WorkflowClock clock = decisionContext.getWorkflowClock(); @Override public void sample(Input input) { System.out.println("Decision (V1) WorkflowId: " + decisionContext.getWorkflowContext().getWorkflowExecution().getWorkflowId()); clock.createTimer(5); clock.createTimer(5); } } }

模擬回溯不相容變更

下列修改過的決策者 Java 程式碼是個不錯的回溯不相容變更範例。程式碼仍然註冊為第 1 版,但只排定一個計時器。

ModifiedDecider.java

package sample.v1.modified; import com.amazonaws.services.simpleworkflow.flow.DecisionContext; import com.amazonaws.services.simpleworkflow.flow.DecisionContextProviderImpl; import com.amazonaws.services.simpleworkflow.flow.WorkflowClock; import com.amazonaws.services.simpleworkflow.flow.annotations.Execute; import com.amazonaws.services.simpleworkflow.flow.annotations.Workflow; import com.amazonaws.services.simpleworkflow.flow.annotations.WorkflowRegistrationOptions; import sample.Input; @Workflow @WorkflowRegistrationOptions(defaultExecutionStartToCloseTimeoutSeconds = 60, defaultTaskStartToCloseTimeoutSeconds = 5) public interface Foo { @Execute(version = "1") public void sample(Input input); public static class Impl implements Foo { private DecisionContext decisionContext = new DecisionContextProviderImpl().getDecisionContext(); private WorkflowClock clock = decisionContext.getWorkflowClock(); @Override public void sample(Input input) { System.out.println("Decision (V1 modified) WorkflowId: " + decisionContext.getWorkflowContext().getWorkflowExecution().getWorkflowId()); clock.createTimer(5); } } }

下列 Java 程式碼可讓您藉由執行修改過的決策者,來模擬進行回溯不相容變更的問題。

RunModifiedDecider.java

package sample; import com.amazonaws.services.simpleworkflow.flow.WorkflowWorker; public class BadChange extends SampleBase { public static void main(String[] args) throws Exception { new BadChange().run(); } public void run() throws Exception { // Start the first version of the decider WorkflowWorker before = new WorkflowWorker(service, domain, taskList); before.addWorkflowImplementationType(sample.v1.Foo.Impl.class); before.start(); // Start a few executions startFiveExecutions("Foo.sample", "1", new Input()); // Stop the first decider worker and wait a few seconds // for its pending pollers to match and return before.suspendPolling(); sleep(2000); // At this point, three executions are still open, with more decisions to make // Start the modified version of the decider WorkflowWorker after = new WorkflowWorker(service, domain, taskList); after.addWorkflowImplementationType(sample.v1.modified.Foo.Impl.class); after.start(); // Start a few more executions startFiveExecutions("Foo.sample", "1", new Input()); printExecutionResults(); } }

當您執行程式時,那三個失敗的執行就是在決策者初始版本下啟動並在遷移之後繼續的執行。