

# 音声変換の開始方法
<a name="sonic-getting-started"></a>

次のセクションでは、Amazon Nova 2 Sonic を使用してシンプルでリアルタイムのオーディオストリーミングアプリケーションを実装する具体例を示し、その方法をステップバイステップで説明します。この簡略化されたバージョンは、Amazon Nova 2 Sonic モデルとの音声会話を作成するために必要なコア機能を示しています。

[Nova サンプル GitHub リポジトリ](https://github.com/aws-samples/amazon-nova-samples)では、次の例にアクセスできます。

接続制限は 8 分で、接続の更新と会話の継続パターンは [GitHub](https://github.com/aws-samples/amazon-nova-samples/tree/main/speech-to-speech/amazon-nova-2-sonic/repeatable-patterns/session-continuation/console-python) で利用できます。

## インポートと設定を記述する
<a name="sonic-imports-config"></a>

このセクションでは、必要なライブラリをインポートし、オーディオ設定パラメータを設定します。
+ `asyncio`: 非同期プログラミング用
+ `base64`: オーディオデータのエンコードとデコード用
+ `pyaudio`: オーディオキャプチャと再生用
+ ストリーミング用の Amazon Bedrock SDK コンポーネント
+ オーディオ定数は、オーディオキャプチャの形式 (16kHz サンプルレート、モノチャネル) を定義します。

```
import os
import asyncio
import base64
import json
import uuid
import pyaudio
from aws_sdk_bedrock_runtime.client import BedrockRuntimeClient, InvokeModelWithBidirectionalStreamOperationInput
from aws_sdk_bedrock_runtime.models import InvokeModelWithBidirectionalStreamInputChunk, BidirectionalInputPayloadPart
from aws_sdk_bedrock_runtime.config import Config, HTTPAuthSchemeResolver, SigV4AuthScheme
from smithy_aws_core.identity import EnvironmentCredentialsResolver

# Audio configuration
INPUT_SAMPLE_RATE = 16000
OUTPUT_SAMPLE_RATE = 24000
CHANNELS = 1
FORMAT = pyaudio.paInt16
CHUNK_SIZE = 1024
```

## `SimpleNovaSonic` クラスを定義する
<a name="sonic-define-class"></a>

`SimpleNovaSonic` クラスは、Amazon Nova Sonic インタラクションを処理するメインクラスです。
+ `model_id`: Amazon Nova Sonic モデル ID (`amazon.nova-2-sonic-v1:0`)
+ `region`: AWS リージョン。デフォルトは `us-east-1` です
+ プロンプトとコンテンツの追跡用の一意の ID
+ オーディオ再生用の非同期キュー

```
class SimpleNovaSonic:
    def __init__(self, model_id='amazon.nova-2-sonic-v1:0', region='us-east-1'):
        self.model_id = model_id
        self.region = region
        self.client = None
        self.stream = None
        self.response = None
        self.is_active = False
        self.prompt_name = str(uuid.uuid4())
        self.content_name = str(uuid.uuid4())
        self.audio_content_name = str(uuid.uuid4())
        self.audio_queue = asyncio.Queue()
        self.display_assistant_text = False
```

## クライアントの初期化
<a name="sonic-initialize-client"></a>

この方法では、Amazon Bedrock クライアントを次のように設定します。
+ 指定されたリージョンに適したエンドポイント
+ AWS 認証情報の環境変数を使用した認証情報
+ AWS API コールの SigV4 認証スキーム

```
    def _initialize_client(self):
        """Initialize the Bedrock client."""
        config = Config(
            endpoint_uri=f"https://bedrock-runtime.{self.region}.amazonaws.com",
            region=self.region,
            aws_credentials_identity_resolver=EnvironmentCredentialsResolver(),
            auth_scheme_resolver=HTTPAuthSchemeResolver(),
            auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="bedrock")}
        )
        self.client = BedrockRuntimeClient(config=config)
```

## イベントを処理する
<a name="sonic-handle-events"></a>

次のヘルパーメソッドは、Amazon Nova Sonic モデルとのすべての通信に使用される双方向ストリームに JSON イベントを送信します。

```
    async def send_event(self, event_json):
        """Send an event to the stream."""
        event = InvokeModelWithBidirectionalStreamInputChunk(
            value=BidirectionalInputPayloadPart(bytes_=event_json.encode('utf-8'))
        )
        await self.stream.input_stream.send(event)
```

## セッションを開始する
<a name="sonic-start-session"></a>

このメソッドはセッションを開始し、残りのイベントを設定してオーディオストリーミングを開始します。これらのイベントは同じ順序で送信する必要があります。

```
    async def start_session(self):
        """Start a new session with Nova Sonic."""
        if not self.client:
            self._initialize_client()
            
        # Initialize the stream
        self.stream = await self.client.invoke_model_with_bidirectional_stream(
            InvokeModelWithBidirectionalStreamOperationInput(model_id=self.model_id)
        )
        self.is_active = True
        
        # Send session start event
        session_start = '''
        {
          "event": {
            "sessionStart": {
              "inferenceConfiguration": {
                "maxTokens": 1024,
                "topP": 0.9,
                "temperature": 0.7
              },
              "turnDetectionConfiguration": {
                "endpointingSensitivity": "HIGH"
              }
            }
          }
        }
        '''
        await self.send_event(session_start)
        
        # Send prompt start event
        prompt_start = f'''
        {{
          "event": {{
            "promptStart": {{
              "promptName": "{self.prompt_name}",
              "textOutputConfiguration": {{
                "mediaType": "text/plain"
              }},
              "audioOutputConfiguration": {{
                "mediaType": "audio/lpcm",
                "sampleRateHertz": 24000,
                "sampleSizeBits": 16,
                "channelCount": 1,
                "voiceId": "matthew",
                "encoding": "base64",
                "audioType": "SPEECH"
              }}
            }}
          }}
        }}
        '''
        await self.send_event(prompt_start)
        
        # Send system prompt
        text_content_start = f'''
        {{
            "event": {{
                "contentStart": {{
                    "promptName": "{self.prompt_name}",
                    "contentName": "{self.content_name}",
                    "type": "TEXT",
                    "interactive": true,
                    "role": "SYSTEM",
                    "textInputConfiguration": {{
                        "mediaType": "text/plain"
                    }}
                }}
            }}
        }}
        '''
        await self.send_event(text_content_start)
        
        system_prompt = "You are a friendly assistant. The user and you will engage in a spoken dialog " \
            "exchanging the transcripts of a natural real-time conversation. Keep your responses short, " \
            "generally two or three sentences for chatty scenarios."
        
        
        text_input = f'''
        {{
            "event": {{
                "textInput": {{
                    "promptName": "{self.prompt_name}",
                    "contentName": "{self.content_name}",
                    "content": "{system_prompt}"
                }}
            }}
        }}
        '''
        await self.send_event(text_input)
        
        text_content_end = f'''
        {{
            "event": {{
                "contentEnd": {{
                    "promptName": "{self.prompt_name}",
                    "contentName": "{self.content_name}"
                }}
            }}
        }}
        '''
        await self.send_event(text_content_end)
        
        # Start processing responses
        self.response = asyncio.create_task(self._process_responses())
```

## オーディオ入力を処理する
<a name="sonic-handle-audio-input"></a>

これらのメソッドは、オーディオ入力ライフサイクルを処理します。
+ `start_audio_input`: オーディオ入力ストリームを設定して開始します
+ `send_audio_chunk`: オーディオチャンクをエンコードしてモデルに送信します
+ `end_audio_input`: オーディオ入力ストリームを適切に閉じます

```
    async def start_audio_input(self):
        """Start audio input stream."""
        audio_content_start = f'''
        {{
            "event": {{
                "contentStart": {{
                    "promptName": "{self.prompt_name}",
                    "contentName": "{self.audio_content_name}",
                    "type": "AUDIO",
                    "interactive": true,
                    "role": "USER",
                    "audioInputConfiguration": {{
                        "mediaType": "audio/lpcm",
                        "sampleRateHertz": 16000,
                        "sampleSizeBits": 16,
                        "channelCount": 1,
                        "audioType": "SPEECH",
                        "encoding": "base64"
                    }}
                }}
            }}
        }}
        '''
        await self.send_event(audio_content_start)
    
    async def send_audio_chunk(self, audio_bytes):
        """Send an audio chunk to the stream."""
        if not self.is_active:
            return
            
        blob = base64.b64encode(audio_bytes)
        audio_event = f'''
        {{
            "event": {{
                "audioInput": {{
                    "promptName": "{self.prompt_name}",
                    "contentName": "{self.audio_content_name}",
                    "content": "{blob.decode('utf-8')}"
                }}
            }}
        }}
        '''
        await self.send_event(audio_event)
    
    async def end_audio_input(self):
        """End audio input stream."""
        audio_content_end = f'''
        {{
            "event": {{
                "contentEnd": {{
                    "promptName": "{self.prompt_name}",
                    "contentName": "{self.audio_content_name}"
                }}
            }}
        }}
        '''
        await self.send_event(audio_content_end)
```

## セッションを終了する
<a name="sonic-end-session"></a>

このメソッドは、次の方法でセッションを適切に閉じます。
+ `promptEnd` イベントの送信
+ `sessionEnd` イベントの送信
+ 入力ストリームを閉じる

```
    async def end_session(self):
        """End the session."""
        if not self.is_active:
            return
            
        prompt_end = f'''
        {{
            "event": {{
                "promptEnd": {{
                    "promptName": "{self.prompt_name}"
                }}
            }}
        }}
        '''
        await self.send_event(prompt_end)
        
        session_end = '''
        {
            "event": {
                "sessionEnd": {}
            }
        }
        '''
        await self.send_event(session_end)
        # close the stream
        await self.stream.input_stream.close()
```

## レスポンスを処理する
<a name="sonic-handle-responses"></a>

このメソッドは、モデルからのレスポンスを継続的に処理し、以下を実行します。
+ ストリームからの出力を待ちます。
+ JSON レスポンスを解析します。
+ 自動音声認識と文字起こしを使用してコンソールに表示することで、テキスト出力を処理します。
+ デコードして再生キューに入れることで、オーディオ出力を処理します。

```
    async def _process_responses(self):
        """Process responses from the stream."""
        try:
            while self.is_active:
                output = await self.stream.await_output()
                result = await output[1].receive()
                
                if result.value and result.value.bytes_:
                    response_data = result.value.bytes_.decode('utf-8')
                    json_data = json.loads(response_data)
                    
                    if 'event' in json_data:
                        # Handle content start event
                        if 'contentStart' in json_data['event']:
                            content_start = json_data['event']['contentStart'] 
                            # set role
                            self.role = content_start['role']
                            # Check for speculative content
                            if 'additionalModelFields' in content_start:
                                additional_fields = json.loads(content_start['additionalModelFields'])
                                if additional_fields.get('generationStage') == 'SPECULATIVE':
                                    self.display_assistant_text = True
                                else:
                                    self.display_assistant_text = False
                                
                        # Handle text output event
                        elif 'textOutput' in json_data['event']:
                            text = json_data['event']['textOutput']['content']    
                           
                            if (self.role == "ASSISTANT" and self.display_assistant_text):
                                print(f"Assistant: {text}")
                            elif self.role == "USER":
                                print(f"User: {text}")
                        
                        # Handle audio output
                        elif 'audioOutput' in json_data['event']:
                            audio_content = json_data['event']['audioOutput']['content']
                            audio_bytes = base64.b64decode(audio_content)
                            await self.audio_queue.put(audio_bytes)
        except Exception as e:
            print(f"Error processing responses: {e}")
```

## オーディオを再生する
<a name="sonic-playback-audio"></a>

このメソッドは、次のタスクを実行します。
+ `PyAudio` 入力ストリームを初期化する
+ キューからオーディオデータを継続的に取得する
+ スピーカーを介して音声を再生する
+ 完了時にリソースを適切にクリーンアップする

```
    async def play_audio(self):
        """Play audio responses."""
        p = pyaudio.PyAudio()
        stream = p.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=OUTPUT_SAMPLE_RATE,
            output=True
        )
```

次を試してみましょう:

```
            while self.is_active:
                audio_data = await self.audio_queue.get()
                stream.write(audio_data)
        except Exception as e:
            print(f"Error playing audio: {e}")
        finally:
            stream.stop_stream()
            stream.close()
            p.terminate()
```

## 音声をキャプチャする
<a name="sonic-capture-audio"></a>

このメソッドは、次のタスクを実行します。
+ `PyAudio` 出力ストリームを初期化する
+ オーディオ入力セッションを開始する
+ マイクからオーディオチャンクを継続的にキャプチャする
+ 各チャンクを Amazon Nova Sonic モデルに送信する
+ 完了時にリソースを適切にクリーンアップする

```
    async def capture_audio(self):
        """Capture audio from microphone and send to Nova Sonic."""
        p = pyaudio.PyAudio()
        stream = p.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=INPUT_SAMPLE_RATE,
            input=True,
            frames_per_buffer=CHUNK_SIZE
        )
        
        print("Starting audio capture. Speak into your microphone...")
        print("Press Enter to stop...")
        
        await self.start_audio_input()
```

次を試してみましょう:

```
            while self.is_active:
                audio_data = stream.read(CHUNK_SIZE, exception_on_overflow=False)
                await self.send_audio_chunk(audio_data)
                await asyncio.sleep(0.01)
        except Exception as e:
            print(f"Error capturing audio: {e}")
        finally:
            stream.stop_stream()
            stream.close()
            p.terminate()
            print("Audio capture stopped.")
            await self.end_audio_input()
```

## メイン関数を実行する
<a name="sonic-run-main"></a>

メイン関数は、以下を実行してプロセス全体をオーケストレーションします。
+ Amazon Nova 2 Sonic クライアントを作成する
+ セッションを開始する
+ オーディオ再生とキャプチャの同時タスクを作成する
+ ユーザーが **Enter** を押して停止するのを待つ
+ セッションを適切に終了し、タスクをクリーンアップする

```
async def main():
    # Create Nova Sonic client
    nova_client = SimpleNovaSonic()
    
    # Start session
    await nova_client.start_session()
    
    # Start audio playback task
    playback_task = asyncio.create_task(nova_client.play_audio())
    
    # Start audio capture task
    capture_task = asyncio.create_task(nova_client.capture_audio())
    
    # Wait for user to press Enter to stop
    await asyncio.get_event_loop().run_in_executor(None, input)
    
    # End session
    nova_client.is_active = False
    
    # First cancel the tasks
    tasks = []
    if not playback_task.done():
        tasks.append(playback_task)
    if not capture_task.done():
        tasks.append(capture_task)
    for task in tasks:
        task.cancel()
    if tasks:
        await asyncio.gather(*tasks, return_exceptions=True)
    
    # cancel the response task
    if nova_client.response and not nova_client.response.done():
        nova_client.response.cancel()
    
    await nova_client.end_session()
    print("Session ended")

if __name__ == "__main__":
    # Set AWS credentials if not using environment variables
    # os.environ['AWS_ACCESS_KEY_ID'] = "your-access-key"
    # os.environ['AWS_SECRET_ACCESS_KEY'] = "your-secret-key"
    # os.environ['AWS_DEFAULT_REGION'] = "us-east-1"

    asyncio.run(main())
```