本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
与 AWS SDK InvokeFlow
配合使用
以下代码示例演示如何使用 InvokeFlow
。
操作示例是大型程序的代码摘录,必须在上下文中运行。在以下代码示例中,您可以查看此操作的上下文:
- JavaScript
-
- 适用于 JavaScript (v3) 的软件开发工具包
-
注意
还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库
中进行设置和运行。 import { fileURLToPath } from "node:url"; import { BedrockAgentRuntimeClient, InvokeFlowCommand, } from "@aws-sdk/client-bedrock-agent-runtime"; /** * Invokes an alias of a flow to run the inputs that you specify and return * the output of each node as a stream. * * @param {{ * flowIdentifier: string, * flowAliasIdentifier: string, * prompt?: string, * region?: string * }} options * @returns {Promise<import("@aws-sdk/client-bedrock-agent").FlowNodeOutput>} An object containing information about the output from flow invocation. */ export const invokeBedrockFlow = async ({ flowIdentifier, flowAliasIdentifier, prompt = "Hi, how are you?", region = "us-east-1", }) => { const client = new BedrockAgentRuntimeClient({ region }); const command = new InvokeFlowCommand({ flowIdentifier, flowAliasIdentifier, inputs: [ { content: { document: prompt, }, nodeName: "FlowInputNode", nodeOutputName: "document", }, ], }); let flowResponse = {}; const response = await client.send(command); for await (const chunkEvent of response.responseStream) { const { flowOutputEvent, flowCompletionEvent } = chunkEvent; if (flowOutputEvent) { flowResponse = { ...flowResponse, ...flowOutputEvent }; console.log("Flow output event:", flowOutputEvent); } else if (flowCompletionEvent) { flowResponse = { ...flowResponse, ...flowCompletionEvent }; console.log("Flow completion event:", flowCompletionEvent); } } return flowResponse; }; // Call function if run directly import { parseArgs } from "node:util"; import { isMain, validateArgs, } from "@aws-doc-sdk-examples/lib/utils/util-node.js"; const loadArgs = () => { const options = { flowIdentifier: { type: "string", required: true, }, flowAliasIdentifier: { type: "string", required: true, }, prompt: { type: "string", }, region: { type: "string", }, }; const results = parseArgs({ options }); const { errors } = validateArgs({ options }, results); return { errors, results }; }; if (isMain(import.meta.url)) { const { errors, results } = loadArgs(); if (!errors) { invokeBedrockFlow(results.values); } else { console.error(errors.join("\n")); } }
-
有关 API 的详细信息,请参阅 AWS SDK for JavaScript API 参考InvokeFlow中的。
-
- Python
-
- 适用于 Python 的 SDK(Boto3)
-
注意
还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库
中进行设置和运行。 调用流程。
def invoke_flow(self, flow_id, flow_alias_id, input_data, execution_id): """ Invoke an Amazon Bedrock flow and handle the response stream. Args: param flow_id: The ID of the flow to invoke. param flow_alias_id: The alias ID of the flow. param input_data: Input data for the flow. param execution_id: Execution ID for continuing a flow. Use the value None on first run. Return: Response from the flow. """ try: request_params = None if execution_id is None: # Don't pass execution ID for first run. request_params = { "flowIdentifier": flow_id, "flowAliasIdentifier": flow_alias_id, "inputs": input_data, "enableTrace": True } else: request_params = { "flowIdentifier": flow_id, "flowAliasIdentifier": flow_alias_id, "executionId": execution_id, "inputs": input_data, "enableTrace": True } response = self.agents_runtime_client.invoke_flow(**request_params) if "executionId" not in request_params: execution_id = response['executionId'] result = "" # Get the streaming response for event in response['responseStream']: result = result + str(event) + '\n' print(result) except ClientError as e: logger.error("Couldn't invoke flow %s.", {e}) raise return result
-
有关 API 的详细信息,请参阅适用InvokeFlow于 Python 的AWS SDK (Boto3) API 参考。
-
有关 S AWS DK 开发者指南和代码示例的完整列表,请参阅将 Amazon Bedrock 与 SD AWS K 配合使用。本主题还包括有关入门的信息以及有关先前的 SDK 版本的详细信息。
InvokeAgent
场景