


# Speech-to-speech example
<a name="s2s-example"></a>

**Note**  
This documentation applies to Amazon Nova version 1. For the Amazon Nova 2 Sonic guide, see [Getting started](https://docs.aws.amazon.com/nova/latest/nova2-userguide/sonic-getting-started.html).

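This example walks through a simple real-time audio streaming application built on the Amazon Nova Sonic model. This simplified version demonstrates the core functionality needed to hold a spoken conversation with the Amazon Nova Sonic model.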

The complete example is available in the [Amazon Nova samples GitHub repository](https://github.com/aws-samples/amazon-nova-samples/blob/main/speech-to-speech/sample-codes/console-python/nova_sonic_simple.py).

1. **Imports and configuration**

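   This section imports the required libraries and sets the audio configuration parameters:
   + `asyncio`: for asynchronous programming
   + `base64`: for encoding and decoding audio data
   + `pyaudio`: for audio capture and playback
   + Amazon Bedrock SDK components for streaming
   + Audio constants that define the audio format: a 16 kHz sample rate for microphone input, 24 kHz for model output, mono, 16-bit samples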

   ```
   import os
   import asyncio
   import base64
   import json
   import uuid
   import pyaudio
   from aws_sdk_bedrock_runtime.client import BedrockRuntimeClient, InvokeModelWithBidirectionalStreamOperationInput
   from aws_sdk_bedrock_runtime.models import InvokeModelWithBidirectionalStreamInputChunk, BidirectionalInputPayloadPart
   from aws_sdk_bedrock_runtime.config import Config, HTTPAuthSchemeResolver, SigV4AuthScheme
   from smithy_aws_core.credentials_resolvers.environment import EnvironmentCredentialsResolver
   
   # Audio configuration
   INPUT_SAMPLE_RATE = 16000
   OUTPUT_SAMPLE_RATE = 24000
   CHANNELS = 1
   FORMAT = pyaudio.paInt16
   CHUNK_SIZE = 1024
   ```
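   As a quick sanity check on these constants, the following sketch computes how much audio one `CHUNK_SIZE` read represents and how many bytes it produces. It assumes 16-bit (2-byte) mono samples, matching `pyaudio.paInt16` and `CHANNELS = 1`; the helper names are hypothetical and not part of the sample.

   ```python
   # Hypothetical helpers: derive chunk duration and size from the audio constants
   INPUT_SAMPLE_RATE = 16000   # Hz, microphone capture rate
   OUTPUT_SAMPLE_RATE = 24000  # Hz, model audio output rate
   CHANNELS = 1
   BYTES_PER_SAMPLE = 2        # assumption: paInt16 means 16-bit (2-byte) samples
   CHUNK_SIZE = 1024           # frames per read

   def chunk_duration_ms(frames: int, sample_rate: int) -> float:
       """Duration in milliseconds of `frames` audio frames at `sample_rate`."""
       return frames * 1000 / sample_rate

   def chunk_bytes(frames: int, channels: int = CHANNELS) -> int:
       """Raw PCM bytes produced by reading `frames` frames."""
       return frames * channels * BYTES_PER_SAMPLE

   print(chunk_duration_ms(CHUNK_SIZE, INPUT_SAMPLE_RATE))  # 64.0
   print(chunk_bytes(CHUNK_SIZE))                           # 2048
   ```

   So each `stream.read(CHUNK_SIZE)` call covers about 64 ms of speech and yields 2,048 bytes of PCM before base64 encoding.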

1. **Define the `SimpleNovaSonic` class**

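   The `SimpleNovaSonic` class is the main class that handles interactions with Amazon Nova Sonic:
   + `model_id`: the Amazon Nova Sonic model ID (`amazon.nova-sonic-v1:0`)
   + `region`: the AWS Region, `us-east-1` by default
   + Unique IDs for tracking the prompt and its content blocks
   + An asynchronous queue for audio playback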

   ```
   class SimpleNovaSonic:
       def __init__(self, model_id='amazon.nova-sonic-v1:0', region='us-east-1'):
           self.model_id = model_id
           self.region = region
           self.client = None
           self.stream = None
           self.response = None
           self.is_active = False
           self.prompt_name = str(uuid.uuid4())
           self.content_name = str(uuid.uuid4())
           self.audio_content_name = str(uuid.uuid4())
           self.audio_queue = asyncio.Queue()
           self.display_assistant_text = False
           self.role = None  # set by contentStart events in _process_responses
   ```
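   The three UUIDs play different roles: `prompt_name` ties every event in the conversation together, while each content block (the system prompt text and the microphone audio) gets its own `contentName`. The following sketch groups them in a hypothetical `SessionIds` dataclass; the class is illustrative only and is not part of the sample.

   ```python
   import uuid
   from dataclasses import dataclass, field

   def _new_id() -> str:
       # Same scheme as the sample: a random UUID4 rendered as a string
       return str(uuid.uuid4())

   @dataclass(frozen=True)
   class SessionIds:
       """Hypothetical container for the identifiers SimpleNovaSonic creates."""
       prompt_name: str = field(default_factory=_new_id)          # shared by every event in the prompt
       system_content_name: str = field(default_factory=_new_id)  # the SYSTEM text block
       audio_content_name: str = field(default_factory=_new_id)   # the USER audio block

   ids = SessionIds()
   print(ids.prompt_name != ids.audio_content_name)  # True
   ```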

1. **Initialize the client**

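   This method configures the Amazon Bedrock client with:
   + The endpoint for the specified Region
   + Credentials read from the AWS credential environment variables
   + The SigV4 authentication scheme for signing AWS API calls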

   ```
       def _initialize_client(self):
           """Initialize the Bedrock client."""
           config = Config(
               endpoint_uri=f"https://bedrock-runtime.{self.region}.amazonaws.com",
               region=self.region,
               aws_credentials_identity_resolver=EnvironmentCredentialsResolver(),
               http_auth_scheme_resolver=HTTPAuthSchemeResolver(),
               http_auth_schemes={"aws.auth#sigv4": SigV4AuthScheme()}
           )
           self.client = BedrockRuntimeClient(config=config)
   ```
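   `EnvironmentCredentialsResolver` reads credentials from environment variables, so a missing variable may only surface as an error later, when the first request is signed. A minimal pre-flight check, assuming the standard `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` variable names (with `AWS_SESSION_TOKEN` optional for temporary credentials), might look like this; `missing_credentials` and `endpoint_for` are hypothetical helpers.

   ```python
   import os
   from typing import List, Mapping

   # Assumption: the standard AWS credential variable names
   REQUIRED_VARS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")

   def missing_credentials(env: Mapping[str, str] = os.environ) -> List[str]:
       """Return the required credential variables that are unset or empty."""
       return [name for name in REQUIRED_VARS if not env.get(name)]

   def endpoint_for(region: str) -> str:
       """Build the Bedrock Runtime endpoint the same way _initialize_client does."""
       return f"https://bedrock-runtime.{region}.amazonaws.com"

   missing = missing_credentials()
   if missing:
       print("Missing AWS credentials:", ", ".join(missing))
   print(endpoint_for("us-east-1"))
   ```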

1. **Send events**

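   This helper method wraps a JSON event in an input chunk and sends it on the bidirectional stream. Every message to the Amazon Nova Sonic model goes through it: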

   ```
       async def send_event(self, event_json):
           """Send an event to the stream."""
           event = InvokeModelWithBidirectionalStreamInputChunk(
               value=BidirectionalInputPayloadPart(bytes_=event_json.encode('utf-8'))
           )
           await self.stream.input_stream.send(event)
   ```
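   The sample builds each event as an f-string, which works as long as no interpolated value contains a quote, backslash, or newline. A hypothetical `build_event` helper that serializes a Python dict with `json.dumps` produces the same `{"event": {...}}` envelope while escaping values automatically; its output could be passed straight to `send_event`.

   ```python
   import json

   def build_event(event_type: str, **fields) -> str:
       """Wrap fields in the {"event": {event_type: {...}}} envelope used by the sample.

       Hypothetical helper: keyword names are copied verbatim into the JSON, so
       pass them in the camelCase form the events use (promptName, contentName).
       """
       return json.dumps({"event": {event_type: fields}})

   event = build_event(
       "textInput",
       promptName="prompt-1",
       contentName="content-1",
       content='She said "hi"\nthen left.',  # quotes and newline are escaped safely
   )
   print(json.loads(event)["event"]["textInput"]["content"])
   ```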

1. **Start the session**

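   This method opens the bidirectional stream and sends the setup events that must precede audio streaming: `sessionStart`, `promptStart`, and then the system prompt as a `contentStart`, `textInput`, `contentEnd` sequence. Send these events in exactly this order.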

   ```
       async def start_session(self):
           """Start a new session with Nova Sonic."""
           if not self.client:
               self._initialize_client()
               
           # Initialize the stream
           self.stream = await self.client.invoke_model_with_bidirectional_stream(
               InvokeModelWithBidirectionalStreamOperationInput(model_id=self.model_id)
           )
           self.is_active = True
           
           # Send session start event
           session_start = '''
           {
             "event": {
               "sessionStart": {
                 "inferenceConfiguration": {
                   "maxTokens": 1024,
                   "topP": 0.9,
                   "temperature": 0.7
                 }
               }
             }
           }
           '''
           await self.send_event(session_start)
           
           # Send prompt start event
           prompt_start = f'''
           {{
             "event": {{
               "promptStart": {{
                 "promptName": "{self.prompt_name}",
                 "textOutputConfiguration": {{
                   "mediaType": "text/plain"
                 }},
                 "audioOutputConfiguration": {{
                   "mediaType": "audio/lpcm",
                   "sampleRateHertz": 24000,
                   "sampleSizeBits": 16,
                   "channelCount": 1,
                   "voiceId": "matthew",
                   "encoding": "base64",
                   "audioType": "SPEECH"
                 }}
               }}
             }}
           }}
           '''
           await self.send_event(prompt_start)
           
           # Send system prompt
           text_content_start = f'''
           {{
               "event": {{
                   "contentStart": {{
                       "promptName": "{self.prompt_name}",
                       "contentName": "{self.content_name}",
                       "type": "TEXT",
                       "interactive": true,
                       "role": "SYSTEM",
                       "textInputConfiguration": {{
                           "mediaType": "text/plain"
                       }}
                   }}
               }}
           }}
           '''
           await self.send_event(text_content_start)
           
           system_prompt = "You are a friendly assistant. The user and you will engage in a spoken dialog " \
               "exchanging the transcripts of a natural real-time conversation. Keep your responses short, " \
               "generally two or three sentences for chatty scenarios."

           # json.dumps() quotes and escapes the prompt so the event remains valid JSON
           text_input = f'''
           {{
               "event": {{
                   "textInput": {{
                       "promptName": "{self.prompt_name}",
                       "contentName": "{self.content_name}",
                       "content": {json.dumps(system_prompt)}
                   }}
               }}
           }}
           '''
           await self.send_event(text_input)
           
           text_content_end = f'''
           {{
               "event": {{
                   "contentEnd": {{
                       "promptName": "{self.prompt_name}",
                       "contentName": "{self.content_name}"
                   }}
               }}
           }}
           '''
           await self.send_event(text_content_end)
           
           # Start processing responses
           self.response = asyncio.create_task(self._process_responses())
   ```
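   The five setup events must reach the model in a fixed order. The sketch below builds them as plain dictionaries, mirroring the JSON in `start_session` but omitting the `promptStart` output configurations for brevity, and checks the order; `setup_events` and `EXPECTED_ORDER` are hypothetical names.

   ```python
   import json
   from typing import List

   # Hypothetical names; the order mirrors start_session in the sample
   EXPECTED_ORDER = ["sessionStart", "promptStart", "contentStart", "textInput", "contentEnd"]

   def setup_events(prompt_name: str, content_name: str, system_prompt: str) -> List[dict]:
       """Return the session setup events in the order start_session sends them."""
       ids = {"promptName": prompt_name, "contentName": content_name}
       inference = {"maxTokens": 1024, "topP": 0.9, "temperature": 0.7}
       return [
           {"event": {"sessionStart": {"inferenceConfiguration": inference}}},
           # promptStart's text/audio output configurations are omitted here for brevity
           {"event": {"promptStart": {"promptName": prompt_name}}},
           {"event": {"contentStart": {**ids, "type": "TEXT", "interactive": True,
                                       "role": "SYSTEM",
                                       "textInputConfiguration": {"mediaType": "text/plain"}}}},
           {"event": {"textInput": {**ids, "content": system_prompt}}},
           {"event": {"contentEnd": dict(ids)}},
       ]

   def event_types(events: List[dict]) -> List[str]:
       # Each envelope holds exactly one key under "event"
       return [next(iter(e["event"])) for e in events]

   events = setup_events("p1", "c1", "You are a friendly assistant.")
   assert event_types(events) == EXPECTED_ORDER
   payloads = [json.dumps(e) for e in events]  # JSON strings ready for send_event
   ```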

1. **Handle audio input**

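   The following methods handle the audio input lifecycle:
   + `start_audio_input`: configures and starts the audio input stream (an `AUDIO` content block)
   + `send_audio_chunk`: base64-encodes an audio chunk and sends it to the model
   + `end_audio_input`: closes the audio input stream cleanly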

   ```
       async def start_audio_input(self):
           """Start audio input stream."""
           audio_content_start = f'''
           {{
               "event": {{
                   "contentStart": {{
                       "promptName": "{self.prompt_name}",
                       "contentName": "{self.audio_content_name}",
                       "type": "AUDIO",
                       "interactive": true,
                       "role": "USER",
                       "audioInputConfiguration": {{
                           "mediaType": "audio/lpcm",
                           "sampleRateHertz": 16000,
                           "sampleSizeBits": 16,
                           "channelCount": 1,
                           "audioType": "SPEECH",
                           "encoding": "base64"
                       }}
                   }}
               }}
           }}
           '''
           await self.send_event(audio_content_start)
       
       async def send_audio_chunk(self, audio_bytes):
           """Send an audio chunk to the stream."""
           if not self.is_active:
               return
               
           blob = base64.b64encode(audio_bytes)
           audio_event = f'''
           {{
               "event": {{
                   "audioInput": {{
                       "promptName": "{self.prompt_name}",
                       "contentName": "{self.audio_content_name}",
                       "content": "{blob.decode('utf-8')}"
                   }}
               }}
           }}
           '''
           await self.send_event(audio_event)
       
       async def end_audio_input(self):
           """End audio input stream."""
           audio_content_end = f'''
           {{
               "event": {{
                   "contentEnd": {{
                       "promptName": "{self.prompt_name}",
                       "contentName": "{self.audio_content_name}"
                   }}
               }}
           }}
           '''
           await self.send_event(audio_content_end)
   ```
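   Each microphone chunk travels as base64 text inside an `audioInput` event. The sketch below encodes a chunk of silence (2,048 zero bytes, the size of one 1,024-frame 16-bit mono read), wraps it the way `send_audio_chunk` does, and decodes it back to confirm the round trip; `audio_input_event` is a hypothetical helper.

   ```python
   import base64
   import json

   def audio_input_event(prompt_name: str, content_name: str, pcm: bytes) -> str:
       """Build an audioInput event carrying base64-encoded 16-bit PCM."""
       return json.dumps({
           "event": {
               "audioInput": {
                   "promptName": prompt_name,
                   "contentName": content_name,
                   "content": base64.b64encode(pcm).decode("ascii"),
               }
           }
       })

   silence = bytes(1024 * 2)  # 1,024 frames x 2 bytes per sample, mono
   event = audio_input_event("p1", "audio-1", silence)
   encoded = json.loads(event)["event"]["audioInput"]["content"]
   print(len(encoded))                          # 2732
   print(base64.b64decode(encoded) == silence)  # True
   ```

   Base64 expands the payload by about a third, so each 2,048-byte chunk becomes 2,732 characters on the wire.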

1. **End the session**

   This method closes the session cleanly by:
   + Sending a `promptEnd` event
   + Sending a `sessionEnd` event
   + Closing the input stream

   ```
       async def end_session(self):
           """End the session."""
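           # main() sets is_active = False before calling this method, so guard
           # on the stream itself; otherwise the end events would never be sent
           if self.stream is None:
               return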
               
           prompt_end = f'''
           {{
               "event": {{
                   "promptEnd": {{
                       "promptName": "{self.prompt_name}"
                   }}
               }}
           }}
           '''
           await self.send_event(prompt_end)
           
           session_end = '''
           {
               "event": {
                   "sessionEnd": {}
               }
           }
           '''
           await self.send_event(session_end)
           # close the stream
           await self.stream.input_stream.close()
   ```
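   To see the teardown order without a live connection, the following sketch replays the `end_session` logic against a hypothetical `FakeInputStream` that only records what it receives. The fake stands in for `self.stream.input_stream` and accepts plain JSON strings rather than SDK input chunks; it is not part of the SDK.

   ```python
   import asyncio
   import json
   from typing import List

   class FakeInputStream:
       """Hypothetical stand-in that records sent events instead of calling Bedrock."""
       def __init__(self) -> None:
           self.sent: List[dict] = []
           self.closed = False

       async def send(self, payload: str) -> None:
           self.sent.append(json.loads(payload))

       async def close(self) -> None:
           self.closed = True

   async def end_session(stream: FakeInputStream, prompt_name: str) -> None:
       # Same order as SimpleNovaSonic.end_session: promptEnd, sessionEnd, then close
       await stream.send(json.dumps({"event": {"promptEnd": {"promptName": prompt_name}}}))
       await stream.send(json.dumps({"event": {"sessionEnd": {}}}))
       await stream.close()

   stream = FakeInputStream()
   asyncio.run(end_session(stream, "p1"))
   print([next(iter(e["event"])) for e in stream.sent], stream.closed)
   # ['promptEnd', 'sessionEnd'] True
   ```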

1. **Process responses**

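   This method continuously processes responses from the model. It:
   + Waits for output from the stream.
   + Parses the JSON response.
   + Handles text output by printing it to the console: the user's speech as transcribed by automatic speech recognition, and the assistant's text from the `SPECULATIVE` generation stage.
   + Handles audio output by decoding it and adding it to the playback queue.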

   ```
       async def _process_responses(self):
           """Process responses from the stream."""
           try:
               while self.is_active:
                   output = await self.stream.await_output()
                   result = await output[1].receive()
                   
                   if result.value and result.value.bytes_:
                       response_data = result.value.bytes_.decode('utf-8')
                       json_data = json.loads(response_data)
                       
                       if 'event' in json_data:
                           # Handle content start event
                           if 'contentStart' in json_data['event']:
                               content_start = json_data['event']['contentStart'] 
                               # set role
                               self.role = content_start['role']
                               # Check for speculative content
                               if 'additionalModelFields' in content_start:
                                   additional_fields = json.loads(content_start['additionalModelFields'])
                                   if additional_fields.get('generationStage') == 'SPECULATIVE':
                                       self.display_assistant_text = True
                                   else:
                                       self.display_assistant_text = False
                                   
                           # Handle text output event
                           elif 'textOutput' in json_data['event']:
                               text = json_data['event']['textOutput']['content']    
                              
                               if (self.role == "ASSISTANT" and self.display_assistant_text):
                                   print(f"Assistant: {text}")
                               elif self.role == "USER":
                                   print(f"User: {text}")
                           
                           # Handle audio output
                           elif 'audioOutput' in json_data['event']:
                               audio_content = json_data['event']['audioOutput']['content']
                               audio_bytes = base64.b64decode(audio_content)
                               await self.audio_queue.put(audio_bytes)
           except Exception as e:
               print(f"Error processing responses: {e}")
   ```
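   The dispatch logic in `_process_responses` can be exercised without a stream by pulling it into a pure function. In the hypothetical `route_event` below, the role and speculative flag live in a small state dict instead of on `self`; the event shapes, including `additionalModelFields` arriving as a JSON string, mirror what the sample parses.

   ```python
   import base64
   import json
   from typing import Optional, Union

   def route_event(state: dict, message: dict) -> Optional[Union[str, bytes]]:
       """Return a console line for text, raw PCM bytes for audio, or None."""
       event = message.get("event", {})
       if "contentStart" in event:
           start = event["contentStart"]
           state["role"] = start.get("role")
           if "additionalModelFields" in start:
               fields = json.loads(start["additionalModelFields"])
               state["display_assistant_text"] = fields.get("generationStage") == "SPECULATIVE"
       elif "textOutput" in event:
           text = event["textOutput"]["content"]
           if state.get("role") == "ASSISTANT" and state.get("display_assistant_text"):
               return f"Assistant: {text}"
           if state.get("role") == "USER":
               return f"User: {text}"
       elif "audioOutput" in event:
           return base64.b64decode(event["audioOutput"]["content"])
       return None

   state = {"role": None, "display_assistant_text": False}
   route_event(state, {"event": {"contentStart": {
       "role": "ASSISTANT",
       "additionalModelFields": json.dumps({"generationStage": "SPECULATIVE"}),
   }}})
   print(route_event(state, {"event": {"textOutput": {"content": "Hello!"}}}))  # Assistant: Hello!
   ```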

1. **Play audio**

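   This method performs the following tasks:
   + Initializes a `PyAudio` output stream
   + Continuously takes audio data from the queue
   + Plays the audio through the speakers
   + Cleans up resources when done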

   ```
       async def play_audio(self):
           """Play audio responses."""
           p = pyaudio.PyAudio()
           stream = p.open(
               format=FORMAT,
               channels=CHANNELS,
               rate=OUTPUT_SAMPLE_RATE,
               output=True
           )
           
           try:
               while self.is_active:
                   audio_data = await self.audio_queue.get()
                   stream.write(audio_data)
           except Exception as e:
               print(f"Error playing audio: {e}")
           finally:
               stream.stop_stream()
               stream.close()
               p.terminate()
   ```
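   `play_audio` is the consumer half of a producer/consumer pair: `_process_responses` puts decoded PCM onto `audio_queue`, and playback takes it off. Because the loop waits in `audio_queue.get()`, the sample stops it by cancelling the task. One alternative, sketched below with a hypothetical `None` sentinel and a list standing in for the speaker, lets the consumer drain the queue and exit on its own.

   ```python
   import asyncio
   from typing import List

   async def producer(queue: asyncio.Queue, chunks: List[bytes]) -> None:
       for chunk in chunks:
           await queue.put(chunk)
       await queue.put(None)  # sentinel: no more audio

   async def consumer(queue: asyncio.Queue, played: List[bytes]) -> None:
       while True:
           chunk = await queue.get()
           if chunk is None:
               break  # exit cleanly instead of waiting to be cancelled
           played.append(chunk)  # stand-in for stream.write(chunk)

   async def demo() -> List[bytes]:
       queue: asyncio.Queue = asyncio.Queue()  # created inside the running loop
       played: List[bytes] = []
       await asyncio.gather(producer(queue, [b"a", b"b", b"c"]), consumer(queue, played))
       return played

   print(asyncio.run(demo()))  # [b'a', b'b', b'c']
   ```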

1. **Capture audio**

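   This method performs the following tasks:
   + Initializes a `PyAudio` input stream
   + Starts the audio input stream by calling `start_audio_input`
   + Continuously captures audio chunks from the microphone
   + Sends each chunk to the Amazon Nova Sonic model
   + Cleans up resources when done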

   ```
       async def capture_audio(self):
           """Capture audio from microphone and send to Nova Sonic."""
           p = pyaudio.PyAudio()
           stream = p.open(
               format=FORMAT,
               channels=CHANNELS,
               rate=INPUT_SAMPLE_RATE,
               input=True,
               frames_per_buffer=CHUNK_SIZE
           )
           
           print("Starting audio capture. Speak into your microphone...")
           print("Press Enter to stop...")
           
           await self.start_audio_input()
           
           try:
               while self.is_active:
                   audio_data = stream.read(CHUNK_SIZE, exception_on_overflow=False)
                   await self.send_audio_chunk(audio_data)
                   await asyncio.sleep(0.01)
           except Exception as e:
               print(f"Error capturing audio: {e}")
           finally:
               stream.stop_stream()
               stream.close()
               p.terminate()
               print("Audio capture stopped.")
               await self.end_audio_input()
   ```
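   `stream.read()` is a blocking call, so while it waits for the next chunk of audio the event loop cannot run other tasks; the short `asyncio.sleep(0.01)` only yields between reads. A common alternative, assuming Python 3.9+ for `asyncio.to_thread`, runs the read on a worker thread. The sketch below uses a hypothetical `fake_read` (a `time.sleep` plus zero bytes) in place of PyAudio, so it runs without a microphone.

   ```python
   import asyncio
   import time
   from typing import List

   CHUNK_SIZE = 1024

   def fake_read(frames: int) -> bytes:
       """Hypothetical stand-in for stream.read(): blocks briefly, then returns 16-bit silence."""
       time.sleep(0.01)
       return bytes(frames * 2)

   async def capture(n_chunks: int) -> List[bytes]:
       chunks: List[bytes] = []
       for _ in range(n_chunks):
           # Run the blocking read in a worker thread so the event loop stays free
           data = await asyncio.to_thread(fake_read, CHUNK_SIZE)
           chunks.append(data)  # the sample would call await self.send_audio_chunk(data) here
       return chunks

   chunks = asyncio.run(capture(3))
   print(len(chunks), len(chunks[0]))  # 3 2048
   ```

   In the sample, the equivalent change would be `await asyncio.to_thread(stream.read, CHUNK_SIZE, exception_on_overflow=False)`, after which the explicit `asyncio.sleep(0.01)` could likely be dropped.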

1. **Run the main function**

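   The main function orchestrates the whole process by:
   + Creating the Amazon Nova Sonic client
   + Starting the session
   + Creating concurrent tasks for audio playback and capture
   + Waiting for the user to press **Enter** to stop
   + Ending the session and cleaning up the tasks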

   ```
   async def main():
       # Create Nova Sonic client
       nova_client = SimpleNovaSonic()
       
       # Start session
       await nova_client.start_session()
       
       # Start audio playback task
       playback_task = asyncio.create_task(nova_client.play_audio())
       
       # Start audio capture task
       capture_task = asyncio.create_task(nova_client.capture_audio())
       
       # Wait for user to press Enter to stop
       await asyncio.get_running_loop().run_in_executor(None, input)
       
       # Signal the capture and playback loops to stop
       nova_client.is_active = False
       
       # First cancel the tasks
       tasks = []
       if not playback_task.done():
           tasks.append(playback_task)
       if not capture_task.done():
           tasks.append(capture_task)
       for task in tasks:
           task.cancel()
       if tasks:
           await asyncio.gather(*tasks, return_exceptions=True)
       
       # cancel the response task
       if nova_client.response and not nova_client.response.done():
           nova_client.response.cancel()
       
       await nova_client.end_session()
       print("Session ended")
   
   if __name__ == "__main__":
       # Set AWS credentials if not using environment variables
       # os.environ['AWS_ACCESS_KEY_ID'] = "your-access-key"
       # os.environ['AWS_SECRET_ACCESS_KEY'] = "your-secret-key"
       # os.environ['AWS_DEFAULT_REGION'] = "us-east-1"
   
       asyncio.run(main())
   ```
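   `main` stops the workers by cancelling them and then awaiting `asyncio.gather(..., return_exceptions=True)`. The flag matters: each cancelled task finishes with `CancelledError`, which `gather` then returns as a result instead of raising, and `finally` blocks (such as the one in `capture_audio` that calls `end_audio_input`) still run. A self-contained sketch with hypothetical `worker` tasks:

   ```python
   import asyncio
   from typing import List

   async def worker(name: str, cleaned_up: List[str]) -> None:
       try:
           await asyncio.sleep(3600)  # stands in for an endless capture/playback loop
       finally:
           cleaned_up.append(name)  # like the cleanup in capture_audio's finally block

   async def demo():
       cleaned_up: List[str] = []
       tasks = [asyncio.create_task(worker(n, cleaned_up)) for n in ("playback", "capture")]
       await asyncio.sleep(0)  # let both tasks start and reach their sleep
       for task in tasks:
           task.cancel()
       results = await asyncio.gather(*tasks, return_exceptions=True)
       return results, cleaned_up

   results, cleaned_up = asyncio.run(demo())
   print([type(r).__name__ for r in results], sorted(cleaned_up))
   # ['CancelledError', 'CancelledError'] ['capture', 'playback']
   ```

   Without `return_exceptions=True`, the first `CancelledError` would propagate out of `gather` and `main` would stop before reaching `end_session`.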