

# Ejemplo de voz a voz
<a name="s2s-example"></a>

**nota**  
Esta documentación corresponde a la versión 1 de Amazon Nova. Para ver la guía de Amazon Nova 2 Sonic, consulte [Getting started](https://docs.aws.amazon.com/nova/latest/nova2-userguide/sonic-getting-started.html).

En este ejemplo, se brinda una explicación paso a paso de cómo implementar una aplicación sencilla de transmisión de audio en tiempo real mediante el modelo Amazon Nova Sonic. Esta versión simplificada demuestra la funcionalidad básica necesaria para crear una conversación de audio con el modelo Amazon Nova Sonic.

Puede acceder al siguiente ejemplo en nuestro [repositorio de muestras de Amazon Nova en GitHub](https://github.com/aws-samples/amazon-nova-samples/blob/main/speech-to-speech/sample-codes/console-python/nova_sonic_simple.py).

1. 

**Establecimiento de las importaciones y la configuración**

   En esta sección, se importan las bibliotecas necesarias y se establecen los parámetros de configuración de audio:
   + `asyncio`: para la programación asíncrona.
   + `base64`: para la codificación y decodificación de datos de audio.
   + `pyaudio`: para la captura y reproducción de audio.
   + Componentes del SDK de Amazon Bedrock para la transmisión
   + Las constantes de audio definen el formato de captura de audio (frecuencia de muestreo de 16 kHz, canal mono)

   ```
   import os
   import asyncio
   import base64
   import json
   import uuid
   import pyaudio
   from aws_sdk_bedrock_runtime.client import BedrockRuntimeClient, InvokeModelWithBidirectionalStreamOperationInput
   from aws_sdk_bedrock_runtime.models import InvokeModelWithBidirectionalStreamInputChunk, BidirectionalInputPayloadPart
   from aws_sdk_bedrock_runtime.config import Config, HTTPAuthSchemeResolver, SigV4AuthScheme
   from smithy_aws_core.credentials_resolvers.environment import EnvironmentCredentialsResolver
   
   # Audio configuration
   INPUT_SAMPLE_RATE = 16000
   OUTPUT_SAMPLE_RATE = 24000
   CHANNELS = 1
   FORMAT = pyaudio.paInt16
   CHUNK_SIZE = 1024
   ```

1. 

**Definición de la clase `SimpleNovaSonic`**

   La clase `SimpleNovaSonic` es la clase principal que gestiona la interacción de Amazon Nova Sonic:
   + `model_id`: el ID de modelo de Amazon Nova Sonic (`amazon.nova-sonic-v1:0`).
   + `region`: la Región de AWS; la predeterminada es `us-east-1`.
   + ID únicos para el seguimiento de las peticiones y del contenido
   + Una cola asíncrona para la reproducción de audio

   ```
   class SimpleNovaSonic:
       def __init__(self, model_id='amazon.nova-sonic-v1:0', region='us-east-1'):
           self.model_id = model_id
           self.region = region
           self.client = None
           self.stream = None
           self.response = None
           self.is_active = False
           self.prompt_name = str(uuid.uuid4())
           self.content_name = str(uuid.uuid4())
           self.audio_content_name = str(uuid.uuid4())
           self.audio_queue = asyncio.Queue()
           self.display_assistant_text = False
   ```

1. 

**Inicialización del cliente**

   Este método configura el cliente de Amazon Bedrock con lo siguiente:
   + El punto de conexión adecuado para la región especificada.
   + Información de autenticación mediante variables de entorno para las credenciales de AWS.
   + El esquema de autenticación SigV4 para las llamadas a la API de AWS.

   ```
       def _initialize_client(self):
           """Initialize the Bedrock client."""
           config = Config(
               endpoint_uri=f"https://bedrock-runtime.{self.region}.amazonaws.com",
               region=self.region,
               aws_credentials_identity_resolver=EnvironmentCredentialsResolver(),
               http_auth_scheme_resolver=HTTPAuthSchemeResolver(),
               http_auth_schemes={"aws.auth#sigv4": SigV4AuthScheme()}
           )
           self.client = BedrockRuntimeClient(config=config)
   ```

1. 

**Gestión de eventos**

   Este método auxiliar envía los eventos JSON a la transmisión bidireccional, que se utiliza para todas las comunicaciones con el modelo Amazon Nova Sonic:

   ```
       async def send_event(self, event_json):
           """Send an event to the stream."""
           event = InvokeModelWithBidirectionalStreamInputChunk(
               value=BidirectionalInputPayloadPart(bytes_=event_json.encode('utf-8'))
           )
           await self.stream.input_stream.send(event)
   ```

1. 

**Inicio de sesión**

   Este método inicia la sesión y configura los eventos restantes para iniciar la transmisión de audio. Estos eventos deben enviarse en el mismo orden.

   ```
       async def start_session(self):
           """Start a new session with Nova Sonic."""
           if not self.client:
               self._initialize_client()
               
           # Initialize the stream
           self.stream = await self.client.invoke_model_with_bidirectional_stream(
               InvokeModelWithBidirectionalStreamOperationInput(model_id=self.model_id)
           )
           self.is_active = True
           
           # Send session start event
           session_start = '''
           {
             "event": {
               "sessionStart": {
                 "inferenceConfiguration": {
                   "maxTokens": 1024,
                   "topP": 0.9,
                   "temperature": 0.7
                 }
               }
             }
           }
           '''
           await self.send_event(session_start)
           
           # Send prompt start event
           prompt_start = f'''
           {{
             "event": {{
               "promptStart": {{
                 "promptName": "{self.prompt_name}",
                 "textOutputConfiguration": {{
                   "mediaType": "text/plain"
                 }},
                 "audioOutputConfiguration": {{
                   "mediaType": "audio/lpcm",
                   "sampleRateHertz": 24000,
                   "sampleSizeBits": 16,
                   "channelCount": 1,
                   "voiceId": "matthew",
                   "encoding": "base64",
                   "audioType": "SPEECH"
                 }}
               }}
             }}
           }}
           '''
           await self.send_event(prompt_start)
           
           # Send system prompt
           text_content_start = f'''
           {{
               "event": {{
                   "contentStart": {{
                       "promptName": "{self.prompt_name}",
                       "contentName": "{self.content_name}",
                       "type": "TEXT",
                       "interactive": true,
                       "role": "SYSTEM",
                       "textInputConfiguration": {{
                           "mediaType": "text/plain"
                       }}
                   }}
               }}
           }}
           '''
           await self.send_event(text_content_start)
           
           system_prompt = "You are a friendly assistant. The user and you will engage in a spoken dialog " \
               "exchanging the transcripts of a natural real-time conversation. Keep your responses short, " \
               "generally two or three sentences for chatty scenarios."
           
   
   
           text_input = f'''
           {{
               "event": {{
                   "textInput": {{
                       "promptName": "{self.prompt_name}",
                       "contentName": "{self.content_name}",
                       "content": "{system_prompt}"
                   }}
               }}
           }}
           '''
           await self.send_event(text_input)
           
           text_content_end = f'''
           {{
               "event": {{
                   "contentEnd": {{
                       "promptName": "{self.prompt_name}",
                       "contentName": "{self.content_name}"
                   }}
               }}
           }}
           '''
           await self.send_event(text_content_end)
           
           # Start processing responses
           self.response = asyncio.create_task(self._process_responses())
   ```

1. 

**Gestión de la entrada de audio**

   Estos métodos gestionan el ciclo de vida de la entrada de audio:
   + `start_audio_input`: configura e inicia el flujo de entrada de audio.
   + `send_audio_chunk`: codifica y envía fragmentos de audio al modelo.
   + `end_audio_input`: cierra correctamente el flujo de entrada de audio.

   ```
      async def start_audio_input(self):
           """Start audio input stream."""
           audio_content_start = f'''
           {{
               "event": {{
                   "contentStart": {{
                       "promptName": "{self.prompt_name}",
                       "contentName": "{self.audio_content_name}",
                       "type": "AUDIO",
                       "interactive": true,
                       "role": "USER",
                       "audioInputConfiguration": {{
                           "mediaType": "audio/lpcm",
                           "sampleRateHertz": 16000,
                           "sampleSizeBits": 16,
                           "channelCount": 1,
                           "audioType": "SPEECH",
                           "encoding": "base64"
                       }}
                   }}
               }}
           }}
           '''
           await self.send_event(audio_content_start)
       
       async def send_audio_chunk(self, audio_bytes):
           """Send an audio chunk to the stream."""
           if not self.is_active:
               return
               
           blob = base64.b64encode(audio_bytes)
           audio_event = f'''
           {{
               "event": {{
                   "audioInput": {{
                       "promptName": "{self.prompt_name}",
                       "contentName": "{self.audio_content_name}",
                       "content": "{blob.decode('utf-8')}"
                   }}
               }}
           }}
           '''
           await self.send_event(audio_event)
       
       async def end_audio_input(self):
           """End audio input stream."""
           audio_content_end = f'''
           {{
               "event": {{
                   "contentEnd": {{
                       "promptName": "{self.prompt_name}",
                       "contentName": "{self.audio_content_name}"
                   }}
               }}
           }}
           '''
           await self.send_event(audio_content_end)
   ```

1. 

**Finalización de la sesión**

   Este método cierra correctamente la sesión mediante lo siguiente:
   + envío de un evento `promptEnd`,
   + envío de un evento `sessionEnd`,
   + cierre del flujo de entrada.

   ```
       async def end_session(self):
           """End the session."""
           if not self.is_active:
               return
               
           prompt_end = f'''
           {{
               "event": {{
                   "promptEnd": {{
                       "promptName": "{self.prompt_name}"
                   }}
               }}
           }}
           '''
           await self.send_event(prompt_end)
           
           session_end = '''
           {
               "event": {
                   "sessionEnd": {}
               }
           }
           '''
           await self.send_event(session_end)
           # close the stream
           await self.stream.input_stream.close()
   ```

1. 

**Tratamiento de respuestas**

   Este método procesa continuamente las respuestas del modelo y hace lo siguiente:
   + Espera la salida del flujo.
   + Analiza la respuesta JSON.
   + Gestiona la salida de texto imprimiéndola en la consola con reconocimiento de voz y transcripción automáticos.
   + Gestiona la salida de audio decodificándola y poniéndola en cola para su reproducción.

   ```
       async def _process_responses(self):
           """Process responses from the stream."""
           try:
               while self.is_active:
                   output = await self.stream.await_output()
                   result = await output[1].receive()
                   
                   if result.value and result.value.bytes_:
                       response_data = result.value.bytes_.decode('utf-8')
                       json_data = json.loads(response_data)
                       
                       if 'event' in json_data:
                           # Handle content start event
                           if 'contentStart' in json_data['event']:
                               content_start = json_data['event']['contentStart'] 
                               # set role
                               self.role = content_start['role']
                               # Check for speculative content
                               if 'additionalModelFields' in content_start:
                                   additional_fields = json.loads(content_start['additionalModelFields'])
                                   if additional_fields.get('generationStage') == 'SPECULATIVE':
                                       self.display_assistant_text = True
                                   else:
                                       self.display_assistant_text = False
                                   
                           # Handle text output event
                           elif 'textOutput' in json_data['event']:
                               text = json_data['event']['textOutput']['content']    
                              
                               if (self.role == "ASSISTANT" and self.display_assistant_text):
                                   print(f"Assistant: {text}")
                               elif self.role == "USER":
                                   print(f"User: {text}")
                           
                           # Handle audio output
                           elif 'audioOutput' in json_data['event']:
                               audio_content = json_data['event']['audioOutput']['content']
                               audio_bytes = base64.b64decode(audio_content)
                               await self.audio_queue.put(audio_bytes)
           except Exception as e:
               print(f"Error processing responses: {e}")
   ```

1. 

**Reproducción de audio**

   Este método realizará las siguientes tareas:
   + Inicializa un flujo de entrada `PyAudio`.
   + Recupera continuamente los datos de audio de la cola.
   + Reproduce el audio a través de los parlantes.
   + Limpia adecuadamente los recursos al terminar.

   ```
      async def play_audio(self):
           """Play audio responses."""
           p = pyaudio.PyAudio()
           stream = p.open(
               format=FORMAT,
               channels=CHANNELS,
               rate=OUTPUT_SAMPLE_RATE,
               output=True
           )
           
           try:
               while self.is_active:
                   audio_data = await self.audio_queue.get()
                   stream.write(audio_data)
           except Exception as e:
               print(f"Error playing audio: {e}")
           finally:
               stream.stop_stream()
               stream.close()
               p.terminate()
   ```

1. 

**Captura de audio**

   Este método realizará las siguientes tareas:
   + Inicializa un flujo de salida `PyAudio`.
   + Inicia la sesión de entrada de audio.
   + Captura fragmentos de audio del micrófono de forma continua.
   + Envía cada fragmento al modelo Amazon Nova Sonic.
   + Limpia adecuadamente los recursos al terminar.

   ```
       async def capture_audio(self):
           """Capture audio from microphone and send to Nova Sonic."""
           p = pyaudio.PyAudio()
           stream = p.open(
               format=FORMAT,
               channels=CHANNELS,
               rate=INPUT_SAMPLE_RATE,
               input=True,
               frames_per_buffer=CHUNK_SIZE
           )
           
           print("Starting audio capture. Speak into your microphone...")
           print("Press Enter to stop...")
           
           await self.start_audio_input()
           
           try:
               while self.is_active:
                   audio_data = stream.read(CHUNK_SIZE, exception_on_overflow=False)
                   await self.send_audio_chunk(audio_data)
                   await asyncio.sleep(0.01)
           except Exception as e:
               print(f"Error capturing audio: {e}")
           finally:
               stream.stop_stream()
               stream.close()
               p.terminate()
               print("Audio capture stopped.")
               await self.end_audio_input()
   ```

1. 

**Ejecución de la función principal**

   La función principal orquesta todo el proceso con las siguientes acciones:
   + Crea un cliente de Amazon Nova Sonic.
   + Inicia la sesión.
   + Crea tareas concurrentes para la reproducción y captura de audio.
   + Espera a que el usuario presione **Intro** para detenerse.
   + Finaliza correctamente la sesión y limpia las tareas.

   ```
   async def main():
       # Create Nova Sonic client
       nova_client = SimpleNovaSonic()
       
       # Start session
       await nova_client.start_session()
       
       # Start audio playback task
       playback_task = asyncio.create_task(nova_client.play_audio())
       
       # Start audio capture task
       capture_task = asyncio.create_task(nova_client.capture_audio())
       
       # Wait for user to press Enter to stop
       await asyncio.get_event_loop().run_in_executor(None, input)
       
       # End session
       nova_client.is_active = False
       
       # First cancel the tasks
       tasks = []
       if not playback_task.done():
           tasks.append(playback_task)
       if not capture_task.done():
           tasks.append(capture_task)
       for task in tasks:
           task.cancel()
       if tasks:
           await asyncio.gather(*tasks, return_exceptions=True)
       
       # cancel the response task
       if nova_client.response and not nova_client.response.done():
           nova_client.response.cancel()
       
       await nova_client.end_session()
       print("Session ended")
   
   if __name__ == "__main__":
       # Set AWS credentials if not using environment variables
       # os.environ['AWS_ACCESS_KEY_ID'] = "your-access-key"
       # os.environ['AWS_SECRET_ACCESS_KEY'] = "your-secret-key"
       # os.environ['AWS_DEFAULT_REGION'] = "us-east-1"
   
       asyncio.run(main())
   ```