


# Using the Bidirectional Streaming API
<a name="speech-bidirection"></a>

**Note**  
This documentation is for Amazon Nova Version 1. For the Amazon Nova 2 Sonic guide, see [Getting started](https://docs.aws.amazon.com/nova/latest/nova2-userguide/sonic-getting-started.html).

The Amazon Nova Sonic model uses the `InvokeModelWithBidirectionalStream` API, which enables real-time bidirectional streaming conversations. Unlike the traditional request-response pattern, it keeps a channel open so that audio can stream continuously in both directions.
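To make the contrast with request-response concrete, the following is a minimal sketch that uses only Python's standard `asyncio` module and does not call the Bedrock API. The names `fake_service`, `send_audio`, `receive_replies`, and `run_session` are hypothetical and exist only for this illustration. The sender and the receiver run as separate tasks, so replies can arrive while input is still being sent.

```python
import asyncio


async def fake_service(inbound: asyncio.Queue, outbound: asyncio.Queue) -> None:
    """Hypothetical stand-in for the service: replies to each chunk as it arrives."""
    while True:
        chunk = await inbound.get()
        if chunk is None:  # None marks the end of the input stream
            await outbound.put(None)
            return
        await outbound.put(f"reply-to-{chunk}")


async def send_audio(inbound: asyncio.Queue, chunks: list[str]) -> None:
    """Producer: keeps sending chunks without waiting for a reply to each one."""
    for chunk in chunks:
        await inbound.put(chunk)
        await asyncio.sleep(0)  # yield control so other tasks can run
    await inbound.put(None)


async def receive_replies(outbound: asyncio.Queue) -> list[str]:
    """Consumer: collects replies until the end-of-stream marker arrives."""
    replies = []
    while (reply := await outbound.get()) is not None:
        replies.append(reply)
    return replies


async def run_session(chunks: list[str]) -> list[str]:
    """Run sender, service, and receiver concurrently over one open channel pair."""
    inbound, outbound = asyncio.Queue(), asyncio.Queue()
    service = asyncio.create_task(fake_service(inbound, outbound))
    sender = asyncio.create_task(send_audio(inbound, chunks))
    replies = await receive_replies(outbound)
    await asyncio.gather(service, sender)
    return replies
```

In a real session the two queues would be replaced by the input and output streams that the SDK returns, but the structure of one sending task and one receiving task stays the same.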

The following AWS SDKs support the new bidirectional streaming API:
+ [AWS SDK for .NET](https://aws.amazon.com/sdk-for-net/)
+ [AWS SDK for C++](https://aws.amazon.com/sdk-for-cpp/)
+ [AWS SDK for Java](https://aws.amazon.com/sdk-for-java/)
+ [AWS SDK for JavaScript](https://aws.amazon.com/sdk-for-javascript/)
+ [AWS SDK for Kotlin](https://aws.amazon.com/sdk-for-kotlin/)
+ [AWS SDK for Ruby](https://aws.amazon.com/sdk-for-ruby/)
+ [AWS SDK for Rust](https://aws.amazon.com/sdk-for-rust/)
+ [AWS SDK for Swift](https://aws.amazon.com/sdk-for-swift/)

Python developers can use this [new experimental SDK](https://github.com/awslabs/aws-sdk-python), which makes it easier to use the bidirectional streaming capabilities of Amazon Nova Sonic.

The following code examples will help you get started with the bidirectional API. For a complete list of examples, see the Amazon Nova Sonic [GitHub samples](https://github.com/aws-samples/amazon-nova-samples/tree/main/speech-to-speech) page.

## Setting up the client
<a name="set-up-the-client"></a>

The following examples show how to set up the client and start using the bidirectional API.

------
#### [ Python ]

```
def _initialize_client(self):
    """Initialize the Bedrock client."""
    config = Config(
        endpoint_uri=f"https://bedrock-runtime.{self.region}.amazonaws.com",
        region=self.region,
        aws_credentials_identity_resolver=EnvironmentCredentialsResolver(),
        http_auth_scheme_resolver=HTTPAuthSchemeResolver(),
        http_auth_schemes={"aws.auth#sigv4": SigV4AuthScheme()}
    )
    self.bedrock_client = BedrockRuntimeClient(config=config)
```

------
#### [ Java ]

```
// The nettyBuilder is optional and mentioned here for clarity, all our APIs support http2
// and will default to the protocol if the netty builder is not specified.
NettyNioAsyncHttpClient.Builder nettyBuilder = NettyNioAsyncHttpClient.builder()
        .readTimeout(Duration.of(180, ChronoUnit.SECONDS))
        .maxConcurrency(20)
        .protocol(Protocol.HTTP2)
        .protocolNegotiation(ProtocolNegotiation.ALPN);
        

BedrockRuntimeAsyncClient client = BedrockRuntimeAsyncClient.builder()
        .region(Region.US_EAST_1)
        .credentialsProvider(ProfileCredentialsProvider.create("NOVA-PROFILE"))
        .httpClientBuilder(nettyBuilder)
        .build();
```

------
#### [ Node.js ]

```
const { BedrockRuntimeClient } = require("@aws-sdk/client-bedrock-runtime");
const { NodeHttp2Handler } = require("@smithy/node-http-handler");
const { fromIni } = require("@aws-sdk/credential-provider-ini");

// Configure HTTP/2 client for bidirectional streaming 
// (This is optional, all our APIs support http2 so we will default to http2 if handler is not specified)
const nodeHttp2Handler = new NodeHttp2Handler({
    requestTimeout: 300000,
    sessionTimeout: 300000,
    disableConcurrentStreams: false,
    maxConcurrentStreams: 20,
});

// Create a Bedrock client
const client = new BedrockRuntimeClient({
    region: "us-east-1",
    credentials: fromIni({ profile: "NOVA-PROFILE" }), // Or use other credential providers
    requestHandler: nodeHttp2Handler,
});
```

------

## Handling events
<a name="handle-events"></a>

The following examples show how to handle events with the bidirectional API.

------
#### [ Python ]

```
self.stream_response = await self.bedrock_client.invoke_model_with_bidirectional_stream(
    InvokeModelWithBidirectionalStreamInput(model_id=self.model_id)
)
self.is_active = True
```

```
async def _process_responses(self):
        """Process incoming responses from Bedrock."""
        try:            
            while self.is_active:
                try:
                    output = await self.stream_response.await_output()
                    result = await output[1].receive()
                    if result.value and result.value.bytes_:
                        try:
                            response_data = result.value.bytes_.decode('utf-8')
                            json_data = json.loads(response_data)
                            
                            # Handle different response types
                            if 'event' in json_data:
                                if 'contentStart' in json_data['event']:
                                    content_start = json_data['event']['contentStart']
                                    # set role
                                    self.role = content_start['role']
                                    # Check for speculative content
                                    if 'additionalModelFields' in content_start:
                                        try:
                                            additional_fields = json.loads(content_start['additionalModelFields'])
                                            if additional_fields.get('generationStage') == 'SPECULATIVE':
                                                self.display_assistant_text = True
                                            else:
                                                self.display_assistant_text = False
                                        except json.JSONDecodeError:
                                            print("Error parsing additionalModelFields")
                                elif 'textOutput' in json_data['event']:
                                    text_content = json_data['event']['textOutput']['content']
                                    role = json_data['event']['textOutput']['role']
                                    # Check if there is a barge-in
                                    if '{ "interrupted" : true }' in text_content:
                                        self.barge_in = True

                                    if (self.role == "ASSISTANT" and self.display_assistant_text):
                                        print(f"Assistant: {text_content}")
                                    elif (self.role == "USER"):
                                        print(f"User: {text_content}")

                                elif 'audioOutput' in json_data['event']:
                                    audio_content = json_data['event']['audioOutput']['content']
                                    audio_bytes = base64.b64decode(audio_content)
                                    await self.audio_output_queue.put(audio_bytes)
                                elif 'toolUse' in json_data['event']:
                                    self.toolUseContent = json_data['event']['toolUse']
                                    self.toolName = json_data['event']['toolUse']['toolName']
                                    self.toolUseId = json_data['event']['toolUse']['toolUseId']
                                elif 'contentEnd' in json_data['event'] and json_data['event'].get('contentEnd', {}).get('type') == 'TOOL':
                                    toolResult = await self.processToolUse(self.toolName, self.toolUseContent)
                                    toolContent = str(uuid.uuid4())
                                    await self.send_tool_start_event(toolContent)
                                    await self.send_tool_result_event(toolContent, toolResult)
                                    await self.send_tool_content_end_event(toolContent)
                                elif 'completionEnd' in json_data['event']:
                                    # Handle end of conversation, no more response will be generated
                                    print("End of response sequence")
                                   
                            
                            # Put the response in the output queue for other components
                            await self.output_queue.put(json_data)
                        except json.JSONDecodeError:
                            await self.output_queue.put({"raw_data": response_data})
                except StopAsyncIteration:
                    # Stream has ended
                    break
                except Exception as e:
                    # Handle ValidationException properly
                    if "ValidationException" in str(e):
                        error_message = str(e)
                        print(f"Validation error: {error_message}")
                    else:
                        print(f"Error receiving response: {e}")
                    break
                    
        except Exception as e:
            print(f"Response processing error: {e}")
        finally:
            self.is_active = False
```

------
#### [ Java ]

```
public class ResponseHandler implements InvokeModelWithBidirectionalStreamResponseHandler {
    @Override
    public void responseReceived(InvokeModelWithBidirectionalStreamResponse response) {
        // Handle initial response
        log.info("Bedrock Nova Sonic request id: {}", response.responseMetadata().requestId());
    }

    @Override
    public void onEventStream(SdkPublisher<InvokeModelWithBidirectionalStreamOutput> sdkPublisher) {
        log.info("Bedrock Nova S2S event stream received");
        var completableFuture = sdkPublisher.subscribe((output) -> output.accept(new Visitor() {
            @Override
            public void visitChunk(BidirectionalOutputPayloadPart event) {
                log.info("Bedrock S2S chunk received, converting to payload");
                String payloadString =
                        StandardCharsets.UTF_8.decode((event.bytes().asByteBuffer().rewind().duplicate())).toString();
                log.info("Bedrock S2S payload: {}", payloadString);
                delegate.onNext(payloadString);
            }
        }));

        // if any of the chunks fail to parse or be handled ensure to send an error or they will get lost
        completableFuture.exceptionally(t -> {
            delegate.onError(new Exception(t));
            return null;
        });
    }

    @Override
    public void exceptionOccurred(Throwable throwable) {
        // Handle errors
        System.err.println("Error: " + throwable.getMessage());
        throwable.printStackTrace();
    }

    @Override
    public void complete() {
        // Handle completion
        System.out.println("Stream completed");
    }
}
```

------
#### [ Node.js ]

```
for await (const event of response.body) {
        if (!session.isActive) {
          console.log(`Session ${sessionId} is no longer active, stopping response processing`);
          break;
        }
        if (event.chunk?.bytes) {
          try {
            this.updateSessionActivity(sessionId);
            const textResponse = new TextDecoder().decode(event.chunk.bytes);

            try {
              const jsonResponse = JSON.parse(textResponse);
              if (jsonResponse.event?.contentStart) {
                this.dispatchEvent(sessionId, 'contentStart', jsonResponse.event.contentStart);
              } else if (jsonResponse.event?.textOutput) {
                this.dispatchEvent(sessionId, 'textOutput', jsonResponse.event.textOutput);
              } else if (jsonResponse.event?.audioOutput) {
                this.dispatchEvent(sessionId, 'audioOutput', jsonResponse.event.audioOutput);
              } else if (jsonResponse.event?.toolUse) {
                this.dispatchEvent(sessionId, 'toolUse', jsonResponse.event.toolUse);

                // Store tool use information for later
                session.toolUseContent = jsonResponse.event.toolUse;
                session.toolUseId = jsonResponse.event.toolUse.toolUseId;
                session.toolName = jsonResponse.event.toolUse.toolName;
              } else if (jsonResponse.event?.contentEnd &&
                jsonResponse.event?.contentEnd?.type === 'TOOL') {

                // Process tool use
                console.log(`Processing tool use for session ${sessionId}`);
                this.dispatchEvent(sessionId, 'toolEnd', {
                  toolUseContent: session.toolUseContent,
                  toolUseId: session.toolUseId,
                  toolName: session.toolName
                });

                console.log("calling tooluse");
                console.log("tool use content : ", session.toolUseContent)
                // function calling
                const toolResult = await this.processToolUse(session.toolName, session.toolUseContent);

                // Send tool result
                this.sendToolResult(sessionId, session.toolUseId, toolResult);

                // Also dispatch event about tool result
                this.dispatchEvent(sessionId, 'toolResult', {
                  toolUseId: session.toolUseId,
                  result: toolResult
                });
              } else {
                // Handle other events
                const eventKeys = Object.keys(jsonResponse.event || {});
                console.log(`Event keys for session ${sessionId}: `, eventKeys)
                console.log(`Handling other events`)
                if (eventKeys.length > 0) {
                  this.dispatchEvent(sessionId, eventKeys[0], jsonResponse.event);
                } else if (Object.keys(jsonResponse).length > 0) {
                  this.dispatchEvent(sessionId, 'unknown', jsonResponse);
                }
              }
            } catch (e) {
              console.log(`Raw text response for session ${sessionId}(parse error): `, textResponse);
            }
          } catch (e) {
            console.error(`Error processing response chunk for session ${sessionId}: `, e);
          }
        } else if (event.modelStreamErrorException) {
          console.error(`Model stream error for session ${sessionId}: `, event.modelStreamErrorException);
          this.dispatchEvent(sessionId, 'error', {
            type: 'modelStreamErrorException',
            details: event.modelStreamErrorException
          });
        } else if (event.internalServerException) {
          console.error(`Internal server error for session ${sessionId}: `, event.internalServerException);
          this.dispatchEvent(sessionId, 'error', {
            type: 'internalServerException',
            details: event.internalServerException
          });
        }
      }
```

------
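The handlers above all follow the same pattern: decode the chunk as UTF-8, parse it as JSON, and branch on the single key under `event`. As a minimal sketch of that pattern using only the standard library, the helpers below are hypothetical names (`parse_output_event`, `is_speculative`, `audio_bytes`) and are not part of any AWS SDK. The field names (`contentStart`, `additionalModelFields`, `generationStage`, `audioOutput`, `content`) are taken from the examples above.

```python
import base64
import json


def parse_output_event(raw: bytes) -> tuple[str, dict]:
    """Decode one output chunk and return (event_name, payload).

    Returns ("raw", {"raw_data": text}) when the chunk is not valid JSON and
    ("unknown", data) when there is no non-empty "event" object.
    """
    text = raw.decode("utf-8")
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        return "raw", {"raw_data": text}
    if not isinstance(data, dict):
        return "unknown", {"value": data}
    event = data.get("event")
    if not isinstance(event, dict) or not event:
        return "unknown", data
    name = next(iter(event))  # the examples above carry one key per event
    return name, event[name]


def is_speculative(content_start: dict) -> bool:
    """True when additionalModelFields (a JSON string) marks SPECULATIVE content."""
    fields = content_start.get("additionalModelFields")
    if not fields:
        return False
    try:
        return json.loads(fields).get("generationStage") == "SPECULATIVE"
    except (json.JSONDecodeError, AttributeError, TypeError):
        return False


def audio_bytes(audio_output: dict) -> bytes:
    """Decode the base64 audio carried in an audioOutput payload."""
    return base64.b64decode(audio_output["content"])
```

Keeping the parsing in small pure functions like these makes the branching logic testable without an open stream. The receive loop then only has to call `parse_output_event` and route on the returned name.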

## Creating a request
<a name="create-request"></a>

The following examples show how to create a request with the bidirectional API.

------
#### [ Python ]

```
self.stream_response = await self.bedrock_client.invoke_model_with_bidirectional_stream(
    InvokeModelWithBidirectionalStreamInput(model_id="amazon.nova-sonic-v1:0")
)
```

------
#### [ Java ]

```
InvokeModelWithBidirectionalStreamRequest request = 
   InvokeModelWithBidirectionalStreamRequest.builder()
   .modelId("amazon.nova-sonic-v1:0")
   .build();
```

------
#### [ Node.js ]

```
const request = new InvokeModelWithBidirectionalStreamCommand({
            modelId: "amazon.nova-sonic-v1:0",
            body: generateOrderedStream(), //initial request
        });
```

------

## Initiating a request
<a name="initiate-request"></a>

The following examples show how to initiate a request with the bidirectional API.

------
#### [ Python ]

```
    START_SESSION_EVENT = '''{
        "event": {
            "sessionStart": {
            "inferenceConfiguration": {
                "maxTokens": 1024,
                "topP": 0.9,
                "temperature": 0.7
                }
            }
        }
    }'''
    
    event = InvokeModelWithBidirectionalStreamInputChunk(
            value=BidirectionalInputPayloadPart(bytes_=START_SESSION_EVENT.encode('utf-8'))
    )  
    try:
        await self.stream_response.input_stream.send(event)
    except Exception as e:
        print(f"Error sending event: {str(e)}")
```

------
#### [ Java ]

```
// Create ReplayProcessor with time-based expiry (cleans up messages after 1 minute)
ReplayProcessor<InvokeModelWithBidirectionalStreamInput> publisher = ReplayProcessor.createWithTime(
                1, TimeUnit.MINUTES, Schedulers.io()
);

// Create response handler
ResponseHandler responseHandler = new ResponseHandler();

// Initiate bidirectional stream
CompletableFuture<Void> completableFuture = client.invokeModelWithBidirectionalStream(
    request, publisher, responseHandler);

// Handle completion and errors properly
completableFuture.exceptionally(throwable -> {
    publisher.onError(throwable);
    return null;
});

completableFuture.thenApply(result -> {
    publisher.onComplete();
    return result;
});

// Send session start event
String sessionStartJson = """
{
  "event": {
    "sessionStart": {
      "inferenceConfiguration": {
        "maxTokens": 1024,
        "topP": 0.9,
        "temperature": 0.7
      }
    }
  }
}""";

publisher.onNext(
    InvokeModelWithBidirectionalStreamInput.chunkBuilder()
        .bytes(SdkBytes.fromUtf8String(sessionStartJson))
        .build()
);
```

------
#### [ Node.js ]

```
const command = new InvokeModelWithBidirectionalStreamCommand({
        modelId: "amazon.nova-sonic-v1:0",
        body: generateChunks(),
    });
async function* generateChunks() {
        // Send initialization events
        for (const event of initEvents) {
            const eventJson = JSON.stringify(event);
            console.log(`Sending event: ${eventJson.substring(0, 50)}...`);
            yield {
                chunk: {
                    bytes: textEncoder.encode(eventJson),
                },
            };
            await new Promise(resolve => setTimeout(resolve, 30));
        }
}
const initEvents = [
        {
            event: {
                sessionStart: {
                    inferenceConfiguration: {
                        maxTokens: 1024,
                        topP: 0.9,
                        temperature: 0.7
                    }
                }
            }
        },
        {
        ...
        }
];
```

------
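Each example above sends the same `sessionStart` event as its first message. As a small sketch, the hypothetical helper `build_session_start` below builds that payload with `json.dumps` instead of a hand-written string, which avoids quoting and brace mistakes. The field names and default values are copied from the examples above. The function name and its parameters are assumptions made for this illustration.

```python
import json


def build_session_start(
    max_tokens: int = 1024,
    top_p: float = 0.9,
    temperature: float = 0.7,
) -> bytes:
    """Return the UTF-8 encoded sessionStart event used to open a session."""
    event = {
        "event": {
            "sessionStart": {
                "inferenceConfiguration": {
                    "maxTokens": max_tokens,
                    "topP": top_p,
                    "temperature": temperature,
                }
            }
        }
    }
    return json.dumps(event).encode("utf-8")
```

The returned bytes can be passed wherever the Python example above uses `START_SESSION_EVENT.encode('utf-8')`.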