Invoke Anthropic Claude models on Amazon Bedrock using the Invoke Model API with a response stream - Amazon Bedrock

Invoke Anthropic Claude models on Amazon Bedrock using the Invoke Model API with a response stream

The following code examples show how to send a text message to Anthropic Claude models, using the Invoke Model API, and print the response stream.

.NET
AWS SDK for .NET
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Use the Invoke Model API to send a text message and process the response stream in real-time.

// Use the native inference API to send a text message to Anthropic Claude // and print the response stream. using System; using System.IO; using System.Text.Json; using System.Text.Json.Nodes; using Amazon; using Amazon.BedrockRuntime; using Amazon.BedrockRuntime.Model; // Create a Bedrock Runtime client in the AWS Region you want to use. var client = new AmazonBedrockRuntimeClient(RegionEndpoint.USEast1); // Set the model ID, e.g., Claude 3 Haiku. var modelId = "anthropic.claude-3-haiku-20240307-v1:0"; // Define the user message. var userMessage = "Describe the purpose of a 'hello world' program in one line."; //Format the request payload using the model's native structure. var nativeRequest = JsonSerializer.Serialize(new { anthropic_version = "bedrock-2023-05-31", max_tokens = 512, temperature = 0.5, messages = new[] { new { role = "user", content = userMessage } } }); // Create a request with the model ID, the user message, and an inference configuration. var request = new InvokeModelWithResponseStreamRequest() { ModelId = modelId, Body = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(nativeRequest)), ContentType = "application/json" }; try { // Send the request to the Bedrock Runtime and wait for the response. var streamingResponse = await client.InvokeModelWithResponseStreamAsync(request); // Extract and print the streamed response text in real-time. foreach (var item in streamingResponse.Body) { var chunk = JsonSerializer.Deserialize<JsonObject>((item as PayloadPart).Bytes); var text = chunk["delta"]?["text"] ?? ""; Console.Write(text); } } catch (AmazonBedrockRuntimeException e) { Console.WriteLine($"ERROR: Can't invoke '{modelId}'. Reason: {e.Message}"); throw; }
Go
SDK for Go V2
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Use the Invoke Model API to send a text message and process the response stream in real-time.

// Each model provider defines their own individual request and response formats. // For the format, ranges, and default values for the different models, refer to: // https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters.html type Request struct { Prompt string `json:"prompt"` MaxTokensToSample int `json:"max_tokens_to_sample"` Temperature float64 `json:"temperature,omitempty"` } type Response struct { Completion string `json:"completion"` } // Invokes Anthropic Claude on Amazon Bedrock to run an inference and asynchronously // process the response stream. func (wrapper InvokeModelWithResponseStreamWrapper) InvokeModelWithResponseStream(prompt string) (string, error) { modelId := "anthropic.claude-v2" // Anthropic Claude requires you to enclose the prompt as follows: prefix := "Human: " postfix := "\n\nAssistant:" prompt = prefix + prompt + postfix request := ClaudeRequest{ Prompt: prompt, MaxTokensToSample: 200, Temperature: 0.5, StopSequences: []string{"\n\nHuman:"}, } body, err := json.Marshal(request) if err != nil { log.Panicln("Couldn't marshal the request: ", err) } output, err := wrapper.BedrockRuntimeClient.InvokeModelWithResponseStream(context.Background(), &bedrockruntime.InvokeModelWithResponseStreamInput{ Body: body, ModelId: aws.String(modelId), ContentType: aws.String("application/json"), }) if err != nil { errMsg := err.Error() if strings.Contains(errMsg, "no such host") { log.Printf("The Bedrock service is not available in the selected region. Please double-check the service availability for your region at https://aws.amazon.com/about-aws/global-infrastructure/regional-product-services/.\n") } else if strings.Contains(errMsg, "Could not resolve the foundation model") { log.Printf("Could not resolve the foundation model from model identifier: \"%v\". Please verify that the requested model exists and is accessible within the specified region.\n", modelId) } else { log.Printf("Couldn't invoke Anthropic Claude. Here's why: %v\n", err) } } resp, err := processStreamingOutput(output, func(ctx context.Context, part []byte) error { fmt.Print(string(part)) return nil }) if err != nil { log.Fatal("streaming output processing error: ", err) } return resp.Completion, nil } type StreamingOutputHandler func(ctx context.Context, part []byte) error func processStreamingOutput(output *bedrockruntime.InvokeModelWithResponseStreamOutput, handler StreamingOutputHandler) (Response, error) { var combinedResult string resp := Response{} for event := range output.GetStream().Events() { switch v := event.(type) { case *types.ResponseStreamMemberChunk: //fmt.Println("payload", string(v.Value.Bytes)) var resp Response err := json.NewDecoder(bytes.NewReader(v.Value.Bytes)).Decode(&resp) if err != nil { return resp, err } err = handler(context.Background(), []byte(resp.Completion)) if err != nil { return resp, err } combinedResult += resp.Completion case *types.UnknownUnionMember: fmt.Println("unknown tag:", v.Tag) default: fmt.Println("union is nil or unknown type") } } resp.Completion = combinedResult return resp, nil }
Java
SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Use the Invoke Model API to send a text message and process the response stream in real-time.

// Use the native inference API to send a text message to Anthropic Claude // and print the response stream. import org.json.JSONObject; import org.json.JSONPointer; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.bedrockruntime.BedrockRuntimeAsyncClient; import software.amazon.awssdk.services.bedrockruntime.model.InvokeModelWithResponseStreamRequest; import software.amazon.awssdk.services.bedrockruntime.model.InvokeModelWithResponseStreamResponseHandler; import java.util.Objects; import java.util.concurrent.ExecutionException; import static software.amazon.awssdk.services.bedrockruntime.model.InvokeModelWithResponseStreamResponseHandler.Visitor; public class InvokeModelWithResponseStream { public static String invokeModelWithResponseStream() throws ExecutionException, InterruptedException { // Create a Bedrock Runtime client in the AWS Region you want to use. // Replace the DefaultCredentialsProvider with your preferred credentials provider. var client = BedrockRuntimeAsyncClient.builder() .credentialsProvider(DefaultCredentialsProvider.create()) .region(Region.US_EAST_1) .build(); // Set the model ID, e.g., Claude 3 Haiku. var modelId = "anthropic.claude-3-haiku-20240307-v1:0"; // The InvokeModelWithResponseStream API uses the model's native payload. // Learn more about the available inference parameters and response fields at: // https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters-anthropic-claude-messages.html var nativeRequestTemplate = """ { "anthropic_version": "bedrock-2023-05-31", "max_tokens": 512, "temperature": 0.5, "messages": [{ "role": "user", "content": "{{prompt}}" }] }"""; // Define the prompt for the model. var prompt = "Describe the purpose of a 'hello world' program in one line."; // Embed the prompt in the model's native request payload. String nativeRequest = nativeRequestTemplate.replace("{{prompt}}", prompt); // Create a request with the model ID and the model's native request payload. var request = InvokeModelWithResponseStreamRequest.builder() .body(SdkBytes.fromUtf8String(nativeRequest)) .modelId(modelId) .build(); // Prepare a buffer to accumulate the generated response text. var completeResponseTextBuffer = new StringBuilder(); // Prepare a handler to extract, accumulate, and print the response text in real-time. var responseStreamHandler = InvokeModelWithResponseStreamResponseHandler.builder() .subscriber(Visitor.builder().onChunk(chunk -> { var response = new JSONObject(chunk.bytes().asUtf8String()); // Extract and print the text from the content blocks. if (Objects.equals(response.getString("type"), "content_block_delta")) { var text = new JSONPointer("/delta/text").queryFrom(response); System.out.print(text); // Append the text to the response text buffer. completeResponseTextBuffer.append(text); } }).build()).build(); try { // Send the request and wait for the handler to process the response. client.invokeModelWithResponseStream(request, responseStreamHandler).get(); // Return the complete response text. return completeResponseTextBuffer.toString(); } catch (ExecutionException | InterruptedException e) { System.err.printf("Can't invoke '%s': %s", modelId, e.getCause().getMessage()); throw new RuntimeException(e); } } public static void main(String[] args) throws ExecutionException, InterruptedException { invokeModelWithResponseStream(); } }
JavaScript
SDK for JavaScript (v3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Use the Invoke Model API to send a text message and process the response stream in real-time.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { fileURLToPath } from "url"; import { FoundationModels } from "../../config/foundation_models.js"; import { BedrockRuntimeClient, InvokeModelCommand, InvokeModelWithResponseStreamCommand, } from "@aws-sdk/client-bedrock-runtime"; /** * @typedef {Object} ResponseContent * @property {string} text * * @typedef {Object} MessagesResponseBody * @property {ResponseContent[]} content * * @typedef {Object} Delta * @property {string} text * * @typedef {Object} Message * @property {string} role * * @typedef {Object} Chunk * @property {string} type * @property {Delta} delta * @property {Message} message */ /** * Invokes Anthropic Claude 3 using the Messages API. * * To learn more about the Anthropic Messages API, go to: * https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters-anthropic-claude-messages.html * * @param {string} prompt - The input text prompt for the model to complete. * @param {string} [modelId] - The ID of the model to use. Defaults to "anthropic.claude-3-haiku-20240307-v1:0". */ export const invokeModel = async ( prompt, modelId = "anthropic.claude-3-haiku-20240307-v1:0", ) => { // Create a new Bedrock Runtime client instance. const client = new BedrockRuntimeClient({ region: "us-east-1" }); // Prepare the payload for the model. const payload = { anthropic_version: "bedrock-2023-05-31", max_tokens: 1000, messages: [ { role: "user", content: [{ type: "text", text: prompt }], }, ], }; // Invoke Claude with the payload and wait for the response. const command = new InvokeModelCommand({ contentType: "application/json", body: JSON.stringify(payload), modelId, }); const apiResponse = await client.send(command); // Decode and return the response(s) const decodedResponseBody = new TextDecoder().decode(apiResponse.body); /** @type {MessagesResponseBody} */ const responseBody = JSON.parse(decodedResponseBody); return responseBody.content[0].text; }; /** * Invokes Anthropic Claude 3 and processes the response stream. * * To learn more about the Anthropic Messages API, go to: * https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters-anthropic-claude-messages.html * * @param {string} prompt - The input text prompt for the model to complete. * @param {string} [modelId] - The ID of the model to use. Defaults to "anthropic.claude-3-haiku-20240307-v1:0". */ export const invokeModelWithResponseStream = async ( prompt, modelId = "anthropic.claude-3-haiku-20240307-v1:0", ) => { // Create a new Bedrock Runtime client instance. const client = new BedrockRuntimeClient({ region: "us-east-1" }); // Prepare the payload for the model. const payload = { anthropic_version: "bedrock-2023-05-31", max_tokens: 1000, messages: [ { role: "user", content: [{ type: "text", text: prompt }], }, ], }; // Invoke Claude with the payload and wait for the API to respond. const command = new InvokeModelWithResponseStreamCommand({ contentType: "application/json", body: JSON.stringify(payload), modelId, }); const apiResponse = await client.send(command); let completeMessage = ""; // Decode and process the response stream for await (const item of apiResponse.body) { /** @type Chunk */ const chunk = JSON.parse(new TextDecoder().decode(item.chunk.bytes)); const chunk_type = chunk.type; if (chunk_type === "content_block_delta") { const text = chunk.delta.text; completeMessage = completeMessage + text; process.stdout.write(text); } } // Return the final response return completeMessage; }; // Invoke the function if this file was run directly. if (process.argv[1] === fileURLToPath(import.meta.url)) { const prompt = 'Write a paragraph starting with: "Once upon a time..."'; const modelId = FoundationModels.CLAUDE_3_HAIKU.modelId; console.log(`Prompt: ${prompt}`); console.log(`Model ID: ${modelId}`); try { console.log("-".repeat(53)); const response = await invokeModel(prompt, modelId); console.log("\n" + "-".repeat(53)); console.log("Final structured response:"); console.log(response); } catch (err) { console.log(`\n${err}`); } }
Python
SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Use the Invoke Model API to send a text message and process the response stream in real-time.

# Use the native inference API to send a text message to Anthropic Claude # and print the response stream. import boto3 import json # Create a Bedrock Runtime client in the AWS Region of your choice. client = boto3.client("bedrock-runtime", region_name="us-east-1") # Set the model ID, e.g., Claude 3 Haiku. model_id = "anthropic.claude-3-haiku-20240307-v1:0" # Define the prompt for the model. prompt = "Describe the purpose of a 'hello world' program in one line." # Format the request payload using the model's native structure. native_request = { "anthropic_version": "bedrock-2023-05-31", "max_tokens": 512, "temperature": 0.5, "messages": [ { "role": "user", "content": [{"type": "text", "text": prompt}], } ], } # Convert the native request to JSON. request = json.dumps(native_request) # Invoke the model with the request. streaming_response = client.invoke_model_with_response_stream( modelId=model_id, body=request ) # Extract and print the response text in real-time. for event in streaming_response["body"]: chunk = json.loads(event["chunk"]["bytes"]) if chunk["type"] == "content_block_delta": print(chunk["delta"].get("text", ""), end="")

For a complete list of AWS SDK developer guides and code examples, see Using this service with an AWS SDK. This topic also includes information about getting started and details about previous SDK versions.