

# 스피치 투 스피치 예
<a name="s2s-example"></a>

**참고**  
이 설명서는 Amazon Nova 버전 1용입니다. Amazon Nova 2 Sonic 가이드는 [시작하기](https://docs.aws.amazon.com/nova/latest/nova2-userguide/sonic-getting-started.html)를 참조하세요.

이 예제에서는 Amazon Nova Sonic 모델을 사용하여 간단한 실시간 오디오 스트리밍 애플리케이션을 구현하는 방법을 단계별로 설명합니다. 이 간소화된 버전은 Amazon Nova Sonic 모델을 사용하여 오디오 대화를 생성하는 데 필요한 핵심 기능을 보여줍니다.

[Amazon Nova 샘플 GitHub 리포지토리](https://github.com/aws-samples/amazon-nova-samples/blob/main/speech-to-speech/sample-codes/console-python/nova_sonic_simple.py)에서 다음 예제에 액세스할 수 있습니다.

1. 

**가져오기 및 구성 설명**

   이 섹션에서는 필요한 라이브러리를 가져오고 오디오 구성 파라미터를 설정합니다.
   + `asyncio`: 비동기 프로그래밍용
   + `base64`: 오디오 데이터 인코딩 및 디코딩용
   + `pyaudio`: 오디오 캡처 및 재생용
   + 스트리밍을 위한 Amazon Bedrock SDK 구성 요소
   + 오디오 상수는 오디오 캡처 형식(16kHz 샘플 속도, 모노 채널)을 정의함

   ```
   import os
   import asyncio
   import base64
   import json
   import uuid
   import pyaudio
   from aws_sdk_bedrock_runtime.client import BedrockRuntimeClient, InvokeModelWithBidirectionalStreamOperationInput
   from aws_sdk_bedrock_runtime.models import InvokeModelWithBidirectionalStreamInputChunk, BidirectionalInputPayloadPart
   from aws_sdk_bedrock_runtime.config import Config, HTTPAuthSchemeResolver, SigV4AuthScheme
   from smithy_aws_core.credentials_resolvers.environment import EnvironmentCredentialsResolver
   
   # Audio configuration
   INPUT_SAMPLE_RATE = 16000
   OUTPUT_SAMPLE_RATE = 24000
   CHANNELS = 1
   FORMAT = pyaudio.paInt16
   CHUNK_SIZE = 1024
   ```

1. 

**`SimpleNovaSonic` 클래스 정의**

   `SimpleNovaSonic` 클래스는 Amazon Nova Sonic 상호 작용을 처리하는 기본 클래스입니다.
   + `model_id`: Amazon Nova Sonic 모델 ID(`amazon.nova-sonic-v1:0`)
   + `region`: AWS 리전, 기본값 `us-east-1`
   + 프롬프트 및 콘텐츠 추적을 위한 고유 ID
   + 오디오 재생을 위한 비동기 대기열

   ```
   class SimpleNovaSonic:
       def __init__(self, model_id='amazon.nova-sonic-v1:0', region='us-east-1'):
           self.model_id = model_id
           self.region = region
           self.client = None
           self.stream = None
           self.response = None
           self.is_active = False
           self.prompt_name = str(uuid.uuid4())
           self.content_name = str(uuid.uuid4())
           self.audio_content_name = str(uuid.uuid4())
           self.audio_queue = asyncio.Queue()
           self.display_assistant_text = False
   ```

1. 

**클라이언트 초기화**

   이 메서드는 다음으로 Amazon Bedrock 클라이언트를 구성합니다.
   + 지정된 리전에 적합한 엔드포인트
   + AWS 자격 증명에 환경 변수를 사용한 인증 정보
   + AWS API 직접 호출을 위한 SigV4 인증 체계

   ```
       def _initialize_client(self):
           """Initialize the Bedrock client."""
           config = Config(
               endpoint_uri=f"https://bedrock-runtime.{self.region}.amazonaws.com",
               region=self.region,
               aws_credentials_identity_resolver=EnvironmentCredentialsResolver(),
               http_auth_scheme_resolver=HTTPAuthSchemeResolver(),
               http_auth_schemes={"aws.auth#sigv4": SigV4AuthScheme()}
           )
           self.client = BedrockRuntimeClient(config=config)
   ```

1. 

**이벤트 처리**

   이 어시스턴트 메서드는 Amazon Nova Sonic 모델과의 모든 통신에 사용되는 양방향 스트림으로 JSON 이벤트를 전송합니다.

   ```
       async def send_event(self, event_json):
           """Send an event to the stream."""
           event = InvokeModelWithBidirectionalStreamInputChunk(
               value=BidirectionalInputPayloadPart(bytes_=event_json.encode('utf-8'))
           )
           await self.stream.input_stream.send(event)
   ```

1. 

**세션 시작**

   이 메서드는 세션을 시작하고 오디오 스트리밍을 시작하도록 나머지 이벤트를 설정합니다. 이러한 이벤트는 동일한 순서로 전송해야 합니다.

   ```
       async def start_session(self):
           """Start a new session with Nova Sonic."""
           if not self.client:
               self._initialize_client()
               
           # Initialize the stream
           self.stream = await self.client.invoke_model_with_bidirectional_stream(
               InvokeModelWithBidirectionalStreamOperationInput(model_id=self.model_id)
           )
           self.is_active = True
           
           # Send session start event
           session_start = '''
           {
             "event": {
               "sessionStart": {
                 "inferenceConfiguration": {
                   "maxTokens": 1024,
                   "topP": 0.9,
                   "temperature": 0.7
                 }
               }
             }
           }
           '''
           await self.send_event(session_start)
           
           # Send prompt start event
           prompt_start = f'''
           {{
             "event": {{
               "promptStart": {{
                 "promptName": "{self.prompt_name}",
                 "textOutputConfiguration": {{
                   "mediaType": "text/plain"
                 }},
                 "audioOutputConfiguration": {{
                   "mediaType": "audio/lpcm",
                   "sampleRateHertz": 24000,
                   "sampleSizeBits": 16,
                   "channelCount": 1,
                   "voiceId": "matthew",
                   "encoding": "base64",
                   "audioType": "SPEECH"
                 }}
               }}
             }}
           }}
           '''
           await self.send_event(prompt_start)
           
           # Send system prompt
           text_content_start = f'''
           {{
               "event": {{
                   "contentStart": {{
                       "promptName": "{self.prompt_name}",
                       "contentName": "{self.content_name}",
                       "type": "TEXT",
                       "interactive": true,
                       "role": "SYSTEM",
                       "textInputConfiguration": {{
                           "mediaType": "text/plain"
                       }}
                   }}
               }}
           }}
           '''
           await self.send_event(text_content_start)
           
           system_prompt = "You are a friendly assistant. The user and you will engage in a spoken dialog " \
               "exchanging the transcripts of a natural real-time conversation. Keep your responses short, " \
               "generally two or three sentences for chatty scenarios."
           
   
   
           text_input = f'''
           {{
               "event": {{
                   "textInput": {{
                       "promptName": "{self.prompt_name}",
                       "contentName": "{self.content_name}",
                       "content": "{system_prompt}"
                   }}
               }}
           }}
           '''
           await self.send_event(text_input)
           
           text_content_end = f'''
           {{
               "event": {{
                   "contentEnd": {{
                       "promptName": "{self.prompt_name}",
                       "contentName": "{self.content_name}"
                   }}
               }}
           }}
           '''
           await self.send_event(text_content_end)
           
           # Start processing responses
           self.response = asyncio.create_task(self._process_responses())
   ```

1. 

**오디오 입력 처리**

   다음 메서드는 오디오 입력 수명 주기를 처리합니다.
   + `start_audio_input`: 오디오 입력 스트림을 구성하고 시작합니다.
   + `send_audio_chunk`: 오디오 청크를 인코딩하고 모델로 전송합니다.
   + `end_audio_input`: 오디오 입력 스트림을 올바르게 닫습니다.

   ```
      async def start_audio_input(self):
           """Start audio input stream."""
           audio_content_start = f'''
           {{
               "event": {{
                   "contentStart": {{
                       "promptName": "{self.prompt_name}",
                       "contentName": "{self.audio_content_name}",
                       "type": "AUDIO",
                       "interactive": true,
                       "role": "USER",
                       "audioInputConfiguration": {{
                           "mediaType": "audio/lpcm",
                           "sampleRateHertz": 16000,
                           "sampleSizeBits": 16,
                           "channelCount": 1,
                           "audioType": "SPEECH",
                           "encoding": "base64"
                       }}
                   }}
               }}
           }}
           '''
           await self.send_event(audio_content_start)
       
       async def send_audio_chunk(self, audio_bytes):
           """Send an audio chunk to the stream."""
           if not self.is_active:
               return
               
           blob = base64.b64encode(audio_bytes)
           audio_event = f'''
           {{
               "event": {{
                   "audioInput": {{
                       "promptName": "{self.prompt_name}",
                       "contentName": "{self.audio_content_name}",
                       "content": "{blob.decode('utf-8')}"
                   }}
               }}
           }}
           '''
           await self.send_event(audio_event)
       
       async def end_audio_input(self):
           """End audio input stream."""
           audio_content_end = f'''
           {{
               "event": {{
                   "contentEnd": {{
                       "promptName": "{self.prompt_name}",
                       "contentName": "{self.audio_content_name}"
                   }}
               }}
           }}
           '''
           await self.send_event(audio_content_end)
   ```

1. 

**세션 끝내기**

   이 메서드는 다음을 수행하여 세션을 올바르게 닫습니다.
   + `promptEnd` 이벤트 전송
   + `sessionEnd` 이벤트 전송
   + 입력 스트림 닫기

   ```
       async def end_session(self):
           """End the session."""
           if not self.is_active:
               return
               
           prompt_end = f'''
           {{
               "event": {{
                   "promptEnd": {{
                       "promptName": "{self.prompt_name}"
                   }}
               }}
           }}
           '''
           await self.send_event(prompt_end)
           
           session_end = '''
           {
               "event": {
                   "sessionEnd": {}
               }
           }
           '''
           await self.send_event(session_end)
           # close the stream
           await self.stream.input_stream.close()
   ```

1. 

**응답 처리**

   이 메서드는 모델의 응답을 지속적으로 처리하고 다음을 수행합니다.
   + 스트림의 출력을 기다립니다.
   + JSON 응답을 구문 분석합니다.
   + 자동 음성 인식과 트랜스크립션으로 콘솔에 인쇄하여 텍스트 출력을 처리합니다.
   + 디코딩하고 재생을 위해 대기열에 추가하여 오디오 출력을 처리합니다.

   ```
       async def _process_responses(self):
           """Process responses from the stream."""
           try:
               while self.is_active:
                   output = await self.stream.await_output()
                   result = await output[1].receive()
                   
                   if result.value and result.value.bytes_:
                       response_data = result.value.bytes_.decode('utf-8')
                       json_data = json.loads(response_data)
                       
                       if 'event' in json_data:
                           # Handle content start event
                           if 'contentStart' in json_data['event']:
                               content_start = json_data['event']['contentStart'] 
                               # set role
                               self.role = content_start['role']
                               # Check for speculative content
                               if 'additionalModelFields' in content_start:
                                   additional_fields = json.loads(content_start['additionalModelFields'])
                                   if additional_fields.get('generationStage') == 'SPECULATIVE':
                                       self.display_assistant_text = True
                                   else:
                                       self.display_assistant_text = False
                                   
                           # Handle text output event
                           elif 'textOutput' in json_data['event']:
                               text = json_data['event']['textOutput']['content']    
                              
                               if (self.role == "ASSISTANT" and self.display_assistant_text):
                                   print(f"Assistant: {text}")
                               elif self.role == "USER":
                                   print(f"User: {text}")
                           
                           # Handle audio output
                           elif 'audioOutput' in json_data['event']:
                               audio_content = json_data['event']['audioOutput']['content']
                               audio_bytes = base64.b64decode(audio_content)
                               await self.audio_queue.put(audio_bytes)
           except Exception as e:
               print(f"Error processing responses: {e}")
   ```

1. 

**오디오 재생**

   이 방법은 다음 태스크를 수행합니다.
   + `PyAudio` 입력 스트림 초기화
   + 대기열에서 지속적으로 오디오 데이터 검색
   + 스피커를 통해 오디오를 재생합니다.
   + 완료되면 리소스를 적절하게 정리합니다.

   ```
      async def play_audio(self):
           """Play audio responses."""
           p = pyaudio.PyAudio()
           stream = p.open(
               format=FORMAT,
               channels=CHANNELS,
               rate=OUTPUT_SAMPLE_RATE,
               output=True
           )
           
           try:
               while self.is_active:
                   audio_data = await self.audio_queue.get()
                   stream.write(audio_data)
           except Exception as e:
               print(f"Error playing audio: {e}")
           finally:
               stream.stop_stream()
               stream.close()
               p.terminate()
   ```

1. 

**오디오 캡처**

   이 방법은 다음 태스크를 수행합니다.
   + `PyAudio` 출력 스트림 초기화
   + 오디오 입력 세션 시작
   + 마이크에서 지속적으로 오디오 청크 캡처
   + Amazon Nova Sonic 모델로 각 청크 전송
   + 완료 시 적절하게 리소스 정리

   ```
       async def capture_audio(self):
           """Capture audio from microphone and send to Nova Sonic."""
           p = pyaudio.PyAudio()
           stream = p.open(
               format=FORMAT,
               channels=CHANNELS,
               rate=INPUT_SAMPLE_RATE,
               input=True,
               frames_per_buffer=CHUNK_SIZE
           )
           
           print("Starting audio capture. Speak into your microphone...")
           print("Press Enter to stop...")
           
           await self.start_audio_input()
           
           try:
               while self.is_active:
                   audio_data = stream.read(CHUNK_SIZE, exception_on_overflow=False)
                   await self.send_audio_chunk(audio_data)
                   await asyncio.sleep(0.01)
           except Exception as e:
               print(f"Error capturing audio: {e}")
           finally:
               stream.stop_stream()
               stream.close()
               p.terminate()
               print("Audio capture stopped.")
               await self.end_audio_input()
   ```

1. 

**Main 함수 실행**

   Main 함수는 다음을 수행하여 전체 프로세스를 오케스트레이션합니다.
   + Amazon Nova Sonic 클라이언트 생성
   + 세션 시작
   + 오디오 재생 및 캡처를 위한 동시 태스크 생성
   + 사용자가 **Enter** 키를 눌러 중지할 때까지 대기
   + 올바르게 세션 끝내기 및 태스크 정리

   ```
   async def main():
       # Create Nova Sonic client
       nova_client = SimpleNovaSonic()
       
       # Start session
       await nova_client.start_session()
       
       # Start audio playback task
       playback_task = asyncio.create_task(nova_client.play_audio())
       
       # Start audio capture task
       capture_task = asyncio.create_task(nova_client.capture_audio())
       
       # Wait for user to press Enter to stop
       await asyncio.get_event_loop().run_in_executor(None, input)
       
       # End session
       nova_client.is_active = False
       
       # First cancel the tasks
       tasks = []
       if not playback_task.done():
           tasks.append(playback_task)
       if not capture_task.done():
           tasks.append(capture_task)
       for task in tasks:
           task.cancel()
       if tasks:
           await asyncio.gather(*tasks, return_exceptions=True)
       
       # cancel the response task
       if nova_client.response and not nova_client.response.done():
           nova_client.response.cancel()
       
       await nova_client.end_session()
       print("Session ended")
   
   if __name__ == "__main__":
       # Set AWS credentials if not using environment variables
       # os.environ['AWS_ACCESS_KEY_ID'] = "your-access-key"
       # os.environ['AWS_SECRET_ACCESS_KEY'] = "your-secret-key"
       # os.environ['AWS_DEFAULT_REGION'] = "us-east-1"
   
       asyncio.run(main())
   ```