View a markdown version of this page

Erste Schritte mit speech-to-speech - Amazon Nova

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Erste Schritte mit speech-to-speech

Die folgenden Abschnitte enthalten ein Beispiel und eine step-by-step Erklärung für die Implementierung einer einfachen Audio-Streaming-Anwendung in Echtzeit mit Amazon Nova 2 Sonic. Diese vereinfachte Version demonstriert die Kernfunktionen, die für die Erstellung einer Audiokonversation mit dem Amazon Nova 2 Sonic-Modell erforderlich sind.

Sie können auf das folgende Beispiel in unserem GitHubNova-Beispiel-Repo zugreifen.

Es besteht ein Verbindungslimit von 8 Minuten. Das Muster für Verbindungserneuerung und Gesprächsfortsetzung ist verfügbar GitHub.

Importe und Konfiguration angeben

In diesem Abschnitt werden die erforderlichen Bibliotheken importiert und die Audiokonfigurationsparameter festgelegt:

  • asyncio: Für die asynchrone Programmierung

  • base64: Zum Kodieren und Dekodieren von Audiodaten

  • pyaudio: Für Audioaufnahme und -wiedergabe

  • Amazon-Bedrock-SDK-Komponenten für Streaming

  • Audiokonstanten definieren das Format der Audioaufnahme (16 kHz Samplerate, Monokanal)

import os import asyncio import base64 import json import uuid import pyaudio from aws_sdk_bedrock_runtime.client import BedrockRuntimeClient, InvokeModelWithBidirectionalStreamOperationInput from aws_sdk_bedrock_runtime.models import InvokeModelWithBidirectionalStreamInputChunk, BidirectionalInputPayloadPart from aws_sdk_bedrock_runtime.config import Config, HTTPAuthSchemeResolver, SigV4AuthScheme from smithy_aws_core.identity import EnvironmentCredentialsResolver # Audio configuration INPUT_SAMPLE_RATE = 16000 OUTPUT_SAMPLE_RATE = 24000 CHANNELS = 1 FORMAT = pyaudio.paInt16 CHUNK_SIZE = 1024

Definieren der SimpleNovaSonic-Klasse

Die SimpleNovaSonic-Klasse ist die Hauptklasse, welche die Interaktion mit Amazon Nova Sonic verwaltet:

  • model_id: Die Modell-ID von Amazon Nova Sonic (amazon.nova-2-sonic-v1:0)

  • region: Die AWS Region, die Standardeinstellung ist us-east-1

  • Einzigartig IDs für die Nachverfolgung von Anfragen und Inhalten

  • Eine asynchrone Warteschlange für die Audiowiedergabe

class SimpleNovaSonic: def __init__(self, model_id='amazon.nova-2-sonic-v1:0', region='us-east-1'): self.model_id = model_id self.region = region self.client = None self.stream = None self.response = None self.is_active = False self.prompt_name = str(uuid.uuid4()) self.content_name = str(uuid.uuid4()) self.audio_content_name = str(uuid.uuid4()) self.audio_queue = asyncio.Queue() self.display_assistant_text = False

Initialisieren des -Client

Diese Methode konfiguriert den Amazon-Bedrock-Client wie folgt:

  • Den entsprechenden Endpunkt für die angegebene Region

  • Authentifizierungsinformationen unter Verwendung von Umgebungsvariablen für AWS -Anmeldeinformationen

  • Das SigV4-Authentifizierungsschema für die AWS API-Aufrufe

def _initialize_client(self): """Initialize the Bedrock client.""" config = Config( endpoint_uri=f"https://bedrock-runtime.{self.region}.amazonaws.com", region=self.region, aws_credentials_identity_resolver=EnvironmentCredentialsResolver(), auth_scheme_resolver=HTTPAuthSchemeResolver(), auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="bedrock")} ) self.client = BedrockRuntimeClient(config=config)

Behandlung von Ereignissen

Diese Hilfsmethode sendet JSON-Ereignisse an den bidirektionalen Stream, der für die gesamte Kommunikation mit dem Amazon-Nova-Sonic-Modell verwendet wird:

async def send_event(self, event_json): """Send an event to the stream.""" event = InvokeModelWithBidirectionalStreamInputChunk( value=BidirectionalInputPayloadPart(bytes_=event_json.encode('utf-8')) ) await self.stream.input_stream.send(event)

Die Sitzung starten

Diese Methode initiiert die Sitzung und richtet die verbleibenden Ereignisse ein, um das Audio-Streaming zu starten. Das Senden dieser Ereignisse muss in der gleichen Reihenfolge erfolgen.

async def start_session(self): """Start a new session with Nova Sonic.""" if not self.client: self._initialize_client() # Initialize the stream self.stream = await self.client.invoke_model_with_bidirectional_stream( InvokeModelWithBidirectionalStreamOperationInput(model_id=self.model_id) ) self.is_active = True # Send session start event session_start = ''' { "event": { "sessionStart": { "inferenceConfiguration": { "maxTokens": 1024, "topP": 0.9, "temperature": 0.7 }, "turnDetectionConfiguration": { "endpointingSensitivity": "HIGH" } } } } ''' await self.send_event(session_start) # Send prompt start event prompt_start = f''' {{ "event": {{ "promptStart": {{ "promptName": "{self.prompt_name}", "textOutputConfiguration": {{ "mediaType": "text/plain" }}, "audioOutputConfiguration": {{ "mediaType": "audio/lpcm", "sampleRateHertz": 24000, "sampleSizeBits": 16, "channelCount": 1, "voiceId": "matthew", "encoding": "base64", "audioType": "SPEECH" }} }} }} }} ''' await self.send_event(prompt_start) # Send system prompt text_content_start = f''' {{ "event": {{ "contentStart": {{ "promptName": "{self.prompt_name}", "contentName": "{self.content_name}", "type": "TEXT", "interactive": true, "role": "SYSTEM", "textInputConfiguration": {{ "mediaType": "text/plain" }} }} }} }} ''' await self.send_event(text_content_start) system_prompt = "You are a friendly assistant. The user and you will engage in a spoken dialog " \ "exchanging the transcripts of a natural real-time conversation. Keep your responses short, " \ "generally two or three sentences for chatty scenarios." text_input = f''' {{ "event": {{ "textInput": {{ "promptName": "{self.prompt_name}", "contentName": "{self.content_name}", "content": "{system_prompt}" }} }} }} ''' await self.send_event(text_input) text_content_end = f''' {{ "event": {{ "contentEnd": {{ "promptName": "{self.prompt_name}", "contentName": "{self.content_name}" }} }} }} ''' await self.send_event(text_content_end) # Start processing responses self.response = asyncio.create_task(self._process_responses())

Behandlung der Audioeingabe

Diese Methoden behandeln den Lebenszyklus der Audioeingabe:

  • start_audio_input: Konfiguriert und startet den Audioeingangsstream

  • send_audio_chunk: Kodiert Audioblöcke und sendet sie an das Modell

  • end_audio_input: Schließt den Audioeingangsstream ordnungsgemäß

async def start_audio_input(self): """Start audio input stream.""" audio_content_start = f''' {{ "event": {{ "contentStart": {{ "promptName": "{self.prompt_name}", "contentName": "{self.audio_content_name}", "type": "AUDIO", "interactive": true, "role": "USER", "audioInputConfiguration": {{ "mediaType": "audio/lpcm", "sampleRateHertz": 16000, "sampleSizeBits": 16, "channelCount": 1, "audioType": "SPEECH", "encoding": "base64" }} }} }} }} ''' await self.send_event(audio_content_start) async def send_audio_chunk(self, audio_bytes): """Send an audio chunk to the stream.""" if not self.is_active: return blob = base64.b64encode(audio_bytes) audio_event = f''' {{ "event": {{ "audioInput": {{ "promptName": "{self.prompt_name}", "contentName": "{self.audio_content_name}", "content": "{blob.decode('utf-8')}" }} }} }} ''' await self.send_event(audio_event) async def end_audio_input(self): """End audio input stream.""" audio_content_end = f''' {{ "event": {{ "contentEnd": {{ "promptName": "{self.prompt_name}", "contentName": "{self.audio_content_name}" }} }} }} ''' await self.send_event(audio_content_end)

Beenden der Sitzung

Diese Methode schließt die Sitzung ordnungsgemäß durch:

  • Das Senden von promptEnd-Ereignissen

  • Das Senden von sessionEnd-Ereignissen

  • Das Schließen des Eingabestreams

async def end_session(self): """End the session.""" if not self.is_active: return prompt_end = f''' {{ "event": {{ "promptEnd": {{ "promptName": "{self.prompt_name}" }} }} }} ''' await self.send_event(prompt_end) session_end = ''' { "event": { "sessionEnd": {} } } ''' await self.send_event(session_end) # close the stream await self.stream.input_stream.close()

Umgang mit Antworten

Diese Methode verarbeitet kontinuierlich Antworten aus dem Modell und führt Folgendes aus:

  • Warten auf die Ausgabe aus dem Stream.

  • Analysieren der JSON-Antwort.

  • Verarbeiten der Textausgabe durch Drucken auf der Konsole mit automatischer Spracherkennung und Transkription.

  • Verwalten der Audioausgabe durch Dekodierung und Warteschleife für die Wiedergabe.

async def _process_responses(self): """Process responses from the stream.""" try: while self.is_active: output = await self.stream.await_output() result = await output[1].receive() if result.value and result.value.bytes_: response_data = result.value.bytes_.decode('utf-8') json_data = json.loads(response_data) if 'event' in json_data: # Handle content start event if 'contentStart' in json_data['event']: content_start = json_data['event']['contentStart'] # set role self.role = content_start['role'] # Check for speculative content if 'additionalModelFields' in content_start: additional_fields = json.loads(content_start['additionalModelFields']) if additional_fields.get('generationStage') == 'SPECULATIVE': self.display_assistant_text = True else: self.display_assistant_text = False # Handle text output event elif 'textOutput' in json_data['event']: text = json_data['event']['textOutput']['content'] if (self.role == "ASSISTANT" and self.display_assistant_text): print(f"Assistant: {text}") elif self.role == "USER": print(f"User: {text}") # Handle audio output elif 'audioOutput' in json_data['event']: audio_content = json_data['event']['audioOutput']['content'] audio_bytes = base64.b64decode(audio_content) await self.audio_queue.put(audio_bytes) except Exception as e: print(f"Error processing responses: {e}")

Audio wiedergeben

Das Verfahren führt die folgenden Aufgaben durch:

  • Initialisieren eines PyAudio-Eingabestreams

  • Empfängt kontinuierlich Audiodaten aus der Warteschlange

  • Spielt den Ton über die Lautsprecher ab

  • Bereinigt Ressourcen ordnungsgemäß nach Abschluss

async def play_audio(self): """Play audio responses.""" p = pyaudio.PyAudio() stream = p.open( format=FORMAT, channels=CHANNELS, rate=OUTPUT_SAMPLE_RATE, output=True )

versuche:

while self.is_active: audio_data = await self.audio_queue.get() stream.write(audio_data) except Exception as e: print(f"Error playing audio: {e}") finally: stream.stop_stream() stream.close() p.terminate()

Audio aufnehmen

Das Verfahren führt die folgenden Aufgaben durch:

  • Initialisiert einen PyAudio Ausgabestrom

  • Startet die Audioeingabesitzung

  • Nimmt kontinuierlich Audiodateien vom Mikrofon auf

  • Sendet jeden Block an das Amazon-Nova-Sonic-Modell

  • Bereinigt Ressourcen ordnungsgemäß nach Abschluss

async def capture_audio(self): """Capture audio from microphone and send to Nova Sonic.""" p = pyaudio.PyAudio() stream = p.open( format=FORMAT, channels=CHANNELS, rate=INPUT_SAMPLE_RATE, input=True, frames_per_buffer=CHUNK_SIZE ) print("Starting audio capture. Speak into your microphone...") print("Press Enter to stop...") await self.start_audio_input()

versuche:

while self.is_active: audio_data = stream.read(CHUNK_SIZE, exception_on_overflow=False) await self.send_audio_chunk(audio_data) await asyncio.sleep(0.01) except Exception as e: print(f"Error capturing audio: {e}") finally: stream.stop_stream() stream.close() p.terminate() print("Audio capture stopped.") await self.end_audio_input()

Die Hauptfunktion ausführen

Die Hauptfunktion orchestriert den gesamten Prozess, indem sie Folgendes ausführt:

  • Erzeugt einen Amazon Nova 2 Sonic Client

  • Sitzung starten

  • Erstellt gleichzeitige Aufgaben für die Audiowiedergabe und -aufnahme

  • Wartet darauf, dass der Benutzer Enter zum Beenden drückt

  • Beendet die Sitzung ordnungsgemäß und bereinigt Aufgaben

async def main(): # Create Nova Sonic client nova_client = SimpleNovaSonic() # Start session await nova_client.start_session() # Start audio playback task playback_task = asyncio.create_task(nova_client.play_audio()) # Start audio capture task capture_task = asyncio.create_task(nova_client.capture_audio()) # Wait for user to press Enter to stop await asyncio.get_event_loop().run_in_executor(None, input) # End session nova_client.is_active = False # First cancel the tasks tasks = [] if not playback_task.done(): tasks.append(playback_task) if not capture_task.done(): tasks.append(capture_task) for task in tasks: task.cancel() if tasks: await asyncio.gather(*tasks, return_exceptions=True) # cancel the response task if nova_client.response and not nova_client.response.done(): nova_client.response.cancel() await nova_client.end_session() print("Session ended") if __name__ == "__main__": # Set AWS credentials if not using environment variables # os.environ['AWS_ACCESS_KEY_ID'] = "your-access-key" # os.environ['AWS_SECRET_ACCESS_KEY'] = "your-secret-key" # os.environ['AWS_DEFAULT_REGION'] = "us-east-1" asyncio.run(main())