使用带有响应流的调用模型在 Amazon Bedrock 上调用 Anthropic Claud API e 模型 - AWS SDK 代码示例

AWS 文档 AWS SDK示例 GitHub 存储库中还有更多SDK示例

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用带有响应流的调用模型在 Amazon Bedrock 上调用 Anthropic Claud API e 模型

以下代码示例演示如何使用调用模型向 Anthropic Claude 模型API发送短信并打印响应流。

.NET
AWS SDK for .NET
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

使用 Invoke Model API 发送短信并实时处理响应流。

// Use the native inference API to send a text message to Anthropic Claude // and print the response stream. using System; using System.IO; using System.Text.Json; using System.Text.Json.Nodes; using Amazon; using Amazon.BedrockRuntime; using Amazon.BedrockRuntime.Model; // Create a Bedrock Runtime client in the AWS Region you want to use. var client = new AmazonBedrockRuntimeClient(RegionEndpoint.USEast1); // Set the model ID, e.g., Claude 3 Haiku. var modelId = "anthropic.claude-3-haiku-20240307-v1:0"; // Define the user message. var userMessage = "Describe the purpose of a 'hello world' program in one line."; //Format the request payload using the model's native structure. var nativeRequest = JsonSerializer.Serialize(new { anthropic_version = "bedrock-2023-05-31", max_tokens = 512, temperature = 0.5, messages = new[] { new { role = "user", content = userMessage } } }); // Create a request with the model ID, the user message, and an inference configuration. var request = new InvokeModelWithResponseStreamRequest() { ModelId = modelId, Body = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(nativeRequest)), ContentType = "application/json" }; try { // Send the request to the Bedrock Runtime and wait for the response. var streamingResponse = await client.InvokeModelWithResponseStreamAsync(request); // Extract and print the streamed response text in real-time. foreach (var item in streamingResponse.Body) { var chunk = JsonSerializer.Deserialize<JsonObject>((item as PayloadPart).Bytes); var text = chunk["delta"]?["text"] ?? ""; Console.Write(text); } } catch (AmazonBedrockRuntimeException e) { Console.WriteLine($"ERROR: Can't invoke '{modelId}'. Reason: {e.Message}"); throw; }
Go
SDK适用于 Go V2
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

使用 Invoke Model API 发送短信并实时处理响应流。

import ( "context" "encoding/json" "log" "strings" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/bedrockruntime" ) // InvokeModelWrapper encapsulates Amazon Bedrock actions used in the examples. // It contains a Bedrock Runtime client that is used to invoke foundation models. type InvokeModelWrapper struct { BedrockRuntimeClient *bedrockruntime.Client } // Each model provider defines their own individual request and response formats. // For the format, ranges, and default values for the different models, refer to: // https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters.html type Request struct { Prompt string `json:"prompt"` MaxTokensToSample int `json:"max_tokens_to_sample"` Temperature float64 `json:"temperature,omitempty"` } type Response struct { Completion string `json:"completion"` } // Invokes Anthropic Claude on Amazon Bedrock to run an inference and asynchronously // process the response stream. func (wrapper InvokeModelWithResponseStreamWrapper) InvokeModelWithResponseStream(ctx context.Context, prompt string) (string, error) { modelId := "anthropic.claude-v2" // Anthropic Claude requires you to enclose the prompt as follows: prefix := "Human: " postfix := "\n\nAssistant:" prompt = prefix + prompt + postfix request := ClaudeRequest{ Prompt: prompt, MaxTokensToSample: 200, Temperature: 0.5, StopSequences: []string{"\n\nHuman:"}, } body, err := json.Marshal(request) if err != nil { log.Panicln("Couldn't marshal the request: ", err) } output, err := wrapper.BedrockRuntimeClient.InvokeModelWithResponseStream(ctx, &bedrockruntime.InvokeModelWithResponseStreamInput{ Body: body, ModelId: aws.String(modelId), ContentType: aws.String("application/json"), }) if err != nil { errMsg := err.Error() if strings.Contains(errMsg, "no such host") { log.Printf("The Bedrock service is not available in the selected region. Please double-check the service availability for your region at https://aws.amazon.com/about-aws/global-infrastructure/regional-product-services/.\n") } else if strings.Contains(errMsg, "Could not resolve the foundation model") { log.Printf("Could not resolve the foundation model from model identifier: \"%v\". Please verify that the requested model exists and is accessible within the specified region.\n", modelId) } else { log.Printf("Couldn't invoke Anthropic Claude. Here's why: %v\n", err) } } resp, err := processStreamingOutput(ctx, output, func(ctx context.Context, part []byte) error { fmt.Print(string(part)) return nil }) if err != nil { log.Fatal("streaming output processing error: ", err) } return resp.Completion, nil } type StreamingOutputHandler func(ctx context.Context, part []byte) error func processStreamingOutput(ctx context.Context, output *bedrockruntime.InvokeModelWithResponseStreamOutput, handler StreamingOutputHandler) (Response, error) { var combinedResult string resp := Response{} for event := range output.GetStream().Events() { switch v := event.(type) { case *types.ResponseStreamMemberChunk: //fmt.Println("payload", string(v.Value.Bytes)) var resp Response err := json.NewDecoder(bytes.NewReader(v.Value.Bytes)).Decode(&resp) if err != nil { return resp, err } err = handler(ctx, []byte(resp.Completion)) if err != nil { return resp, err } combinedResult += resp.Completion case *types.UnknownUnionMember: fmt.Println("unknown tag:", v.Tag) default: fmt.Println("union is nil or unknown type") } } resp.Completion = combinedResult return resp, nil }
Java
SDK适用于 Java 2.x
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

使用 Invoke Model API 发送短信并实时处理响应流。

// Use the native inference API to send a text message to Anthropic Claude // and print the response stream. import org.json.JSONObject; import org.json.JSONPointer; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.bedrockruntime.BedrockRuntimeAsyncClient; import software.amazon.awssdk.services.bedrockruntime.model.InvokeModelWithResponseStreamRequest; import software.amazon.awssdk.services.bedrockruntime.model.InvokeModelWithResponseStreamResponseHandler; import java.util.Objects; import java.util.concurrent.ExecutionException; import static software.amazon.awssdk.services.bedrockruntime.model.InvokeModelWithResponseStreamResponseHandler.Visitor; public class InvokeModelWithResponseStream { public static String invokeModelWithResponseStream() throws ExecutionException, InterruptedException { // Create a Bedrock Runtime client in the AWS Region you want to use. // Replace the DefaultCredentialsProvider with your preferred credentials provider. var client = BedrockRuntimeAsyncClient.builder() .credentialsProvider(DefaultCredentialsProvider.create()) .region(Region.US_EAST_1) .build(); // Set the model ID, e.g., Claude 3 Haiku. var modelId = "anthropic.claude-3-haiku-20240307-v1:0"; // The InvokeModelWithResponseStream API uses the model's native payload. // Learn more about the available inference parameters and response fields at: // https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters-anthropic-claude-messages.html var nativeRequestTemplate = """ { "anthropic_version": "bedrock-2023-05-31", "max_tokens": 512, "temperature": 0.5, "messages": [{ "role": "user", "content": "{{prompt}}" }] }"""; // Define the prompt for the model. var prompt = "Describe the purpose of a 'hello world' program in one line."; // Embed the prompt in the model's native request payload. String nativeRequest = nativeRequestTemplate.replace("{{prompt}}", prompt); // Create a request with the model ID and the model's native request payload. var request = InvokeModelWithResponseStreamRequest.builder() .body(SdkBytes.fromUtf8String(nativeRequest)) .modelId(modelId) .build(); // Prepare a buffer to accumulate the generated response text. var completeResponseTextBuffer = new StringBuilder(); // Prepare a handler to extract, accumulate, and print the response text in real-time. var responseStreamHandler = InvokeModelWithResponseStreamResponseHandler.builder() .subscriber(Visitor.builder().onChunk(chunk -> { var response = new JSONObject(chunk.bytes().asUtf8String()); // Extract and print the text from the content blocks. if (Objects.equals(response.getString("type"), "content_block_delta")) { var text = new JSONPointer("/delta/text").queryFrom(response); System.out.print(text); // Append the text to the response text buffer. completeResponseTextBuffer.append(text); } }).build()).build(); try { // Send the request and wait for the handler to process the response. client.invokeModelWithResponseStream(request, responseStreamHandler).get(); // Return the complete response text. return completeResponseTextBuffer.toString(); } catch (ExecutionException | InterruptedException e) { System.err.printf("Can't invoke '%s': %s", modelId, e.getCause().getMessage()); throw new RuntimeException(e); } } public static void main(String[] args) throws ExecutionException, InterruptedException { invokeModelWithResponseStream(); } }
JavaScript
SDK对于 JavaScript (v3)
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

使用 Invoke Model API 发送短信并实时处理响应流。

import { fileURLToPath } from "node:url"; import { FoundationModels } from "../../config/foundation_models.js"; import { BedrockRuntimeClient, InvokeModelCommand, InvokeModelWithResponseStreamCommand, } from "@aws-sdk/client-bedrock-runtime"; /** * @typedef {Object} ResponseContent * @property {string} text * * @typedef {Object} MessagesResponseBody * @property {ResponseContent[]} content * * @typedef {Object} Delta * @property {string} text * * @typedef {Object} Message * @property {string} role * * @typedef {Object} Chunk * @property {string} type * @property {Delta} delta * @property {Message} message */ /** * Invokes Anthropic Claude 3 using the Messages API. * * To learn more about the Anthropic Messages API, go to: * https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters-anthropic-claude-messages.html * * @param {string} prompt - The input text prompt for the model to complete. * @param {string} [modelId] - The ID of the model to use. Defaults to "anthropic.claude-3-haiku-20240307-v1:0". */ export const invokeModel = async ( prompt, modelId = "anthropic.claude-3-haiku-20240307-v1:0", ) => { // Create a new Bedrock Runtime client instance. const client = new BedrockRuntimeClient({ region: "us-east-1" }); // Prepare the payload for the model. const payload = { anthropic_version: "bedrock-2023-05-31", max_tokens: 1000, messages: [ { role: "user", content: [{ type: "text", text: prompt }], }, ], }; // Invoke Claude with the payload and wait for the response. const command = new InvokeModelCommand({ contentType: "application/json", body: JSON.stringify(payload), modelId, }); const apiResponse = await client.send(command); // Decode and return the response(s) const decodedResponseBody = new TextDecoder().decode(apiResponse.body); /** @type {MessagesResponseBody} */ const responseBody = JSON.parse(decodedResponseBody); return responseBody.content[0].text; }; /** * Invokes Anthropic Claude 3 and processes the response stream. * * To learn more about the Anthropic Messages API, go to: * https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters-anthropic-claude-messages.html * * @param {string} prompt - The input text prompt for the model to complete. * @param {string} [modelId] - The ID of the model to use. Defaults to "anthropic.claude-3-haiku-20240307-v1:0". */ export const invokeModelWithResponseStream = async ( prompt, modelId = "anthropic.claude-3-haiku-20240307-v1:0", ) => { // Create a new Bedrock Runtime client instance. const client = new BedrockRuntimeClient({ region: "us-east-1" }); // Prepare the payload for the model. const payload = { anthropic_version: "bedrock-2023-05-31", max_tokens: 1000, messages: [ { role: "user", content: [{ type: "text", text: prompt }], }, ], }; // Invoke Claude with the payload and wait for the API to respond. const command = new InvokeModelWithResponseStreamCommand({ contentType: "application/json", body: JSON.stringify(payload), modelId, }); const apiResponse = await client.send(command); let completeMessage = ""; // Decode and process the response stream for await (const item of apiResponse.body) { /** @type Chunk */ const chunk = JSON.parse(new TextDecoder().decode(item.chunk.bytes)); const chunk_type = chunk.type; if (chunk_type === "content_block_delta") { const text = chunk.delta.text; completeMessage = completeMessage + text; process.stdout.write(text); } } // Return the final response return completeMessage; }; // Invoke the function if this file was run directly. if (process.argv[1] === fileURLToPath(import.meta.url)) { const prompt = 'Write a paragraph starting with: "Once upon a time..."'; const modelId = FoundationModels.CLAUDE_3_HAIKU.modelId; console.log(`Prompt: ${prompt}`); console.log(`Model ID: ${modelId}`); try { console.log("-".repeat(53)); const response = await invokeModel(prompt, modelId); console.log(`\n${"-".repeat(53)}`); console.log("Final structured response:"); console.log(response); } catch (err) { console.log(`\n${err}`); } }
Python
SDK适用于 Python (Boto3)
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

使用 Invoke Model API 发送短信并实时处理响应流。

# Use the native inference API to send a text message to Anthropic Claude # and print the response stream. import boto3 import json # Create a Bedrock Runtime client in the AWS Region of your choice. client = boto3.client("bedrock-runtime", region_name="us-east-1") # Set the model ID, e.g., Claude 3 Haiku. model_id = "anthropic.claude-3-haiku-20240307-v1:0" # Define the prompt for the model. prompt = "Describe the purpose of a 'hello world' program in one line." # Format the request payload using the model's native structure. native_request = { "anthropic_version": "bedrock-2023-05-31", "max_tokens": 512, "temperature": 0.5, "messages": [ { "role": "user", "content": [{"type": "text", "text": prompt}], } ], } # Convert the native request to JSON. request = json.dumps(native_request) # Invoke the model with the request. streaming_response = client.invoke_model_with_response_stream( modelId=model_id, body=request ) # Extract and print the response text in real-time. for event in streaming_response["body"]: chunk = json.loads(event["chunk"]["bytes"]) if chunk["type"] == "content_block_delta": print(chunk["delta"].get("text", ""), end="")