Step Functions examples using SDK for Python (Boto3) - AWS SDK Code Examples

There are more AWS SDK examples available in the AWS Doc SDK Examples GitHub repo.



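The following code examples show you how to perform actions and implement common scenarios by using the AWS SDK for Python (Boto3) with Step Functions.

Basics are code examples that show you how to perform the essential operations within a service.

Actions are code excerpts from larger programs and must be run in context. Actions show you how to call individual service functions, and you can see each action in context in its related scenarios.

Scenarios are code examples that show you how to accomplish specific tasks by calling multiple functions within a service or by combining the service with other AWS services.

Each example includes a link to the complete source code, where you can find instructions on how to set up and run the code in context.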

Get started

The following code example shows how to get started using Step Functions.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import boto3


def hello_stepfunctions(stepfunctions_client):
    """
    Use the AWS SDK for Python (Boto3) to create an AWS Step Functions client and list
    the state machines in your account. This list might be empty if you haven't created
    any state machines.
    This example uses the default settings specified in your shared credentials
    and config files.

    :param stepfunctions_client: A Boto3 Step Functions Client object.
    """
    print("Hello, Step Functions! Let's list up to 10 of your state machines:")
    state_machines = stepfunctions_client.list_state_machines(maxResults=10)
    for sm in state_machines["stateMachines"]:
        print(f"\t{sm['name']}: {sm['stateMachineArn']}")


if __name__ == "__main__":
    hello_stepfunctions(boto3.client("stepfunctions"))

  • For API details, see ListStateMachines in AWS SDK for Python (Boto3) API Reference.
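The hello example stops at the first 10 results. A minimal sketch of how the remaining pages could be fetched by hand, assuming the response carries a `nextToken` whenever more results exist; the helper name `list_all_state_machines` and its `page_size` parameter are illustrative and not part of the original example.

```python
def list_all_state_machines(stepfunctions_client, page_size=10):
    """Return every state machine summary by following nextToken across pages."""
    machines = []
    kwargs = {"maxResults": page_size}
    while True:
        page = stepfunctions_client.list_state_machines(**kwargs)
        machines.extend(page.get("stateMachines", []))
        next_token = page.get("nextToken")
        if not next_token:
            # No token means this was the last page.
            return machines
        kwargs["nextToken"] = next_token
```

Called as `list_all_state_machines(boto3.client("stepfunctions"))`, this returns one flat list. The paginator used by the wrapper classes later on this page does the same job with less code.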

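Basics

The following code example shows how to:

  • Create an activity.

  • Create a state machine from an Amazon States Language definition that contains the previously created activity as a step.

  • Run the state machine and respond to the activity with user input.

  • Get the final status and output after the run completes, then clean up resources.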
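To make the second step above concrete, here is a minimal sketch of an Amazon States Language definition with a single activity step. The scenario's real definition file is not shown on this page, so the state name `WaitForUser`, the timeout, and the helper `render_definition` are assumptions for illustration; only the `{{DOC_EXAMPLE_ACTIVITY_ARN}}` placeholder comes from the scenario code.

```python
import json

# Hypothetical minimal definition; the scenario's actual JSON file is not shown here.
DEFINITION_TEMPLATE = json.dumps(
    {
        "Comment": "Minimal sketch: one activity step, then finish.",
        "StartAt": "WaitForUser",
        "States": {
            "WaitForUser": {
                "Type": "Task",
                # An activity step is a Task state whose Resource is the activity ARN.
                "Resource": "{{DOC_EXAMPLE_ACTIVITY_ARN}}",
                "TimeoutSeconds": 300,
                "End": True,
            }
        },
    },
    indent=2,
)


def render_definition(template, activity_arn):
    """Inject the activity ARN into the template and check the result is valid JSON."""
    definition = template.replace("{{DOC_EXAMPLE_ACTIVITY_ARN}}", activity_arn)
    json.loads(definition)  # Raises ValueError if the text is not valid JSON.
    return definition
```

The returned string is the kind of value that can be passed as the `definition` argument when creating a state machine.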

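SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Run an interactive scenario at a command prompt.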

import json
import logging

import boto3
from botocore.exceptions import ClientError

# Activity and StateMachine are the wrapper classes defined later in this section.
# `q` (interactive prompts) and `wait` (a sleep helper) are utilities that ship with
# the complete example on GitHub and must be importable for this script to run.

logger = logging.getLogger(__name__)


class StateMachineScenario:
    """Runs an interactive scenario that shows how to get started using Step Functions."""

    def __init__(self, activity, state_machine, iam_client):
        """
        :param activity: An object that wraps activity actions.
        :param state_machine: An object that wraps state machine actions.
        :param iam_client: A Boto3 AWS Identity and Access Management (IAM) client.
        """
        self.activity = activity
        self.state_machine = state_machine
        self.iam_client = iam_client
        self.state_machine_role = None

    def prerequisites(self, state_machine_role_name):
        """
        Finds or creates an IAM role that can be assumed by Step Functions.
        A role of this kind is required to create a state machine.
        The state machine used in this example does not call any additional services,
        so it needs no additional permissions.

        :param state_machine_role_name: The name of the role.
        :return: Data about the role.
        """
        trust_policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "",
                    "Effect": "Allow",
                    # The Step Functions service principal.
                    "Principal": {"Service": "states.amazonaws.com"},
                    "Action": "sts:AssumeRole",
                }
            ],
        }
        try:
            role = self.iam_client.get_role(RoleName=state_machine_role_name)
            print(f"Prerequisite IAM role {state_machine_role_name} already exists.")
        except ClientError as err:
            if err.response["Error"]["Code"] == "NoSuchEntity":
                role = None
            else:
                logger.error(
                    "Couldn't get prerequisite IAM role %s. Here's why: %s: %s",
                    state_machine_role_name,
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
                raise
        if role is None:
            try:
                role = self.iam_client.create_role(
                    RoleName=state_machine_role_name,
                    AssumeRolePolicyDocument=json.dumps(trust_policy),
                )
            except ClientError as err:
                logger.error(
                    "Couldn't create prerequisite IAM role %s. Here's why: %s: %s",
                    state_machine_role_name,
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
                raise
        self.state_machine_role = role["Role"]
        return self.state_machine_role

    def find_or_create_activity(self, activity_name):
        """
        Finds or creates a Step Functions activity.

        :param activity_name: The name of the activity.
        :return: The Amazon Resource Name (ARN) of the activity.
        """
        print("First, let's set up an activity and state machine.")
        activity_arn = self.activity.find(activity_name)
        if activity_arn is None:
            activity_arn = self.activity.create(activity_name)
            print(
                f"Activity {activity_name} created. Its Amazon Resource Name (ARN) is "
                f"{activity_arn}."
            )
        else:
            print(f"Activity {activity_name} already exists.")
        return activity_arn

    def find_or_create_state_machine(
        self, state_machine_name, activity_arn, state_machine_file
    ):
        """
        Finds or creates a Step Functions state machine.

        :param state_machine_name: The name of the state machine.
        :param activity_arn: The ARN of an activity that is used as a step in the state
                             machine. This ARN is injected into the state machine
                             definition that's used to create the state machine.
        :param state_machine_file: The path to a file containing the state machine
                                   definition.
        :return: The ARN of the state machine.
        """
        state_machine_arn = self.state_machine.find(state_machine_name)
        if state_machine_arn is None:
            # Use a separate name for the file object so the path parameter is not shadowed.
            with open(state_machine_file) as definition_file:
                state_machine_def = definition_file.read().replace(
                    "{{DOC_EXAMPLE_ACTIVITY_ARN}}", activity_arn
                )
                state_machine_arn = self.state_machine.create(
                    state_machine_name,
                    state_machine_def,
                    self.state_machine_role["Arn"],
                )
            print(f"State machine {state_machine_name} created.")
        else:
            print(f"State machine {state_machine_name} already exists.")
        print("-" * 88)
        print(f"Here's some information about state machine {state_machine_name}:")
        state_machine_info = self.state_machine.describe(state_machine_arn)
        for field in ["name", "status", "stateMachineArn", "roleArn"]:
            print(f"\t{field}: {state_machine_info[field]}")
        return state_machine_arn

    def run_state_machine(self, state_machine_arn, activity_arn):
        """
        Run the state machine. The state machine used in this example is a simple
        chat simulation. It contains an activity step in a loop that is used for user
        interaction. When the state machine gets to the activity step, it waits for
        an external application to get task data and submit a response. This function
        acts as the activity application by getting task input and responding with
        user input.

        :param state_machine_arn: The ARN of the state machine.
        :param activity_arn: The ARN of the activity used as a step in the state machine.
        :return: The ARN of the run.
        """
        print(
            "Let's run the state machine. It's a simplistic, non-AI chat simulator "
            "we'll call ChatSFN."
        )
        user_name = q.ask("What should ChatSFN call you? ", q.non_empty)
        run_input = {"name": user_name}
        print("Starting state machine...")
        run_arn = self.state_machine.start(state_machine_arn, json.dumps(run_input))
        action = None
        while action != "done":
            activity_task = self.activity.get_task(activity_arn)
            task_input = json.loads(activity_task["input"])
            print(f"ChatSFN: {task_input['message']}")
            action = task_input["actions"][
                q.choose("What now? ", task_input["actions"])
            ]
            task_response = {"action": action}
            self.activity.send_task_success(
                activity_task["taskToken"], json.dumps(task_response)
            )
        return run_arn

    def finish_state_machine_run(self, run_arn):
        """
        Wait for the state machine run to finish, then print final status and output.

        :param run_arn: The ARN of the run to retrieve.
        """
        print("Let's get the final output from the state machine:")
        status = "RUNNING"
        while status == "RUNNING":
            run_output = self.state_machine.describe_run(run_arn)
            status = run_output["status"]
            if status == "RUNNING":
                print(
                    "The state machine is still running, let's wait for it to finish."
                )
                wait(1)
            elif status == "SUCCEEDED":
                print(f"ChatSFN: {json.loads(run_output['output'])['message']}")
            else:
                print(f"Run status: {status}.")

    def cleanup(
        self,
        state_machine_name,
        state_machine_arn,
        activity_name,
        activity_arn,
        state_machine_role_name,
    ):
        """
        Clean up resources created by this example.

        :param state_machine_name: The name of the state machine.
        :param state_machine_arn: The ARN of the state machine.
        :param activity_name: The name of the activity.
        :param activity_arn: The ARN of the activity.
        :param state_machine_role_name: The name of the role used by the state machine.
        """
        if q.ask(
            "Do you want to delete the state machine, activity, and role created for this "
            "example? (y/n) ",
            q.is_yesno,
        ):
            self.state_machine.delete(state_machine_arn)
            print(f"Deleted state machine {state_machine_name}.")
            self.activity.delete(activity_arn)
            print(f"Deleted activity {activity_name}.")
            self.iam_client.delete_role(RoleName=state_machine_role_name)
            print(f"Deleted role {state_machine_role_name}.")

    def run_scenario(self, activity_name, state_machine_name):
        print("-" * 88)
        print("Welcome to the AWS Step Functions state machines demo.")
        print("-" * 88)

        activity_arn = self.find_or_create_activity(activity_name)
        state_machine_arn = self.find_or_create_state_machine(
            state_machine_name,
            activity_arn,
            "../../../resources/sample_files/chat_sfn_state_machine.json",
        )
        print("-" * 88)
        run_arn = self.run_state_machine(state_machine_arn, activity_arn)
        print("-" * 88)
        self.finish_state_machine_run(run_arn)
        print("-" * 88)
        self.cleanup(
            state_machine_name,
            state_machine_arn,
            activity_name,
            activity_arn,
            self.state_machine_role["RoleName"],
        )

        print("-" * 88)
        print("\nThanks for watching!")
        print("-" * 88)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
    try:
        stepfunctions_client = boto3.client("stepfunctions")
        iam_client = boto3.client("iam")
        scenario = StateMachineScenario(
            Activity(stepfunctions_client),
            StateMachine(stepfunctions_client),
            iam_client,
        )
        scenario.prerequisites("doc-example-state-machine-chat")
        scenario.run_scenario("doc-example-activity", "doc-example-state-machine")
    except Exception:
        logging.exception("Something went wrong with the demo.")

Define a class that wraps state machine actions.

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class StateMachine:
    """Encapsulates Step Functions state machine actions."""

    def __init__(self, stepfunctions_client):
        """
        :param stepfunctions_client: A Boto3 Step Functions client.
        """
        self.stepfunctions_client = stepfunctions_client

    def create(self, name, definition, role_arn):
        """
        Creates a state machine with the specific definition. The state machine assumes
        the provided role before it starts a run.

        :param name: The name to give the state machine.
        :param definition: The Amazon States Language definition of the steps in the
                           state machine.
        :param role_arn: The Amazon Resource Name (ARN) of the role that is assumed by
                         Step Functions when the state machine is run.
        :return: The ARN of the newly created state machine.
        """
        try:
            response = self.stepfunctions_client.create_state_machine(
                name=name, definition=definition, roleArn=role_arn
            )
        except ClientError as err:
            logger.error(
                "Couldn't create state machine %s. Here's why: %s: %s",
                name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response["stateMachineArn"]

    def find(self, name):
        """
        Find a state machine by name. This requires listing the state machines until
        one is found with a matching name.

        :param name: The name of the state machine to search for.
        :return: The ARN of the state machine if found; otherwise, None.
        """
        try:
            paginator = self.stepfunctions_client.get_paginator("list_state_machines")
            for page in paginator.paginate():
                for state_machine in page.get("stateMachines", []):
                    if state_machine["name"] == name:
                        return state_machine["stateMachineArn"]
        except ClientError as err:
            logger.error(
                "Couldn't list state machines. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        # No state machine with a matching name was found.
        return None

    def describe(self, state_machine_arn):
        """
        Get data about a state machine.

        :param state_machine_arn: The ARN of the state machine to look up.
        :return: The retrieved state machine data.
        """
        try:
            response = self.stepfunctions_client.describe_state_machine(
                stateMachineArn=state_machine_arn
            )
        except ClientError as err:
            logger.error(
                "Couldn't describe state machine %s. Here's why: %s: %s",
                state_machine_arn,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response

    def start(self, state_machine_arn, run_input):
        """
        Start a run of a state machine with a specified input. A run is also known
        as an "execution" in Step Functions.

        :param state_machine_arn: The ARN of the state machine to run.
        :param run_input: The input to the state machine, in JSON format.
        :return: The ARN of the run. This can be used to get information about the run,
                 including its current status and final output.
        """
        try:
            response = self.stepfunctions_client.start_execution(
                stateMachineArn=state_machine_arn, input=run_input
            )
        except ClientError as err:
            logger.error(
                "Couldn't start state machine %s. Here's why: %s: %s",
                state_machine_arn,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response["executionArn"]

    def describe_run(self, run_arn):
        """
        Get data about a state machine run, such as its current status or final output.

        :param run_arn: The ARN of the run to look up.
        :return: The retrieved run data.
        """
        try:
            response = self.stepfunctions_client.describe_execution(
                executionArn=run_arn
            )
        except ClientError as err:
            logger.error(
                "Couldn't describe run %s. Here's why: %s: %s",
                run_arn,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response

    def delete(self, state_machine_arn):
        """
        Delete a state machine and all of its run data.

        :param state_machine_arn: The ARN of the state machine to delete.
        """
        try:
            response = self.stepfunctions_client.delete_state_machine(
                stateMachineArn=state_machine_arn
            )
        except ClientError as err:
            logger.error(
                "Couldn't delete state machine %s. Here's why: %s: %s",
                state_machine_arn,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response

Define a class that wraps activity actions.

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class Activity:
    """Encapsulates Step Functions activity actions."""

    def __init__(self, stepfunctions_client):
        """
        :param stepfunctions_client: A Boto3 Step Functions client.
        """
        self.stepfunctions_client = stepfunctions_client

    def create(self, name):
        """
        Create an activity.

        :param name: The name of the activity to create.
        :return: The Amazon Resource Name (ARN) of the newly created activity.
        """
        try:
            response = self.stepfunctions_client.create_activity(name=name)
        except ClientError as err:
            logger.error(
                "Couldn't create activity %s. Here's why: %s: %s",
                name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response["activityArn"]

    def find(self, name):
        """
        Find an activity by name. This requires listing activities until one is found
        with a matching name.

        :param name: The name of the activity to search for.
        :return: If found, the ARN of the activity; otherwise, None.
        """
        try:
            paginator = self.stepfunctions_client.get_paginator("list_activities")
            for page in paginator.paginate():
                for activity in page.get("activities", []):
                    if activity["name"] == name:
                        return activity["activityArn"]
        except ClientError as err:
            logger.error(
                "Couldn't list activities. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        # No activity with a matching name was found.
        return None

    def get_task(self, activity_arn):
        """
        Gets task data for an activity. When a state machine is waiting for the
        specified activity, a response is returned with data from the state machine.
        When a state machine is not waiting, this call blocks for 60 seconds.

        :param activity_arn: The ARN of the activity to get task data for.
        :return: The task data for the activity.
        """
        try:
            response = self.stepfunctions_client.get_activity_task(
                activityArn=activity_arn
            )
        except ClientError as err:
            logger.error(
                "Couldn't get a task for activity %s. Here's why: %s: %s",
                activity_arn,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response

    def send_task_success(self, task_token, task_response):
        """
        Sends a success response to a waiting activity step. A state machine with an
        activity step waits for the activity to get task data and then respond with
        either success or failure before it resumes processing.

        :param task_token: The token associated with the task. This is included in the
                           response to the get_activity_task action and must be sent
                           without modification.
        :param task_response: The response data from the activity. This data is
                              received and processed by the state machine.
        """
        try:
            self.stepfunctions_client.send_task_success(
                taskToken=task_token, output=task_response
            )
        except ClientError as err:
            logger.error(
                "Couldn't send task success. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def delete(self, activity_arn):
        """
        Delete an activity.

        :param activity_arn: The ARN of the activity to delete.
        """
        try:
            response = self.stepfunctions_client.delete_activity(
                activityArn=activity_arn
            )
        except ClientError as err:
            logger.error(
                "Couldn't delete activity %s. Here's why: %s: %s",
                activity_arn,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response
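The `get_task` docstring notes that the call blocks for 60 seconds when no state machine is waiting. In that case the response comes back without a usable task token, so a worker usually checks for one before reading the input. The following is a minimal sketch of a single worker poll under that assumption; `poll_once`, `handler`, and the default worker name are hypothetical names introduced here, and reporting errors with `send_task_failure` is an addition the original scenario does not include.

```python
import json


def poll_once(stepfunctions_client, activity_arn, handler, worker_name="example-worker"):
    """
    Poll an activity one time and report the handler's result to Step Functions.

    :return: True if a task was processed, False if the poll returned no task.
    """
    task = stepfunctions_client.get_activity_task(
        activityArn=activity_arn, workerName=worker_name
    )
    token = task.get("taskToken")
    if not token:
        # The long poll ended without a scheduled task.
        return False
    try:
        result = handler(json.loads(task["input"]))
    except Exception as err:
        # Tell the state machine that the task failed instead of leaving it waiting.
        stepfunctions_client.send_task_failure(
            taskToken=token, error=type(err).__name__, cause=str(err)
        )
    else:
        stepfunctions_client.send_task_success(
            taskToken=token, output=json.dumps(result)
        )
    return True
```

A long-running worker could call `poll_once` in a loop and simply poll again whenever it returns False.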

Actions

The following code example shows how to use CreateActivity.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class Activity:
    """Encapsulates Step Functions activity actions."""

    def __init__(self, stepfunctions_client):
        """
        :param stepfunctions_client: A Boto3 Step Functions client.
        """
        self.stepfunctions_client = stepfunctions_client

    def create(self, name):
        """
        Create an activity.

        :param name: The name of the activity to create.
        :return: The Amazon Resource Name (ARN) of the newly created activity.
        """
        try:
            response = self.stepfunctions_client.create_activity(name=name)
        except ClientError as err:
            logger.error(
                "Couldn't create activity %s. Here's why: %s: %s",
                name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response["activityArn"]

  • For API details, see CreateActivity in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use CreateStateMachine.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class StateMachine:
    """Encapsulates Step Functions state machine actions."""

    def __init__(self, stepfunctions_client):
        """
        :param stepfunctions_client: A Boto3 Step Functions client.
        """
        self.stepfunctions_client = stepfunctions_client

    def create(self, name, definition, role_arn):
        """
        Creates a state machine with the specific definition. The state machine assumes
        the provided role before it starts a run.

        :param name: The name to give the state machine.
        :param definition: The Amazon States Language definition of the steps in the
                           state machine.
        :param role_arn: The Amazon Resource Name (ARN) of the role that is assumed by
                         Step Functions when the state machine is run.
        :return: The ARN of the newly created state machine.
        """
        try:
            response = self.stepfunctions_client.create_state_machine(
                name=name, definition=definition, roleArn=role_arn
            )
        except ClientError as err:
            logger.error(
                "Couldn't create state machine %s. Here's why: %s: %s",
                name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response["stateMachineArn"]

  • For API details, see CreateStateMachine in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DeleteActivity.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class Activity:
    """Encapsulates Step Functions activity actions."""

    def __init__(self, stepfunctions_client):
        """
        :param stepfunctions_client: A Boto3 Step Functions client.
        """
        self.stepfunctions_client = stepfunctions_client

    def delete(self, activity_arn):
        """
        Delete an activity.

        :param activity_arn: The ARN of the activity to delete.
        """
        try:
            response = self.stepfunctions_client.delete_activity(
                activityArn=activity_arn
            )
        except ClientError as err:
            logger.error(
                "Couldn't delete activity %s. Here's why: %s: %s",
                activity_arn,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response

  • For API details, see DeleteActivity in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DeleteStateMachine.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class StateMachine:
    """Encapsulates Step Functions state machine actions."""

    def __init__(self, stepfunctions_client):
        """
        :param stepfunctions_client: A Boto3 Step Functions client.
        """
        self.stepfunctions_client = stepfunctions_client

    def delete(self, state_machine_arn):
        """
        Delete a state machine and all of its run data.

        :param state_machine_arn: The ARN of the state machine to delete.
        """
        try:
            response = self.stepfunctions_client.delete_state_machine(
                stateMachineArn=state_machine_arn
            )
        except ClientError as err:
            logger.error(
                "Couldn't delete state machine %s. Here's why: %s: %s",
                state_machine_arn,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response

  • For API details, see DeleteStateMachine in AWS SDK for Python (Boto3) API Reference.

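The following code example shows how to use DescribeExecution.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class StateMachine:
    """Encapsulates Step Functions state machine actions."""

    def __init__(self, stepfunctions_client):
        """
        :param stepfunctions_client: A Boto3 Step Functions client.
        """
        self.stepfunctions_client = stepfunctions_client

    def describe_run(self, run_arn):
        """
        Get data about a state machine run, such as its current status or final output.

        :param run_arn: The ARN of the run to look up.
        :return: The retrieved run data.
        """
        try:
            response = self.stepfunctions_client.describe_execution(
                executionArn=run_arn
            )
        except ClientError as err:
            logger.error(
                "Couldn't describe run %s. Here's why: %s: %s",
                run_arn,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response

  • For API details, see DescribeExecution in AWS SDK for Python (Boto3) API Reference.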
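A run's status is typically checked more than once before it finishes. As a hedged sketch, the helper below polls a describe function until the run is no longer `RUNNING`, doubling the delay between polls and giving up after a timeout. The function name, its parameters, and the backoff values are assumptions for illustration and are not part of the original example.

```python
import time


def wait_for_execution(
    describe_execution,
    execution_arn,
    timeout=60.0,
    interval=1.0,
    max_interval=10.0,
    sleep=time.sleep,
):
    """Poll until the run leaves the RUNNING status or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        response = describe_execution(executionArn=execution_arn)
        if response["status"] != "RUNNING":
            return response
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"{execution_arn} is still running after {timeout} seconds."
            )
        sleep(interval)
        # Back off so that long runs are not polled every second.
        interval = min(interval * 2, max_interval)
```

With a real client this could be called as `wait_for_execution(stepfunctions_client.describe_execution, run_arn)`. The `sleep` parameter exists only so the delay can be replaced in tests.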

The following code example shows how to use DescribeStateMachine.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class StateMachine:
    """Encapsulates Step Functions state machine actions."""

    def __init__(self, stepfunctions_client):
        """
        :param stepfunctions_client: A Boto3 Step Functions client.
        """
        self.stepfunctions_client = stepfunctions_client

    def describe(self, state_machine_arn):
        """
        Get data about a state machine.

        :param state_machine_arn: The ARN of the state machine to look up.
        :return: The retrieved state machine data.
        """
        try:
            response = self.stepfunctions_client.describe_state_machine(
                stateMachineArn=state_machine_arn
            )
        except ClientError as err:
            logger.error(
                "Couldn't describe state machine %s. Here's why: %s: %s",
                state_machine_arn,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response

  • For API details, see DescribeStateMachine in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use GetActivityTask.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class Activity:
    """Encapsulates Step Functions activity actions."""

    def __init__(self, stepfunctions_client):
        """
        :param stepfunctions_client: A Boto3 Step Functions client.
        """
        self.stepfunctions_client = stepfunctions_client

    def get_task(self, activity_arn):
        """
        Gets task data for an activity. When a state machine is waiting for the
        specified activity, a response is returned with data from the state machine.
        When a state machine is not waiting, this call blocks for 60 seconds.

        :param activity_arn: The ARN of the activity to get task data for.
        :return: The task data for the activity.
        """
        try:
            response = self.stepfunctions_client.get_activity_task(
                activityArn=activity_arn
            )
        except ClientError as err:
            logger.error(
                "Couldn't get a task for activity %s. Here's why: %s: %s",
                activity_arn,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response

  • For API details, see GetActivityTask in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use ListActivities.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class Activity:
    """Encapsulates Step Functions activity actions."""

    def __init__(self, stepfunctions_client):
        """
        :param stepfunctions_client: A Boto3 Step Functions client.
        """
        self.stepfunctions_client = stepfunctions_client

    def find(self, name):
        """
        Find an activity by name. This requires listing activities until one is found
        with a matching name.

        :param name: The name of the activity to search for.
        :return: If found, the ARN of the activity; otherwise, None.
        """
        try:
            paginator = self.stepfunctions_client.get_paginator("list_activities")
            for page in paginator.paginate():
                for activity in page.get("activities", []):
                    if activity["name"] == name:
                        return activity["activityArn"]
        except ClientError as err:
            logger.error(
                "Couldn't list activities. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        # No activity with a matching name was found.
        return None

  • For API details, see ListActivities in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use ListStateMachines.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Find a state machine by name by searching the list of state machines for the account.

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class StateMachine:
    """Encapsulates Step Functions state machine actions."""

    def __init__(self, stepfunctions_client):
        """
        :param stepfunctions_client: A Boto3 Step Functions client.
        """
        self.stepfunctions_client = stepfunctions_client

    def find(self, name):
        """
        Find a state machine by name. This requires listing the state machines until
        one is found with a matching name.

        :param name: The name of the state machine to search for.
        :return: The ARN of the state machine if found; otherwise, None.
        """
        try:
            paginator = self.stepfunctions_client.get_paginator("list_state_machines")
            for page in paginator.paginate():
                for state_machine in page.get("stateMachines", []):
                    if state_machine["name"] == name:
                        return state_machine["stateMachineArn"]
        except ClientError as err:
            logger.error(
                "Couldn't list state machines. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        # No state machine with a matching name was found.
        return None

  • For API details, see ListStateMachines in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use SendTaskSuccess.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class Activity:
    """Encapsulates Step Functions activity actions."""

    def __init__(self, stepfunctions_client):
        """
        :param stepfunctions_client: A Boto3 Step Functions client.
        """
        self.stepfunctions_client = stepfunctions_client

    def send_task_success(self, task_token, task_response):
        """
        Sends a success response to a waiting activity step. A state machine with an
        activity step waits for the activity to get task data and then respond with
        either success or failure before it resumes processing.

        :param task_token: The token associated with the task. This is included in the
                           response to the get_activity_task action and must be sent
                           without modification.
        :param task_response: The response data from the activity. This data is
                              received and processed by the state machine.
        """
        try:
            self.stepfunctions_client.send_task_success(
                taskToken=task_token, output=task_response
            )
        except ClientError as err:
            logger.error(
                "Couldn't send task success. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

  • For API details, see SendTaskSuccess in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use StartExecution.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class StateMachine:
    """Encapsulates Step Functions state machine actions."""

    def __init__(self, stepfunctions_client):
        """
        :param stepfunctions_client: A Boto3 Step Functions client.
        """
        self.stepfunctions_client = stepfunctions_client

    def start(self, state_machine_arn, run_input):
        """
        Start a run of a state machine with a specified input. A run is also known
        as an "execution" in Step Functions.

        :param state_machine_arn: The ARN of the state machine to run.
        :param run_input: The input to the state machine, in JSON format.
        :return: The ARN of the run. This can be used to get information about the run,
                 including its current status and final output.
        """
        try:
            response = self.stepfunctions_client.start_execution(
                stateMachineArn=state_machine_arn, input=run_input
            )
        except ClientError as err:
            logger.error(
                "Couldn't start state machine %s. Here's why: %s: %s",
                state_machine_arn,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response["executionArn"]

  • For API details, see StartExecution in AWS SDK for Python (Boto3) API Reference.
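The wrapper above lets Step Functions generate the run name. As a small variation, and only as a sketch, a caller could pass its own name so the run is easier to find later in the console or in logs. The helper `start_named_run` and its `prefix` parameter are hypothetical; `name` is an optional parameter of `start_execution`.

```python
import json
import uuid


def start_named_run(stepfunctions_client, state_machine_arn, payload, prefix="run"):
    """Start a run with an explicit, unique name and a JSON-encoded input."""
    # A random suffix keeps names unique; run names are limited to 80 characters.
    name = f"{prefix}-{uuid.uuid4().hex}"
    response = stepfunctions_client.start_execution(
        stateMachineArn=state_machine_arn,
        name=name,
        input=json.dumps(payload),
    )
    return name, response["executionArn"]
```

For example, `start_named_run(client, state_machine_arn, {"name": "Ada"}, prefix="chat")` would start a run whose name begins with `chat-`.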

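Scenarios

The following code example shows how to create an AWS Step Functions messenger application that retrieves message records from a database table.

SDK for Python (Boto3)

Shows how to use the AWS SDK for Python (Boto3) with AWS Step Functions to create a messenger application that retrieves message records from an Amazon DynamoDB table and sends them with Amazon Simple Queue Service (Amazon SQS). The state machine integrates with an AWS Lambda function to scan the database for unsent messages.

  • Create a state machine that retrieves and updates message records from an Amazon DynamoDB table.

  • Update the state machine definition to also send messages to Amazon Simple Queue Service (Amazon SQS).

  • Start and stop state machine runs.

  • Connect to Lambda, DynamoDB, and Amazon SQS from a state machine by using service integrations.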
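The messenger's actual definition lives in the full example on GitHub and is not reproduced here. Purely as an illustration of what service integrations look like in Amazon States Language, the sketch below builds a definition that invokes a Lambda function, then sends each returned record to Amazon SQS and marks it as sent in DynamoDB. The state names, the `message_id`, `message`, and `sent` attribute names, and the assumption that the function returns a JSON list of records are all hypothetical.

```python
import json


def build_messenger_definition(scanner_function_arn, table_name, queue_url):
    """Return a state machine definition as a dict (all names are illustrative)."""
    send_and_mark = {
        "StartAt": "SendMessage",
        "States": {
            "SendMessage": {
                "Type": "Task",
                "Resource": "arn:aws:states:::sqs:sendMessage",
                "Parameters": {"QueueUrl": queue_url, "MessageBody.$": "$.message"},
                # Keep the original record so the next state can read message_id.
                "ResultPath": "$.send_result",
                "Next": "MarkSent",
            },
            "MarkSent": {
                "Type": "Task",
                "Resource": "arn:aws:states:::dynamodb:updateItem",
                "Parameters": {
                    "TableName": table_name,
                    "Key": {"message_id": {"S.$": "$.message_id"}},
                    "UpdateExpression": "SET #s = :s",
                    "ExpressionAttributeNames": {"#s": "sent"},
                    "ExpressionAttributeValues": {":s": {"BOOL": True}},
                },
                "End": True,
            },
        },
    }
    return {
        "Comment": "Sketch: scan for unsent messages, send each one, mark it sent.",
        "StartAt": "ScanForMessages",
        "States": {
            "ScanForMessages": {
                "Type": "Task",
                "Resource": "arn:aws:states:::lambda:invoke",
                "Parameters": {"FunctionName": scanner_function_arn},
                # Assumes the function returns a JSON list of message records.
                "OutputPath": "$.Payload",
                "Next": "SendEach",
            },
            # The Map state runs the inner states once per record in the list.
            "SendEach": {"Type": "Map", "ItemProcessor": send_and_mark, "End": True},
        },
    }
```

Passing `json.dumps(build_messenger_definition(...))` as the `definition` when creating or updating a state machine is one way such a dict could be used. The state machine's role would also need permissions for each integrated service.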

For complete source code and instructions on how to set up and run, see the full example on GitHub.

Services used in this example
  • DynamoDB

  • Lambda

  • Amazon SQS

  • Step Functions

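The following code example shows how to build and orchestrate generative AI applications with Amazon Bedrock and Step Functions.

SDK for Python (Boto3)

The Amazon Bedrock Serverless Prompt Chaining scenario demonstrates how AWS Step Functions, Amazon Bedrock, and Agents for Amazon Bedrock (https://docs.aws.amazon.com/bedrock/latest/userguide/agents.html) can be used to build and orchestrate complex, serverless, and highly scalable generative AI applications. It contains the following working examples:

  • Write an analysis of a given novel for a literature blog. This example illustrates a simple, sequential chain of prompts.

  • Generate a short story about a given topic. This example illustrates how the AI can iteratively process a list of items that it previously generated.

  • Create an itinerary for a weekend vacation to a given destination. This example illustrates how to process multiple distinct prompts in parallel.

  • Pitch movie ideas to a human user acting as a movie producer. This example illustrates how to run the same prompt in parallel with different inference parameters, how to backtrack to a previous step in the chain, and how to include human input as part of the workflow.

  • Plan a meal based on ingredients the user has at hand. This example illustrates how prompt chains can incorporate two distinct AI conversations, with two AI personas debating each other to improve the final outcome.

  • Find and summarize today's highest trending GitHub repository. This example illustrates how to chain multiple AI agents that interact with external APIs.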
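The project's own state machines are on GitHub and are not shown on this page. To picture the parallel pattern mentioned for the itinerary example, here is a minimal sketch of an Amazon States Language Parallel state built as a Python dict. Pass states stand in for the Task states that would call a model in a real workflow, and the function name, state names, and prompt text are all assumptions.

```python
import json


def build_parallel_prompts(topics):
    """Build a definition that runs one placeholder branch per topic in parallel."""
    branches = []
    for topic in topics:
        state_name = f"Ask about {topic}"
        branches.append(
            {
                "StartAt": state_name,
                "States": {
                    state_name: {
                        # Placeholder: a real workflow would use a Task state here.
                        "Type": "Pass",
                        "Result": {"prompt": f"Suggest {topic} for a weekend trip."},
                        "End": True,
                    }
                },
            }
        )
    return {
        "StartAt": "AskInParallel",
        "States": {
            # A Parallel state outputs a list with one entry per branch.
            "AskInParallel": {
                "Type": "Parallel",
                "Branches": branches,
                "Next": "Combine",
            },
            "Combine": {"Type": "Pass", "End": True},
        },
    }
```

Because every branch receives the same input and the outputs arrive together as a list, a later state can merge them into a single result.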

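For complete source code and instructions on how to set up and run, see the full project on GitHub.

Services used in this example
  • Amazon Bedrock

  • Amazon Bedrock Runtime

  • Amazon Bedrock Agents

  • Amazon Bedrock Agents Runtime

  • Step Functions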