

# Speech-to-speech example
<a name="s2s-example"></a>

**Note**  
This documentation is for Amazon Nova Version 1. For the Amazon Nova 2 Sonic guide, see [Getting started](https://docs.aws.amazon.com/nova/latest/nova2-userguide/sonic-getting-started.html).

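This example walks step by step through a simple, real-time audio streaming application built on the Amazon Nova Sonic model. This simplified version demonstrates the core functionality needed to hold an audio conversation with the model.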

The complete example is available in our [Amazon Nova samples GitHub repository](https://github.com/aws-samples/amazon-nova-samples/blob/main/speech-to-speech/sample-codes/console-python/nova_sonic_simple.py).

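1. **Imports and configuration**

   This step imports the required libraries and sets the audio configuration parameters:
   + `asyncio`: asynchronous programming
   + `base64`: encoding and decoding audio data
   + `pyaudio`: audio capture and playback
   + Amazon Bedrock SDK streaming components
   + Audio constants that define the stream format (16 kHz input and 24 kHz output sample rates, mono, 16-bit samples)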

   ```
   import os
   import asyncio
   import base64
   import json
   import uuid
   import pyaudio
   from aws_sdk_bedrock_runtime.client import BedrockRuntimeClient, InvokeModelWithBidirectionalStreamOperationInput
   from aws_sdk_bedrock_runtime.models import InvokeModelWithBidirectionalStreamInputChunk, BidirectionalInputPayloadPart
   from aws_sdk_bedrock_runtime.config import Config, HTTPAuthSchemeResolver, SigV4AuthScheme
   from smithy_aws_core.credentials_resolvers.environment import EnvironmentCredentialsResolver
   
   # Audio configuration
   INPUT_SAMPLE_RATE = 16000
   OUTPUT_SAMPLE_RATE = 24000
   CHANNELS = 1
   FORMAT = pyaudio.paInt16
   CHUNK_SIZE = 1024
   ```
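
   The constants above determine how much audio each microphone read carries. As a quick arithmetic check, the sketch below works out the size and duration of one captured chunk. `SAMPLE_WIDTH_BYTES` and both helper functions are names introduced here for illustration, on the assumption that `pyaudio.paInt16` means 2 bytes per sample.

   ```python
   # Values copied from the configuration step above.
   INPUT_SAMPLE_RATE = 16000   # Hz
   CHANNELS = 1
   CHUNK_SIZE = 1024           # frames per microphone read
   SAMPLE_WIDTH_BYTES = 2      # assumption: 16-bit PCM (pyaudio.paInt16)


   def chunk_bytes(frames, channels=CHANNELS, sample_width=SAMPLE_WIDTH_BYTES):
       """Bytes in one chunk of interleaved PCM audio."""
       return frames * channels * sample_width


   def chunk_duration_ms(frames, sample_rate):
       """Duration of one chunk in milliseconds."""
       return 1000.0 * frames / sample_rate


   print(chunk_bytes(CHUNK_SIZE))                           # 2048
   print(chunk_duration_ms(CHUNK_SIZE, INPUT_SAMPLE_RATE))  # 64.0
   ```

   Under these assumptions, each captured chunk holds 64 ms of speech in 2,048 bytes before base64 encoding.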

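1. **Define the `SimpleNovaSonic` class**

   The `SimpleNovaSonic` class is the main class that handles the interaction with Amazon Nova Sonic:
   + `model_id`: the Amazon Nova Sonic model ID (`amazon.nova-sonic-v1:0`)
   + `region`: the AWS Region, `us-east-1` by default
   + Unique IDs for tracking the prompt and content
   + An asynchronous queue for audio playback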

   ```
   class SimpleNovaSonic:
       def __init__(self, model_id='amazon.nova-sonic-v1:0', region='us-east-1'):
           self.model_id = model_id
           self.region = region
           self.client = None
           self.stream = None
           self.response = None
           self.is_active = False
           self.prompt_name = str(uuid.uuid4())
           self.content_name = str(uuid.uuid4())
           self.audio_content_name = str(uuid.uuid4())
           self.audio_queue = asyncio.Queue()
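           self.display_assistant_text = False
           # Role of the content block currently being received; updated on each contentStart event
           self.role = None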
   ```

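1. **Initialize the client**

   This method configures the Amazon Bedrock client with:
   + The endpoint for the specified Region
   + Authentication with AWS credentials read from environment variables
   + The SigV4 authentication scheme for AWS API calls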

   ```
       def _initialize_client(self):
           """Initialize the Bedrock client."""
           config = Config(
               endpoint_uri=f"https://bedrock-runtime.{self.region}.amazonaws.com",
               region=self.region,
               aws_credentials_identity_resolver=EnvironmentCredentialsResolver(),
               http_auth_scheme_resolver=HTTPAuthSchemeResolver(),
               http_auth_schemes={"aws.auth#sigv4": SigV4AuthScheme()}
           )
           self.client = BedrockRuntimeClient(config=config)
   ```
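
   Because the client reads credentials from environment variables, a missing variable may only surface once the stream is opened. A minimal sketch of an early check follows. The helper `missing_credentials` is hypothetical, and the two variable names are the ones this example sets in its main function.

   ```python
   import os

   # Variable names taken from the commented-out lines in the main function.
   REQUIRED_VARS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


   def missing_credentials(env=None):
       """Return the required variables that are unset or empty."""
       env = os.environ if env is None else env
       return [name for name in REQUIRED_VARS if not env.get(name)]


   # Passing a dict instead of os.environ keeps the check easy to test.
   print(missing_credentials({"AWS_ACCESS_KEY_ID": "example"}))
   # ['AWS_SECRET_ACCESS_KEY']
   ```

   Calling this before `_initialize_client` lets the application report a clear message instead of failing partway through session setup.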

1. **Handle events**

   This helper method sends a JSON event over the bidirectional stream, which carries all communication with the Amazon Nova Sonic model:

   ```
       async def send_event(self, event_json):
           """Send an event to the stream."""
           event = InvokeModelWithBidirectionalStreamInputChunk(
               value=BidirectionalInputPayloadPart(bytes_=event_json.encode('utf-8'))
           )
           await self.stream.input_stream.send(event)
   ```
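
   `send_event` expects a JSON string and encodes it as UTF-8. This example builds those strings with f-string templates, which works as long as the interpolated values contain no quotes, backslashes, or newlines. As one possible alternative, the sketch below builds the same event envelope with `json.dumps` so that arbitrary text is escaped correctly. The helper `make_event` and the sample IDs are hypothetical.

   ```python
   import json


   def make_event(event_type, payload):
       """Wrap a payload in the {"event": {<type>: ...}} envelope used by the stream."""
       return json.dumps({"event": {event_type: payload}})


   # Text with quotes and a newline would break a hand-written JSON template.
   tricky_prompt = 'Reply with "yes" or "no".\nBe brief.'

   event_json = make_event(
       "textInput",
       {"promptName": "prompt-1", "contentName": "content-1", "content": tricky_prompt},
   )
   wire_bytes = event_json.encode("utf-8")  # the bytes that send_event would transmit

   # The receiver recovers the original text unchanged.
   assert json.loads(wire_bytes)["event"]["textInput"]["content"] == tricky_prompt
   ```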

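1. **Start the session**

   This method starts the session and sends the remaining setup events needed to begin audio streaming. The events must be sent in the order shown.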

   ```
       async def start_session(self):
           """Start a new session with Nova Sonic."""
           if not self.client:
               self._initialize_client()
               
           # Initialize the stream
           self.stream = await self.client.invoke_model_with_bidirectional_stream(
               InvokeModelWithBidirectionalStreamOperationInput(model_id=self.model_id)
           )
           self.is_active = True
           
           # Send session start event
           session_start = '''
           {
             "event": {
               "sessionStart": {
                 "inferenceConfiguration": {
                   "maxTokens": 1024,
                   "topP": 0.9,
                   "temperature": 0.7
                 }
               }
             }
           }
           '''
           await self.send_event(session_start)
           
           # Send prompt start event
           prompt_start = f'''
           {{
             "event": {{
               "promptStart": {{
                 "promptName": "{self.prompt_name}",
                 "textOutputConfiguration": {{
                   "mediaType": "text/plain"
                 }},
                 "audioOutputConfiguration": {{
                   "mediaType": "audio/lpcm",
                   "sampleRateHertz": 24000,
                   "sampleSizeBits": 16,
                   "channelCount": 1,
                   "voiceId": "matthew",
                   "encoding": "base64",
                   "audioType": "SPEECH"
                 }}
               }}
             }}
           }}
           '''
           await self.send_event(prompt_start)
           
           # Send system prompt
           text_content_start = f'''
           {{
               "event": {{
                   "contentStart": {{
                       "promptName": "{self.prompt_name}",
                       "contentName": "{self.content_name}",
                       "type": "TEXT",
                       "interactive": true,
                       "role": "SYSTEM",
                       "textInputConfiguration": {{
                           "mediaType": "text/plain"
                       }}
                   }}
               }}
           }}
           '''
           await self.send_event(text_content_start)
           
           system_prompt = "You are a friendly assistant. The user and you will engage in a spoken dialog " \
               "exchanging the transcripts of a natural real-time conversation. Keep your responses short, " \
               "generally two or three sentences for chatty scenarios."
           
   
   
           text_input = f'''
           {{
               "event": {{
                   "textInput": {{
                       "promptName": "{self.prompt_name}",
                       "contentName": "{self.content_name}",
                       "content": "{system_prompt}"
                   }}
               }}
           }}
           '''
           await self.send_event(text_input)
           
           text_content_end = f'''
           {{
               "event": {{
                   "contentEnd": {{
                       "promptName": "{self.prompt_name}",
                       "contentName": "{self.content_name}"
                   }}
               }}
           }}
           '''
           await self.send_event(text_content_end)
           
           # Start processing responses
           self.response = asyncio.create_task(self._process_responses())
   ```
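
   Since the setup events above are order-sensitive, it can help to verify the sequence in a test, for example by recording the strings passed to `send_event`. The following is a minimal sketch of such a check. `STARTUP_ORDER`, `event_type`, and `check_startup_order` are names introduced here, and the recorded events are placeholders with empty payloads.

   ```python
   import json

   # Event types in the order start_session sends them.
   STARTUP_ORDER = ["sessionStart", "promptStart", "contentStart", "textInput", "contentEnd"]


   def event_type(event_json):
       """Return the single key under "event", for example "promptStart"."""
       return next(iter(json.loads(event_json)["event"]))


   def check_startup_order(sent_events):
       """Raise ValueError if the recorded JSON events are not in the expected order."""
       types = [event_type(e) for e in sent_events]
       if types != STARTUP_ORDER:
           raise ValueError(f"unexpected startup order: {types}")
       return types


   # Placeholder events standing in for what a recording send_event would capture.
   recorded = [json.dumps({"event": {name: {}}}) for name in STARTUP_ORDER]
   print(check_startup_order(recorded))
   ```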

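1. **Handle audio input**

   The following methods handle the audio input lifecycle:
   + `start_audio_input`: configures and starts the audio input stream
   + `send_audio_chunk`: encodes an audio chunk and sends it to the model
   + `end_audio_input`: closes the audio input stream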

   ```
       async def start_audio_input(self):
           """Start audio input stream."""
           audio_content_start = f'''
           {{
               "event": {{
                   "contentStart": {{
                       "promptName": "{self.prompt_name}",
                       "contentName": "{self.audio_content_name}",
                       "type": "AUDIO",
                       "interactive": true,
                       "role": "USER",
                       "audioInputConfiguration": {{
                           "mediaType": "audio/lpcm",
                           "sampleRateHertz": 16000,
                           "sampleSizeBits": 16,
                           "channelCount": 1,
                           "audioType": "SPEECH",
                           "encoding": "base64"
                       }}
                   }}
               }}
           }}
           '''
           await self.send_event(audio_content_start)
       
       async def send_audio_chunk(self, audio_bytes):
           """Send an audio chunk to the stream."""
           if not self.is_active:
               return
               
           blob = base64.b64encode(audio_bytes)
           audio_event = f'''
           {{
               "event": {{
                   "audioInput": {{
                       "promptName": "{self.prompt_name}",
                       "contentName": "{self.audio_content_name}",
                       "content": "{blob.decode('utf-8')}"
                   }}
               }}
           }}
           '''
           await self.send_event(audio_event)
       
       async def end_audio_input(self):
           """End audio input stream."""
           audio_content_end = f'''
           {{
               "event": {{
                   "contentEnd": {{
                       "promptName": "{self.prompt_name}",
                       "contentName": "{self.audio_content_name}"
                   }}
               }}
           }}
           '''
           await self.send_event(audio_content_end)
   ```
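
   To illustrate what `send_audio_chunk` puts on the wire, the sketch below base64-encodes one chunk of silence into an `audioInput` event and decodes it again. The function names and IDs are hypothetical, and the chunk size assumes 1,024 frames of 16-bit mono audio.

   ```python
   import base64
   import json


   def make_audio_event(prompt_name, content_name, audio_bytes):
       """Build an audioInput event with base64-encoded PCM content."""
       content = base64.b64encode(audio_bytes).decode("ascii")
       return json.dumps({
           "event": {
               "audioInput": {
                   "promptName": prompt_name,
                   "contentName": content_name,
                   "content": content,
               }
           }
       })


   def read_audio_content(event_json):
       """Recover the raw PCM bytes from an audioInput event."""
       return base64.b64decode(json.loads(event_json)["event"]["audioInput"]["content"])


   silence = bytes(2048)  # 1,024 frames x 2 bytes of zeros
   event_json = make_audio_event("prompt-1", "audio-1", silence)
   assert read_audio_content(event_json) == silence
   ```

   Base64 expands the payload by roughly one third, which is worth keeping in mind when estimating bandwidth.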

1. **End the session**

   This method closes the session cleanly by:
   + Sending a `promptEnd` event
   + Sending a `sessionEnd` event
   + Closing the input stream

   ```
       async def end_session(self):
           """End the session."""
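           # Guard on the stream rather than is_active: main() clears is_active
           # before calling end_session(), and the closing events must still be sent.
           if self.stream is None:
               return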
               
           prompt_end = f'''
           {{
               "event": {{
                   "promptEnd": {{
                       "promptName": "{self.prompt_name}"
                   }}
               }}
           }}
           '''
           await self.send_event(prompt_end)
           
           session_end = '''
           {
               "event": {
                   "sessionEnd": {}
               }
           }
           '''
           await self.send_event(session_end)
           # close the stream
           await self.stream.input_stream.close()
   ```

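1. **Handle responses**

   This method continuously processes responses from the model and does the following:
   + Waits for output from the stream.
   + Parses the JSON response.
   + Handles text output by printing the user transcript (from automatic speech recognition) and the assistant's text to the console.
   + Handles audio output by decoding it and queuing it for playback.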

   ```
       async def _process_responses(self):
           """Process responses from the stream."""
           try:
               while self.is_active:
                   output = await self.stream.await_output()
                   result = await output[1].receive()
                   
                   if result.value and result.value.bytes_:
                       response_data = result.value.bytes_.decode('utf-8')
                       json_data = json.loads(response_data)
                       
                       if 'event' in json_data:
                           # Handle content start event
                           if 'contentStart' in json_data['event']:
                               content_start = json_data['event']['contentStart'] 
                               # set role
                               self.role = content_start['role']
                               # Check for speculative content
                               if 'additionalModelFields' in content_start:
                                   additional_fields = json.loads(content_start['additionalModelFields'])
                                   if additional_fields.get('generationStage') == 'SPECULATIVE':
                                       self.display_assistant_text = True
                                   else:
                                       self.display_assistant_text = False
                                   
                           # Handle text output event
                           elif 'textOutput' in json_data['event']:
                               text = json_data['event']['textOutput']['content']    
                              
                               if (self.role == "ASSISTANT" and self.display_assistant_text):
                                   print(f"Assistant: {text}")
                               elif self.role == "USER":
                                   print(f"User: {text}")
                           
                           # Handle audio output
                           elif 'audioOutput' in json_data['event']:
                               audio_content = json_data['event']['audioOutput']['content']
                               audio_bytes = base64.b64decode(audio_content)
                               await self.audio_queue.put(audio_bytes)
           except Exception as e:
               print(f"Error processing responses: {e}")
   ```
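
   The branching logic above can be exercised without a live stream by moving it into a plain function. The sketch below mirrors the same three branches. The function `handle_event`, the `state` dictionary, and the returned tuples are illustrative choices made here, not part of the sample.

   ```python
   import base64
   import json


   def handle_event(state, raw_json):
       """Update state from one response event; return ("text", str), ("audio", bytes), or None."""
       event = json.loads(raw_json).get("event", {})
       if "contentStart" in event:
           start = event["contentStart"]
           state["role"] = start["role"]
           # additionalModelFields is itself a JSON-encoded string.
           if "additionalModelFields" in start:
               fields = json.loads(start["additionalModelFields"])
               state["display_assistant_text"] = fields.get("generationStage") == "SPECULATIVE"
           return None
       if "textOutput" in event:
           text = event["textOutput"]["content"]
           if state["role"] == "ASSISTANT" and state["display_assistant_text"]:
               return ("text", f"Assistant: {text}")
           if state["role"] == "USER":
               return ("text", f"User: {text}")
           return None
       if "audioOutput" in event:
           return ("audio", base64.b64decode(event["audioOutput"]["content"]))
       return None


   state = {"role": None, "display_assistant_text": False}
   start = {"role": "ASSISTANT",
            "additionalModelFields": json.dumps({"generationStage": "SPECULATIVE"})}
   handle_event(state, json.dumps({"event": {"contentStart": start}}))
   print(handle_event(state, json.dumps({"event": {"textOutput": {"content": "Hi"}}})))
   # ('text', 'Assistant: Hi')
   ```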

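1. **Play audio**

   This method performs the following tasks:
   + Initializes a `PyAudio` output stream
   + Continuously retrieves audio data from the queue
   + Plays the audio through the speakers
   + Cleans up resources when finished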

   ```
       async def play_audio(self):
           """Play audio responses."""
           p = pyaudio.PyAudio()
           stream = p.open(
               format=FORMAT,
               channels=CHANNELS,
               rate=OUTPUT_SAMPLE_RATE,
               output=True
           )
           
           try:
               while self.is_active:
                   audio_data = await self.audio_queue.get()
                   stream.write(audio_data)
           except Exception as e:
               print(f"Error playing audio: {e}")
           finally:
               stream.stop_stream()
               stream.close()
               p.terminate()
   ```
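
   In the method above, `await self.audio_queue.get()` waits until a chunk arrives, so the loop re-checks `is_active` only after the next item. This example stops playback by cancelling the task. A common alternative, sketched below under that caveat, is to enqueue a sentinel value that tells the loop to exit. `STOP` and `play_until_stopped` are hypothetical names, and a list stands in for the speaker stream.

   ```python
   import asyncio

   STOP = object()  # hypothetical sentinel, not part of the original sample


   async def play_until_stopped(queue, write):
       """Write queued chunks until the STOP sentinel arrives."""
       while True:
           chunk = await queue.get()
           if chunk is STOP:
               break
           write(chunk)  # stand-in for the PyAudio stream.write call


   async def demo():
       queue = asyncio.Queue()
       played = []
       player = asyncio.create_task(play_until_stopped(queue, played.append))
       await queue.put(b"first")
       await queue.put(b"second")
       await queue.put(STOP)  # wakes the player and ends its loop
       await player
       return played


   print(asyncio.run(demo()))  # [b'first', b'second']
   ```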

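1. **Capture audio**

   This method performs the following tasks:
   + Initializes a `PyAudio` input stream
   + Starts the audio input session
   + Continuously captures audio chunks from the microphone
   + Sends each chunk to the Amazon Nova Sonic model
   + Cleans up resources when finished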

   ```
       async def capture_audio(self):
           """Capture audio from microphone and send to Nova Sonic."""
           p = pyaudio.PyAudio()
           stream = p.open(
               format=FORMAT,
               channels=CHANNELS,
               rate=INPUT_SAMPLE_RATE,
               input=True,
               frames_per_buffer=CHUNK_SIZE
           )
           
           print("Starting audio capture. Speak into your microphone...")
           print("Press Enter to stop...")
           
           await self.start_audio_input()
           
           try:
               while self.is_active:
                   audio_data = stream.read(CHUNK_SIZE, exception_on_overflow=False)
                   await self.send_audio_chunk(audio_data)
                   await asyncio.sleep(0.01)
           except Exception as e:
               print(f"Error capturing audio: {e}")
           finally:
               stream.stop_stream()
               stream.close()
               p.terminate()
               print("Audio capture stopped.")
               await self.end_audio_input()
   ```
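
   `stream.read` is a blocking call, so while it waits for the microphone the event loop cannot run the playback or response tasks. This example works around that with a short `asyncio.sleep`. One possible refinement, shown here only as a sketch, is to run the blocking read in a worker thread with `asyncio.to_thread` (Python 3.9 or later). `fake_read` stands in for the PyAudio call, and `capture` is a hypothetical helper.

   ```python
   import asyncio
   import time

   CHUNK_SIZE = 1024


   def fake_read(frames):
       """Stand-in for the blocking PyAudio stream.read call."""
       time.sleep(0.005)
       return bytes(frames * 2)  # silence, 2 bytes per 16-bit sample


   async def capture(read, send, chunks):
       """Read a fixed number of chunks without blocking the event loop."""
       for _ in range(chunks):
           data = await asyncio.to_thread(read, CHUNK_SIZE)
           await send(data)


   async def demo():
       sizes = []

       async def send(data):
           sizes.append(len(data))

       await capture(fake_read, send, chunks=3)
       return sizes


   print(asyncio.run(demo()))  # [2048, 2048, 2048]
   ```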

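1. **Run the main function**

   The main function orchestrates the whole process by:
   + Creating an Amazon Nova Sonic client
   + Starting the session
   + Creating concurrent tasks for audio playback and capture
   + Waiting for the user to press **Enter** to stop
   + Ending the session and cleaning up the tasks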

   ```
   async def main():
       # Create Nova Sonic client
       nova_client = SimpleNovaSonic()
       
       # Start session
       await nova_client.start_session()
       
       # Start audio playback task
       playback_task = asyncio.create_task(nova_client.play_audio())
       
       # Start audio capture task
       capture_task = asyncio.create_task(nova_client.capture_audio())
       
       # Wait for user to press Enter to stop
       await asyncio.get_running_loop().run_in_executor(None, input)
       
       # End session
       nova_client.is_active = False
       
       # First cancel the tasks
       tasks = []
       if not playback_task.done():
           tasks.append(playback_task)
       if not capture_task.done():
           tasks.append(capture_task)
       for task in tasks:
           task.cancel()
       if tasks:
           await asyncio.gather(*tasks, return_exceptions=True)
       
       # cancel the response task
       if nova_client.response and not nova_client.response.done():
           nova_client.response.cancel()
       
       await nova_client.end_session()
       print("Session ended")
   
   if __name__ == "__main__":
       # Set AWS credentials if not using environment variables
       # os.environ['AWS_ACCESS_KEY_ID'] = "your-access-key"
       # os.environ['AWS_SECRET_ACCESS_KEY'] = "your-secret-key"
       # os.environ['AWS_DEFAULT_REGION'] = "us-east-1"
   
       asyncio.run(main())
   ```