

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 在 Amazon Bedrock 代理程式中撰寫自訂剖析器 Lambda 函數
<a name="lambda-parser"></a>

每個提示範本都包含剖析器 Lambda 函數。您可以撰寫自己的自訂剖析器 Lambda 函數，並指定要覆寫其預設剖析器函數的範本。為了撰寫自訂剖析器 Lambda 函數，您必須了解代理程式傳送的輸入事件，以及代理程式預期作為 Lambda 函數的輸出的回應。您撰寫處理常式函數來操作輸入事件中的變數，並傳回回應。如需有關 如何AWS Lambda運作的詳細資訊，請參閱《 AWS Lambda開發人員指南》中的[事件驅動叫用](https://docs.aws.amazon.com/lambda/latest/dg/lambda-services.html#event-driven-invocation)。

**Topics**
+ [剖析器 Lambda 輸入事件](#lambda-parser-input)
+ [剖析器 Lambda 回應](#lambda-parser-response)
+ [剖析器 Lambda 範例](#lambda-parser-example)

## 剖析器 Lambda 輸入事件
<a name="lambda-parser-input"></a>

以下是來自代理程式的輸入事件的一般結構。使用這些欄位來撰寫 Lambda 處理常式函數。

```
{
    "messageVersion": "1.0",
    "agent": {
        "name": "string",
        "id": "string",
        "alias": "string",
        "version": "string"
    },
    "invokeModelRawResponse": "string",
    "promptType": "ORCHESTRATION | ROUTING_CLASSIFIER | POST_PROCESSING | PRE_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION | MEMORY_SUMMARIZATION",
    "overrideType": "OUTPUT_PARSER"
}
```

下列清單說明輸入事件欄位：
+ `messageVersion` – 訊息版本，用於識別進入 Lambda 函數的事件資料的格式，以及來自 Lambda 函數的回應的預期格式。Amazon Bedrock 代理程式僅支援 1.0 版。
+ `agent`— 包含提示所屬代理程式的名稱、ID、別名和版本的相關資訊。
+ `invokeModelRawResponse`— 要剖析其輸出之提示的原始基礎模型輸出。
+ `promptType`— 要剖析其輸出的提示類型。
+ `overrideType`- 此 Lambda 函數覆寫的成品。目前僅支援 `OUTPUT_PARSER`，這表示要覆寫預設剖析器。

## 剖析器 Lambda 回應
<a name="lambda-parser-response"></a>

您的代理程式預期 Lambda 函數回應，並使用該回應採取進一步動作或協助將回應傳回給使用者。您的代理程式會執行代理程式模型建議的下一個動作。下一個動作可以序列順序或平行執行，具體取決於代理程式的模型以及代理程式的建立和準備時間。

如果您已在 *2024 年 10 月 4 日之前*建立並準備代理程式，且您的代理程式正在使用 Anthropic Claude 3 Sonnet 或 Anthropic Claude 3.5 Sonnet 模型，則根據預設，代理程式模型建議的下一個最佳動作將按序列順序執行。

如果您已在 *2024 年 10 月 10 日之後*建立新的代理程式或準備現有的代理程式，且您的代理程式正在使用 Anthropic Claude 3 Sonnet、Anthropic Claude 3.5 Sonnet 或任何 non-Anthropic 模型，則代理程式模型建議的下一個步驟動作將平行執行。這表示多個動作會平行執行，例如動作群組函數和知識庫的混合。這可減少對模型進行的呼叫次數，進而減少整體延遲。

您可以透過呼叫 [PrepareAgent](https://docs.aws.amazon.com//bedrock/latest/APIReference/API_agent_PrepareAgent.html) API 或在主控台的代理程式建置器中選取**準備**，為在 *2024 年 10 月 4 日之前*建立和準備的代理程式啟用平行動作。代理程式準備好後，您會看到更新的提示範本和新版本的剖析器 Lambda 結構描述。

**範例剖析器 Lambda 回應**

以下是代理程式以序列順序執行下一個優先建議動作，以及代理程式平行執行下一個動作之回應的一般結構範例。使用 Lambda 函數回應欄位來設定輸出的傳回方式。

**代理程式以序列順序執行下一個優先建議動作的回應範例**

選取對應於您是否使用 OpenAPI 結構描述或函數詳細資訊定義動作群組的索引標籤：

**注意**  
`MessageVersion 1.0` 表示代理程式正在依序列順序執行下一個優先建議的動作。

------
#### [ OpenAPI schema ]

```
{
    "messageVersion": "1.0",
    "promptType": "ORCHESTRATION | PRE_PROCESSING | ROUTING_CLASSIFIER | POST_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION",
    "preProcessingParsedResponse": {
        "isValidInput": "boolean",
        "rationale": "string"
    },
    "orchestrationParsedResponse": {
        "rationale": "string",
        "parsingErrorDetails": {
            "repromptResponse": "string"
        },
        "responseDetails": {
            "invocationType": "AGENT_COLLABORATOR | ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER",
            "agentAskUser": {
                "responseText": "string",
                "id": "string"
            },
             "agentCollaboratorInvocation": {
                "agentCollaboratorName": "string",
                "input": {
                    "text": "string"                    
                }
            }
            ...
        }
    },
    "routingClassifierParsedResponse": {
        "parsingErrorDetails": {
            "repromptResponse": "string"
        },
        "responseDetails": {
            "type": "AGENT | LAST_AGENT | UNDECIDED",
            "agentCollaboratorInvocation": {
                "agentCollaboratorName": "string",
                "input": {
                    "text": "string"                    
                    }
            }
        }
    }
}
            "actionGroupInvocation": {
                "actionGroupName": "string",
                "apiName": "string",
                "id": "string",
                "verb": "string",
                "actionGroupInput": {
                    "<parameter>": {
                        "value": "string"
                    },
                    ...
                }
            },
            "agentKnowledgeBase": {
                "knowledgeBaseId": "string",
                "id": "string",
                "searchQuery": {
                    "value": "string"
                }
            },
            "agentFinalResponse": {
                "responseText": "string",
                "citations": {
                    "generatedResponseParts": [{
                        "text": "string",
                        "references": [{"sourceId": "string"}]
                    }]
                }
            },
        }
    },
    "knowledgeBaseResponseGenerationParsedResponse": { 
       "generatedResponse": {
            "generatedResponseParts": [
                {
                    "text": "string",
                    "references": [
                        {"sourceId": "string"},
                        ...
                    ]
                }
            ]
        }
    },
    "postProcessingParsedResponse": {
        "responseText": "string",
        "citations": {
            "generatedResponseParts": [{
                "text": "string",
                "references": [{
                    "sourceId": "string"
                }]
            }]
        }
    }
}
```

------
#### [ Function details ]

```
{
    "messageVersion": "1.0",
    "promptType": "ORCHESTRATION | PRE_PROCESSING | POST_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION",
    "preProcessingParsedResponse": {
        "isValidInput": "boolean",
        "rationale": "string"
    },
    "orchestrationParsedResponse": {
        "rationale": "string",
        "parsingErrorDetails": {
            "repromptResponse": "string"
        },
        "responseDetails": {
            "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER",
            "agentAskUser": {
                "responseText": "string",
                "id": "string"
            },
            "actionGroupInvocation": {
                "actionGroupName": "string",
                "functionName": "string",
                "id": "string",
                "actionGroupInput": {
                    "<parameter>": {
                        "value": "string"
                    },
                    ...
                }
            },
            "agentKnowledgeBase": {
                "knowledgeBaseId": "string",
                "id": "string",
                "searchQuery": {
                    "value": "string"
                }
            },
            "agentFinalResponse": {
                "responseText": "string",
                "citations": {
                    "generatedResponseParts": [{
                        "text": "string",
                        "references": [{"sourceId": "string"}]
                    }]
                }
            },
        }
    },
    "knowledgeBaseResponseGenerationParsedResponse": { 
       "generatedResponse": {
            "generatedResponseParts": [
                {
                    "text": "string",
                    "references": [
                        {"sourceId": "string"},
                        ...
                    ]
                }
            ]
        }
    },
    "postProcessingParsedResponse": {
        "responseText": "string",
        "citations": {
            "generatedResponseParts": [{
                "text": "string",
                "references": [{
                    "sourceId": "string"
                }]
            }]
        }
    }
}
```

------

**代理程式平行執行下一個動作的範例回應 **

選取對應於您是否使用 OpenAPI 結構描述或函數詳細資訊定義動作群組的索引標籤：

**注意**  
`MessageVersion 2.0` 表示代理程式正在平行執行下一個建議的動作 

------
#### [ OpenAPI schema ]

```
{
    "messageVersion": "2.0",
    "promptType": "ORCHESTRATION | PRE_PROCESSING | POST_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION",
    "preProcessingParsedResponse": {
        "isValidInput": "boolean",
        "rationale": "string"
    },
    "orchestrationParsedResponse": {
        "rationale": "string",
        "parsingErrorDetails": {
            "repromptResponse": "string"
        },
        "responseDetails": {
            "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER",
            "agentAskUser": {
                "responseText": "string"
            },
            "actionGroupInvocations": [
                {
                    "actionGroupName": "string",
                    "apiName": "string",
                    "verb": "string",
                    "actionGroupInput": {
                        "<parameter>": {
                            "value": "string"
                        },
                        ...
                    }
                }
            ],
            "agentKnowledgeBases": [
                {
                    "knowledgeBaseId": "string",
                    "searchQuery": {
                        "value": "string"
                    }
                }
            ],
            "agentFinalResponse": {
                "responseText": "string",
                "citations": {
                    "generatedResponseParts": [{
                        "text": "string",
                        "references": [{"sourceId": "string"}]
                    }]
                }
            },
        }
    },
    "knowledgeBaseResponseGenerationParsedResponse": { 
       "generatedResponse": {
            "generatedResponseParts": [
                {
                    "text": "string",
                    "references": [
                        {"sourceId": "string"},
                        ...
                    ]
                }
            ]
        }
    },
    "postProcessingParsedResponse": {
        "responseText": "string",
        "citations": {
            "generatedResponseParts": [{
                "text": "string",
                "references": [{
                    "sourceId": "string"
                }]
            }]
        }
    }
}
```

------
#### [ Function details ]

```
{
    "messageVersion": "2.0",
    "promptType": "ORCHESTRATION | PRE_PROCESSING | POST_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION",
    "preProcessingParsedResponse": {
        "isValidInput": "boolean",
        "rationale": "string"
    },
    "orchestrationParsedResponse": {
        "rationale": "string",
        "parsingErrorDetails": {
            "repromptResponse": "string"
        },
        "responseDetails": {
            "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER",
            "agentAskUser": {
                "responseText": "string"
            },
            "actionGroupInvocations": [
                {
                    "actionGroupName": "string",
                    "functionName": "string",
                    "actionGroupInput": {
                        "<parameter>"": {
                            "value": "string"
                        },
                        ...
                    }
                }
            ],
            "agentKnowledgeBases": [
                {
                    "knowledgeBaseId": "string",
                    "searchQuery": {
                        "value": "string"
                    }
                }
            ],
            "agentFinalResponse": {
                "responseText": "string",
                "citations": {
                    "generatedResponseParts": [{
                        "text": "string",
                        "references": [{"sourceId": "string"}]
                    }]
                }
            },
        }
    },
    "knowledgeBaseResponseGenerationParsedResponse": { 
       "generatedResponse": {
            "generatedResponseParts": [
                {
                    "text": "string",
                    "references": [
                        {"sourceId": "string"},
                        ...
                    ]
                }
            ]
        }
    },
    "postProcessingParsedResponse": {
        "responseText": "string",
        "citations": {
            "generatedResponseParts": [{
                "text": "string",
                "references": [{
                    "sourceId": "string"
                }]
            }]
        }
    }
}
```

------

下列清單描述 Lambda 回應欄位：
+ `messageVersion` – 訊息版本，用於識別進入 Lambda 函數的事件資料的格式，以及來自 Lambda 函數的回應的預期格式。
+ `promptType` – 目前回合的提示類型。
+ `preProcessingParsedResponse`— `PRE_PROCESSING` 提示類型的已剖析回應。
+ `orchestrationParsedResponse`— `ORCHESTRATION` 提示類型的已剖析回應。如需詳細資訊，請參閱下述。
+ `knowledgeBaseResponseGenerationParsedResponse`— `KNOWLEDGE_BASE_RESPONSE_GENERATION` 提示類型的已剖析回應。
+ `postProcessingParsedResponse`— `POST_PROCESSING` 提示類型的已剖析回應。

如需四個提示範本剖析回應的詳細資訊，請參閱下列索引標籤。

------
#### [ preProcessingParsedResponse ]

```
{
    "isValidInput": "boolean",
    "rationale": "string"
}
```

`preProcessingParsedResponse` 包含下列欄位。
+ `isValidInput`— 指定使用者輸入是否有效。您可以定義函數來決定如何表徵使用者輸入的有效性。
+ `rationale`— 使用者輸入分類的推論。這個基本原理由原始回應中的模型提供，Lambda 函數會進行剖析，而代理程式會在追蹤中將其呈現以進行預先處理。

------
#### [ orchestrationResponse ]

`orchestrationResponse` 的格式取決於您是否使用 OpenAPI 結構描述或函數詳細資訊定義動作群組：
+ 如果您使用 OpenAPI 結構描述定義動作群組，回應必須採用下列格式：

  ```
  {
      "rationale": "string",
      "parsingErrorDetails": {
          "repromptResponse": "string"
      },
      "responseDetails": {
          "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER",
          "agentAskUser": {
              "responseText": "string",
              "id": "string"
          },
          "actionGroupInvocation": {
              "actionGroupName": "string",
              "apiName": "string",
              "id": "string",
              "verb": "string",
              "actionGroupInput": {
                  "<parameter>": {
                      "value": "string"
                  },
                  ...
              }
          },
          "agentKnowledgeBase": {
              "knowledgeBaseId": "string",
              "id": "string",
              "searchQuery": {
                  "value": "string"
              }
          },
          "agentFinalResponse": {
              "responseText": "string",
              "citations": {
                  "generatedResponseParts": [
                      {
                          "text": "string",
                          "references": [
                              {"sourceId": "string"},
                              ...
                          ]
                      },
                      ...
                  ]
              }
          },
      }
  }
  ```
+ 如果您使用函數詳細資訊定義動作群組，回應必須採用下列格式：

  ```
  {
      "rationale": "string",
      "parsingErrorDetails": {
          "repromptResponse": "string"
      },
      "responseDetails": {
          "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER",
          "agentAskUser": {
              "responseText": "string",
              "id": "string"
          },
          "actionGroupInvocation": {
              "actionGroupName": "string",
              "functionName": "string",
              "id": "string",
              "actionGroupInput": {
                  "<parameter>": {
                      "value": "string"
                  },
                  ...
              }
          },
          "agentKnowledgeBase": {
              "knowledgeBaseId": "string",
              "id": "string",
              "searchQuery": {
                  "value": "string"
              }
          },
          "agentFinalResponse": {
              "responseText": "string",
              "citations": {
                  "generatedResponseParts": [
                      {
                          "text": "string",
                          "references": [
                              {"sourceId": "string"},
                              ...
                          ]
                      },
                      ...
                  ]
              }
          },
      }
  }
  ```

`orchestrationParsedResponse` 包含下列欄位：
+ `rationale`— 依照基礎模型輸出來推論下一步做什麼。您可以定義要從模型輸出剖析的函數。
+ `parsingErrorDetails`— 包含 `repromptResponse`，這是在無法剖析模型回應時，重新提示模型以更新其原始回應的訊息。您可以定義函數來操控如何重新提示模型。
+ `responseDetails`— 包含如何處理基礎模型輸出的詳細資訊。包含 `invocationType`，這是代理程式要採用的下一個步驟，以及第二個應符合 `invocationType` 的欄位。以下是可能的物件。
  + `agentAskUser` - 與 `ASK_USER` 調用類型相容。此調用類型會結束協同運作步驟。包含 `responseText` 以要求使用者提供更多資訊。您可以定義函數來操控此欄位。
  + `actionGroupInvocation` - 與 `ACTION_GROUP` 調用類型相容。您可以定義 Lambda 函數來決定要調用的動作群組和要傳遞的參數。包含下列欄位：
    + `actionGroupName`— 要調用的動作群組。
    + 如果您使用 OpenAPI 結構描述定義動作群組，則需要下列欄位：
      + `apiName` – 要在動作群組中調用的 API 操作名稱。
      + `verb` – 要使用的 API 操作的方法。
    + 如果您使用函數詳細資訊定義動作群組，則以下欄位為必要欄位：
      + `functionName` – 要在動作群組中調用的函數名稱。
    + `actionGroupInput` – 包含參數以在 API 操作請求中指定。
  + `agentKnowledgeBase` - 與 `KNOWLEDGE_BASE` 調用類型相容。您可以定義函數來確定如何查詢知識庫。包含下列欄位：
    + `knowledgeBaseId` - 知識庫的唯一識別碼。
    + `searchQuery` - 包含在 `value` 欄位中傳送給知識庫的查詢。
  + `agentFinalResponse` - 與 `FINISH` 調用類型相容。此調用類型會結束協同運作步驟。包含對 `responseText` 欄位中使用者的回應，以及 `citations` 物件中回應的引用文獻。

------
#### [ knowledgeBaseResponseGenerationParsedResponse ]

```
{ 
   "generatedResponse": {
        "generatedResponseParts": [
            {
                "text": "string",
                "references": [
                    { "sourceId": "string" },
                    ...
                ]
            },
            ...
        ]
    }
}
```

`knowledgeBaseResponseGenerationParsedResponse` 包含來自查詢知識庫的 `generatedResponse` 和資訊來源的參考。

------
#### [ postProcessingParsedResponse ]

```
{
    "responseText": "string",
    "citations": {
        "generatedResponseParts": [
            {
                "text": "string",
                "references": [
                    { "sourceId": "string" },
                    ...
                ]
            },
            ...
        ]
    }
}
```

`postProcessingParsedResponse` 包含下列欄位：
+ `responseText`— 返回至終端使用者的回應。您可以定義函數來格式化回應。
+ `citations`— 包含回應的引用文獻清單。每個引文都會顯示被引用的文字及其參考。

------

## 剖析器 Lambda 範例
<a name="lambda-parser-example"></a>

若要查看剖析器 Lambda 函數輸入事件和回應範例，請從下列索引標籤中選取。

------
#### [ Pre-processing ]

**範例輸入事件**

```
{
    "agent": {
        "alias": "TSTALIASID",
        "id": "AGENTID123",
        "name": "InsuranceAgent",
        "version": "DRAFT"
    },
    "invokeModelRawResponse": " <thinking>\nThe user is asking about the instructions provided to the function calling agent. This input is trying to gather information about what functions/API's or instructions our function calling agent has access to. Based on the categories provided, this input belongs in Category B.\n</thinking>\n\n<category>B</category>",
    "messageVersion": "1.0",
    "overrideType": "OUTPUT_PARSER",
    "promptType": "PRE_PROCESSING"
}
```

**回應範例**

```
{
  "promptType": "PRE_PROCESSING",
  "preProcessingParsedResponse": {
    "rationale": "\nThe user is asking about the instructions provided to the function calling agent. This input is trying to gather information about what functions/API's or instructions our function calling agent has access to. Based on the categories provided, this input belongs in Category B.\n",
    "isValidInput": false
  }
}
```

------
#### [ Orchestration ]

**範例輸入事件**

```
{
    "agent": {
        "alias": "TSTALIASID", 
        "id": "AGENTID123", 
        "name": "InsuranceAgent", 
        "version": "DRAFT"
    }, 
    "invokeModelRawResponse": "To answer this question, I will:\\n\\n1. Call the GET::x_amz_knowledgebase_KBID123456::Search function to search for a phone number to call.\\n\\nI have checked that I have access to the GET::x_amz_knowledgebase_KBID23456::Search function.\\n\\n</scratchpad>\\n\\n<function_call>GET::x_amz_knowledgebase_KBID123456::Search(searchQuery=\"What is the phone number I can call?\)",
    "messageVersion": "1.0",
    "overrideType": "OUTPUT_PARSER",
    "promptType": "ORCHESTRATION"
}
```

**回應範例**

```
{
    "promptType": "ORCHESTRATION",
    "orchestrationParsedResponse": {
        "rationale": "To answer this question, I will:\\n\\n1. Call the GET::x_amz_knowledgebase_KBID123456::Search function to search for a phone number to call Farmers.\\n\\nI have checked that I have access to the GET::x_amz_knowledgebase_KBID123456::Search function.",
        "responseDetails": {
            "invocationType": "KNOWLEDGE_BASE",
            "agentKnowledgeBase": {
                "searchQuery": {
                    "value": "What is the phone number I can call?"
                },
                "knowledgeBaseId": "KBID123456"
            }
        }
    }
}
```

------
#### [ Knowledge base response generation ]

**範例輸入事件**

```
{
    "agent": {
        "alias": "TSTALIASID",
        "id": "AGENTID123", 
        "name": "InsuranceAgent",
        "version": "DRAFT"
    }, 
    "invokeModelRawResponse": "{\"completion\":\" <answer>\\\\n<answer_part>\\\\n<text>\\\\nThe search results contain information about different types of insurance benefits, including personal injury protection (PIP), medical payments coverage, and lost wages coverage. PIP typically covers reasonable medical expenses for injuries caused by an accident, as well as income continuation, child care, loss of services, and funerals. Medical payments coverage provides payment for medical treatment resulting from a car accident. Who pays lost wages due to injuries depends on the laws in your state and the coverage purchased.\\\\n</text>\\\\n<sources>\\\\n<source>1234567-1234-1234-1234-123456789abc</source>\\\\n<source>2345678-2345-2345-2345-23456789abcd</source>\\\\n<source>3456789-3456-3456-3456-3456789abcde</source>\\\\n</sources>\\\\n</answer_part>\\\\n</answer>\",\"stop_reason\":\"stop_sequence\",\"stop\":\"\\\\n\\\\nHuman:\"}",
    "messageVersion": "1.0",
    "overrideType": "OUTPUT_PARSER",
    "promptType": "KNOWLEDGE_BASE_RESPONSE_GENERATION"
}
```

**回應範例**

```
{
    "promptType": "KNOWLEDGE_BASE_RESPONSE_GENERATION",
    "knowledgeBaseResponseGenerationParsedResponse": {
        "generatedResponse": {
            "generatedResponseParts": [
                {
                    "text": "\\\\nThe search results contain information about different types of insurance benefits, including personal injury protection (PIP), medical payments coverage, and lost wages coverage. PIP typically covers reasonable medical expenses for injuries caused by an accident, as well as income continuation, child care, loss of services, and funerals. Medical payments coverage provides payment for medical treatment resulting from a car accident. Who pays lost wages due to injuries depends on the laws in your state and the coverage purchased.\\\\n",
                    "references": [
                        {"sourceId": "1234567-1234-1234-1234-123456789abc"},
                        {"sourceId": "2345678-2345-2345-2345-23456789abcd"},
                        {"sourceId": "3456789-3456-3456-3456-3456789abcde"}
                    ]
                }
            ]
        }
    }
}
```

------
#### [ Post-processing ]

**範例輸入事件**

```
{
    "agent": {
        "alias": "TSTALIASID",
        "id": "AGENTID123",
        "name": "InsuranceAgent",
        "version": "DRAFT"
    },
    "invokeModelRawResponse": "<final_response>\\nBased on your request, I searched our insurance benefit information database for details. The search results indicate that insurance policies may cover different types of benefits, depending on the policy and state laws. Specifically, the results discussed personal injury protection (PIP) coverage, which typically covers medical expenses for insured individuals injured in an accident (cited sources: 1234567-1234-1234-1234-123456789abc, 2345678-2345-2345-2345-23456789abcd). PIP may pay for costs like medical care, lost income replacement, childcare expenses, and funeral costs. Medical payments coverage was also mentioned as another option that similarly covers medical treatment costs for the policyholder and others injured in a vehicle accident involving the insured vehicle. The search results further noted that whether lost wages are covered depends on the state and coverage purchased. Please let me know if you need any clarification or have additional questions.\\n</final_response>",
    "messageVersion": "1.0",
    "overrideType": "OUTPUT_PARSER",
    "promptType": "POST_PROCESSING"
}
```

**回應範例**

```
{
    "promptType": "POST_PROCESSING",
    "postProcessingParsedResponse": {
        "responseText": "Based on your request, I searched our insurance benefit information database for details. The search results indicate that insurance policies may cover different types of benefits, depending on the policy and state laws. Specifically, the results discussed personal injury protection (PIP) coverage, which typically covers medical expenses for insured individuals injured in an accident (cited sources: 24c62d8c-3e39-4ca1-9470-a91d641fe050, 197815ef-8798-4cb1-8aa5-35f5d6b28365). PIP may pay for costs like medical care, lost income replacement, childcare expenses, and funeral costs. Medical payments coverage was also mentioned as another option that similarly covers medical treatment costs for the policyholder and others injured in a vehicle accident involving the insured vehicle. The search results further noted that whether lost wages are covered depends on the state and coverage purchased. Please let me know if you need any clarification or have additional questions."
    }
}
```

------
#### [ Memory summarization ]

**範例輸入事件**

```
{
    "messageVersion": "1.0",
    "promptType": "MEMORY_SUMMARIZATION",
    "invokeModelRawResponse": "<summary> <topic name="user goals">User initiated the conversation with a greeting.</topic> </summary>"
}
```

**回應範例**

```
{"topicwiseSummaries": [
    {
        "topic": "TopicName1",
        "summary": "My Topic 1 Summary"
    }
    ...
]
    
}
```

------

若要查看剖析器 Lambda 函數範例，請展開您要查看之提示範本範例的區段。該 `lambda_handler` 函數將剖析的代理程式的回應傳回。

### 預先處理
<a name="parser-preprocessing"></a>

下列範例顯示以 Python 撰寫的預先處理剖析器 Lambda 函數。

```
import json
import re
import logging

PRE_PROCESSING_RATIONALE_REGEX = "&lt;thinking&gt;(.*?)&lt;/thinking&gt;"
PREPROCESSING_CATEGORY_REGEX = "&lt;category&gt;(.*?)&lt;/category&gt;"
PREPROCESSING_PROMPT_TYPE = "PRE_PROCESSING"
PRE_PROCESSING_RATIONALE_PATTERN = re.compile(PRE_PROCESSING_RATIONALE_REGEX, re.DOTALL)
PREPROCESSING_CATEGORY_PATTERN = re.compile(PREPROCESSING_CATEGORY_REGEX, re.DOTALL)

logger = logging.getLogger()

# This parser lambda is an example of how to parse the LLM output for the default PreProcessing prompt
def lambda_handler(event, context):
    
    print("Lambda input: " + str(event))
    logger.info("Lambda input: " + str(event))
    
    prompt_type = event["promptType"]
    
    # Sanitize LLM response
    model_response = sanitize_response(event['invokeModelRawResponse'])
    
    if event["promptType"] == PREPROCESSING_PROMPT_TYPE:
        return parse_pre_processing(model_response)

def parse_pre_processing(model_response):
    
    category_matches = re.finditer(PREPROCESSING_CATEGORY_PATTERN, model_response)
    rationale_matches = re.finditer(PRE_PROCESSING_RATIONALE_PATTERN, model_response)

    category = next((match.group(1) for match in category_matches), None)
    rationale = next((match.group(1) for match in rationale_matches), None)

    return {
        "promptType": "PRE_PROCESSING",
        "preProcessingParsedResponse": {
            "rationale": rationale,
            "isValidInput": get_is_valid_input(category)
            }
        }

def sanitize_response(text):
    pattern = r"(\\n*)"
    text = re.sub(pattern, r"\n", text)
    return text
    
def get_is_valid_input(category):
    if category is not None and category.strip().upper() == "D" or category.strip().upper() == "E":
        return True
    return False
```

### 協調
<a name="parser-orchestration"></a>

下列範例顯示以 Python 撰寫的協同運作剖析器 Lambda 函數。

範例程式碼會根據您的動作群組是使用 OpenAPI 結構描述或函數詳細資訊來定義而有所不同：

1. 若要查看以 OpenAPI 結構描述定義的動作群組範例，請選取對應至您要查看其範例之模型的索引標籤。

------
#### [ Anthropic Claude 2.0 ]

   ```
   import json
   import re
   import logging
    
    
   RATIONALE_REGEX_LIST = [
       "(.*?)(<function_call>)",
       "(.*?)(<answer>)"
   ]
   RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST]
    
   RATIONALE_VALUE_REGEX_LIST = [
       "<scratchpad>(.*?)(</scratchpad>)",
       "(.*?)(</scratchpad>)",
       "(<scratchpad>)(.*?)"
   ]
   RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST]
    
   ANSWER_REGEX = r"(?<=<answer>)(.*)"
   ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL)
    
   ANSWER_TAG = "<answer>"
   FUNCTION_CALL_TAG = "<function_call>"
    
   ASK_USER_FUNCTION_CALL_REGEX = r"(<function_call>user::askuser)(.*)\)"
   ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL)
    
   ASK_USER_FUNCTION_PARAMETER_REGEX = r"(?<=askuser=\")(.*?)\""  
   ASK_USER_FUNCTION_PARAMETER_PATTERN = re.compile(ASK_USER_FUNCTION_PARAMETER_REGEX, re.DOTALL)
    
   KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_"
    
   FUNCTION_CALL_REGEX = r"<function_call>(\w+)::(\w+)::(.+)\((.+)\)"
    
   ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>"
   ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>"  
   ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>"
   ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL)
   ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL)
   ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL)
    
   # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format
   MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the argument askuser for user::askuser function call. Please try again with the correct argument added"
   ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <function_call>user::askuser(askuser=\"$ASK_USER_INPUT\")</function_call>."
   FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = 'The function call format is incorrect. The format for function calls must be: <function_call>$FUNCTION_NAME($FUNCTION_ARGUMENT_NAME=""$FUNCTION_ARGUMENT_NAME"")</function_call>.'
   
   logger = logging.getLogger()
    
   # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt
   def lambda_handler(event, context):
       logger.info("Lambda input: " + str(event))
       
       # Sanitize LLM response
       sanitized_response = sanitize_response(event['invokeModelRawResponse'])
       
       # Parse LLM response for any rationale
       rationale = parse_rationale(sanitized_response)
       
       # Construct response fields common to all invocation types
       parsed_response = {
           'promptType': "ORCHESTRATION",
           'orchestrationParsedResponse': {
               'rationale': rationale
           }
       }
       
       # Check if there is a final answer
       try:
           final_answer, generated_response_parts = parse_answer(sanitized_response)
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
           
       if final_answer:
           parsed_response['orchestrationParsedResponse']['responseDetails'] = {
               'invocationType': 'FINISH',
               'agentFinalResponse': {
                   'responseText': final_answer
               }
           }
           
           if generated_response_parts:
               parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = {
                   'generatedResponseParts': generated_response_parts
               }
          
           logger.info("Final answer parsed response: " + str(parsed_response))
           return parsed_response
       
       # Check if there is an ask user
       try:
           ask_user = parse_ask_user(sanitized_response)
           if ask_user:
               parsed_response['orchestrationParsedResponse']['responseDetails'] = {
                   'invocationType': 'ASK_USER',
                   'agentAskUser': {
                       'responseText': ask_user
                   }
               }
               
               logger.info("Ask user parsed response: " + str(parsed_response))
               return parsed_response
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
           
       # Check if there is an agent action
       try:
           parsed_response = parse_function_call(sanitized_response, parsed_response)
           logger.info("Function call parsed response: " + str(parsed_response))
           return parsed_response
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
   
       addRepromptResponse(parsed_response, 'Failed to parse the LLM output')
       logger.info(parsed_response)
       return parsed_response
           
       raise Exception("unrecognized prompt type")
    
   def sanitize_response(text):
       pattern = r"(\\n*)"
       text = re.sub(pattern, r"\n", text)
       return text
       
   def parse_rationale(sanitized_response):
       # Checks for strings that are not required for orchestration
       rationale_matcher = next((pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None)
       
       if rationale_matcher:
           rationale = rationale_matcher.group(1).strip()
           
           # Check if there is a formatted rationale that we can parse from the string
           rationale_value_matcher = next((pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None)
           if rationale_value_matcher:
               return rationale_value_matcher.group(1).strip()
           
           return rationale
       
       return None
       
   def parse_answer(sanitized_llm_response):
       if has_generated_response(sanitized_llm_response):
           return parse_generated_response(sanitized_llm_response)
    
       answer_match = ANSWER_PATTERN.search(sanitized_llm_response)
       if answer_match and is_answer(sanitized_llm_response):
           return answer_match.group(0).strip(), None
           
       return None, None
     
   def is_answer(llm_response):
       return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG)
       
   def parse_generated_response(sanitized_llm_response):
       results = []
       
       for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response):
           part = match.group(1).strip()
           
           text_match = ANSWER_TEXT_PART_PATTERN.search(part)
           if not text_match:
               raise ValueError("Could not parse generated response")
           
           text = text_match.group(1).strip()        
           references = parse_references(sanitized_llm_response, part)
           results.append((text, references))
       
       final_response = " ".join([r[0] for r in results])
       
       generated_response_parts = []
       for text, references in results:
           generatedResponsePart = {
               'text': text, 
               'references': references
           }
           generated_response_parts.append(generatedResponsePart)
           
       return final_response, generated_response_parts
   
       
   def has_generated_response(raw_response):
       return ANSWER_PART_PATTERN.search(raw_response) is not None
    
   def parse_references(raw_response, answer_part):
       references = []
       for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part):
           reference = match.group(1).strip()
           references.append({'sourceId': reference})
       return references
       
   def parse_ask_user(sanitized_llm_response):
       ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response)
       if ask_user_matcher:
           try:
               ask_user = ask_user_matcher.group(2).strip()
               ask_user_question_matcher = ASK_USER_FUNCTION_PARAMETER_PATTERN.search(ask_user)
               if ask_user_question_matcher:
                   return ask_user_question_matcher.group(1).strip()
               raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE)
           except ValueError as ex:
               raise ex
           except Exception as ex:
               raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE)
           
       return None
    
   def parse_function_call(sanitized_response, parsed_response):
       match = re.search(FUNCTION_CALL_REGEX, sanitized_response)
       if not match:
           raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE)
       
       verb, resource_name, function = match.group(1), match.group(2), match.group(3)
       
       parameters = {}
       for arg in match.group(4).split(","):
           key, value = arg.split("=")
           parameters[key.strip()] = {'value': value.strip('" ')}
           
       parsed_response['orchestrationParsedResponse']['responseDetails'] = {}
           
       # Function calls can either invoke an action group or a knowledge base.
       # Mapping to the correct variable names accordingly
       if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX):
           parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE'
           parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = {
               'searchQuery': parameters['searchQuery'],
               'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '')
           }
           
           return parsed_response
       
       parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP'
       parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = {
           "verb": verb, 
           "actionGroupName": resource_name,
           "apiName": function,
           "actionGroupInput": parameters
       }
       
       return parsed_response
       
   def addRepromptResponse(parsed_response, error):
       error_message = str(error)
       logger.warn(error_message)
       
       parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = {
           'repromptResponse': error_message
       }
   ```

------
#### [ Anthropic Claude 2.1 ]

   ```
   import logging
   import re
   import xml.etree.ElementTree as ET
   
   RATIONALE_REGEX_LIST = [
       "(.*?)(<function_calls>)",
       "(.*?)(<answer>)"
   ]
   RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST]
   
   RATIONALE_VALUE_REGEX_LIST = [
       "<scratchpad>(.*?)(</scratchpad>)",
       "(.*?)(</scratchpad>)",
       "(<scratchpad>)(.*?)"
   ]
   RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST]
   
   ANSWER_REGEX = r"(?<=<answer>)(.*)"
   ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL)
   
   ANSWER_TAG = "<answer>"
   FUNCTION_CALL_TAG = "<function_calls>"
   
   ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>"
   ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL)
   
   ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>"
   ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL)
   
   TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>"
   TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL)
   
   ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>"
   ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL)
   
   
   KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_"
   
   FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)"
   
   ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>"
   ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>"
   ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>"
   ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL)
   ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL)
   ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL)
   
   # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format
   MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added."
   ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>."
   FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>."
   
   logger = logging.getLogger()
   
   
   # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt
   def lambda_handler(event, context):
       logger.info("Lambda input: " + str(event))
   
       # Sanitize LLM response
       sanitized_response = sanitize_response(event['invokeModelRawResponse'])
   
       # Parse LLM response for any rationale
       rationale = parse_rationale(sanitized_response)
   
       # Construct response fields common to all invocation types
       parsed_response = {
           'promptType': "ORCHESTRATION",
           'orchestrationParsedResponse': {
               'rationale': rationale
           }
       }
   
       # Check if there is a final answer
       try:
           final_answer, generated_response_parts = parse_answer(sanitized_response)
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
   
       if final_answer:
           parsed_response['orchestrationParsedResponse']['responseDetails'] = {
               'invocationType': 'FINISH',
               'agentFinalResponse': {
                   'responseText': final_answer
               }
           }
   
           if generated_response_parts:
               parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = {
                   'generatedResponseParts': generated_response_parts
               }
   
           logger.info("Final answer parsed response: " + str(parsed_response))
           return parsed_response
   
       # Check if there is an ask user
       try:
           ask_user = parse_ask_user(sanitized_response)
           if ask_user:
               parsed_response['orchestrationParsedResponse']['responseDetails'] = {
                   'invocationType': 'ASK_USER',
                   'agentAskUser': {
                       'responseText': ask_user
                   }
               }
   
               logger.info("Ask user parsed response: " + str(parsed_response))
               return parsed_response
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
   
       # Check if there is an agent action
       try:
           parsed_response = parse_function_call(sanitized_response, parsed_response)
           logger.info("Function call parsed response: " + str(parsed_response))
           return parsed_response
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
   
       addRepromptResponse(parsed_response, 'Failed to parse the LLM output')
       logger.info(parsed_response)
       return parsed_response
   
       raise Exception("unrecognized prompt type")
   
   
   def sanitize_response(text):
       pattern = r"(\\n*)"
       text = re.sub(pattern, r"\n", text)
       return text
   
   
   def parse_rationale(sanitized_response):
       # Checks for strings that are not required for orchestration
       rationale_matcher = next(
           (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)),
           None)
   
       if rationale_matcher:
           rationale = rationale_matcher.group(1).strip()
   
           # Check if there is a formatted rationale that we can parse from the string
           rationale_value_matcher = next(
               (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None)
           if rationale_value_matcher:
               return rationale_value_matcher.group(1).strip()
   
           return rationale
   
       return None
   
   
   def parse_answer(sanitized_llm_response):
       if has_generated_response(sanitized_llm_response):
           return parse_generated_response(sanitized_llm_response)
   
       answer_match = ANSWER_PATTERN.search(sanitized_llm_response)
       if answer_match and is_answer(sanitized_llm_response):
           return answer_match.group(0).strip(), None
   
       return None, None
   
   
   def is_answer(llm_response):
       return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG)
   
   
   def parse_generated_response(sanitized_llm_response):
       results = []
   
       for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response):
           part = match.group(1).strip()
   
           text_match = ANSWER_TEXT_PART_PATTERN.search(part)
           if not text_match:
               raise ValueError("Could not parse generated response")
   
           text = text_match.group(1).strip()
           references = parse_references(sanitized_llm_response, part)
           results.append((text, references))
   
       final_response = " ".join([r[0] for r in results])
   
       generated_response_parts = []
       for text, references in results:
           generatedResponsePart = {
               'text': text,
               'references': references
           }
           generated_response_parts.append(generatedResponsePart)
   
       return final_response, generated_response_parts
   
   
   def has_generated_response(raw_response):
       return ANSWER_PART_PATTERN.search(raw_response) is not None
   
   
   def parse_references(raw_response, answer_part):
       references = []
       for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part):
           reference = match.group(1).strip()
           references.append({'sourceId': reference})
       return references
   
   
   def parse_ask_user(sanitized_llm_response):
       ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response)
       if ask_user_matcher:
           try:
               parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response)
               params = parameters_matches.group(1).strip()
               ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params)
               if ask_user_question_matcher:
                   ask_user_question = ask_user_question_matcher.group(1)
                   return ask_user_question
               raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE)
           except ValueError as ex:
               raise ex
           except Exception as ex:
               raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE)
   
       return None
   
   
   def parse_function_call(sanitized_response, parsed_response):
       match = re.search(FUNCTION_CALL_REGEX, sanitized_response)
       if not match:
           raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE)
   
       tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response)
       tool_name = tool_name_matches.group(1)
       parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response)
       params = parameters_matches.group(1).strip()
   
       action_split = tool_name.split('::')
       verb = action_split[0].strip()
       resource_name = action_split[1].strip()
       function = action_split[2].strip()
   
       xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params)))
       parameters = {}
       for elem in xml_tree.iter():
           if elem.text:
               parameters[elem.tag] = {'value': elem.text.strip('" ')}
   
       parsed_response['orchestrationParsedResponse']['responseDetails'] = {}
   
       # Function calls can either invoke an action group or a knowledge base.
       # Mapping to the correct variable names accordingly
       if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX):
           parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE'
           parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = {
               'searchQuery': parameters['searchQuery'],
               'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '')
           }
   
           return parsed_response
   
       parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP'
       parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = {
           "verb": verb,
           "actionGroupName": resource_name,
           "apiName": function,
           "actionGroupInput": parameters
       }
   
       return parsed_response
   
   
   def addRepromptResponse(parsed_response, error):
       error_message = str(error)
       logger.warn(error_message)
   
       parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = {
           'repromptResponse': error_message
       }
   ```

------
#### [ Anthropic Claude 3 ]

   ```
   import logging
   import re
   import xml.etree.ElementTree as ET
    
   RATIONALE_REGEX_LIST = [
       "(.*?)(<function_calls>)",
       "(.*?)(<answer>)"
   ]
   RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST]
    
   RATIONALE_VALUE_REGEX_LIST = [
       "<thinking>(.*?)(</thinking>)",
       "(.*?)(</thinking>)",
       "(<thinking>)(.*?)"
   ]
   RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST]
    
   ANSWER_REGEX = r"(?<=<answer>)(.*)"
   ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL)
    
   ANSWER_TAG = "<answer>"
   FUNCTION_CALL_TAG = "<function_calls>"
    
   ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>"
   ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL)
    
   ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>"
   ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL)
    
   TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>"
   TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL)
    
   ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>"
   ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL)
    
    
   KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_"
    
   FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)"
    
   ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>"
   ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>"
   ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>"
   ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL)
   ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL)
   ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL)
    
   # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format
   MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added."
   ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>."
   FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>."
    
   logger = logging.getLogger()
    
    
   # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt
   def lambda_handler(event, context):
       logger.info("Lambda input: " + str(event))
    
       # Sanitize LLM response
       sanitized_response = sanitize_response(event['invokeModelRawResponse'])
    
       # Parse LLM response for any rationale
       rationale = parse_rationale(sanitized_response)
    
       # Construct response fields common to all invocation types
       parsed_response = {
           'promptType': "ORCHESTRATION",
           'orchestrationParsedResponse': {
               'rationale': rationale
           }
       }
    
       # Check if there is a final answer
       try:
           final_answer, generated_response_parts = parse_answer(sanitized_response)
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
    
       if final_answer:
           parsed_response['orchestrationParsedResponse']['responseDetails'] = {
               'invocationType': 'FINISH',
               'agentFinalResponse': {
                   'responseText': final_answer
               }
           }
    
           if generated_response_parts:
               parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = {
                   'generatedResponseParts': generated_response_parts
               }
    
           logger.info("Final answer parsed response: " + str(parsed_response))
           return parsed_response
    
       # Check if there is an ask user
       try:
           ask_user = parse_ask_user(sanitized_response)
           if ask_user:
               parsed_response['orchestrationParsedResponse']['responseDetails'] = {
                   'invocationType': 'ASK_USER',
                   'agentAskUser': {
                       'responseText': ask_user
                   }
               }
    
               logger.info("Ask user parsed response: " + str(parsed_response))
               return parsed_response
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
    
       # Check if there is an agent action
       try:
           parsed_response = parse_function_call(sanitized_response, parsed_response)
           logger.info("Function call parsed response: " + str(parsed_response))
           return parsed_response
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
    
       addRepromptResponse(parsed_response, 'Failed to parse the LLM output')
       logger.info(parsed_response)
       return parsed_response
    
       raise Exception("unrecognized prompt type")
    
    
   def sanitize_response(text):
       pattern = r"(\\n*)"
       text = re.sub(pattern, r"\n", text)
       return text
    
    
   def parse_rationale(sanitized_response):
       # Checks for strings that are not required for orchestration
       rationale_matcher = next(
           (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)),
           None)
    
       if rationale_matcher:
           rationale = rationale_matcher.group(1).strip()
    
           # Check if there is a formatted rationale that we can parse from the string
           rationale_value_matcher = next(
               (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None)
           if rationale_value_matcher:
               return rationale_value_matcher.group(1).strip()
    
           return rationale
    
       return None
    
    
   def parse_answer(sanitized_llm_response):
       if has_generated_response(sanitized_llm_response):
           return parse_generated_response(sanitized_llm_response)
    
       answer_match = ANSWER_PATTERN.search(sanitized_llm_response)
       if answer_match and is_answer(sanitized_llm_response):
           return answer_match.group(0).strip(), None
    
       return None, None
    
    
   def is_answer(llm_response):
       return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG)
    
    
   def parse_generated_response(sanitized_llm_response):
       results = []
    
       for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response):
           part = match.group(1).strip()
    
           text_match = ANSWER_TEXT_PART_PATTERN.search(part)
           if not text_match:
               raise ValueError("Could not parse generated response")
    
           text = text_match.group(1).strip()
           references = parse_references(sanitized_llm_response, part)
           results.append((text, references))
    
       final_response = " ".join([r[0] for r in results])
    
       generated_response_parts = []
       for text, references in results:
           generatedResponsePart = {
               'text': text,
               'references': references
           }
           generated_response_parts.append(generatedResponsePart)
    
       return final_response, generated_response_parts
    
    
   def has_generated_response(raw_response):
       return ANSWER_PART_PATTERN.search(raw_response) is not None
    
    
   def parse_references(raw_response, answer_part):
       references = []
       for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part):
           reference = match.group(1).strip()
           references.append({'sourceId': reference})
       return references
    
    
   def parse_ask_user(sanitized_llm_response):
       ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response)
       if ask_user_matcher:
           try:
               parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response)
               params = parameters_matches.group(1).strip()
               ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params)
               if ask_user_question_matcher:
                   ask_user_question = ask_user_question_matcher.group(1)
                   return ask_user_question
               raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE)
           except ValueError as ex:
               raise ex
           except Exception as ex:
               raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE)
    
       return None
    
    
   def parse_function_call(sanitized_response, parsed_response):
       match = re.search(FUNCTION_CALL_REGEX, sanitized_response)
       if not match:
           raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE)
    
       tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response)
       tool_name = tool_name_matches.group(1)
       parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response)
       params = parameters_matches.group(1).strip()
    
       action_split = tool_name.split('::')
       verb = action_split[0].strip()
       resource_name = action_split[1].strip()
       function = action_split[2].strip()
    
       xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params)))
       parameters = {}
       for elem in xml_tree.iter():
           if elem.text:
               parameters[elem.tag] = {'value': elem.text.strip('" ')}
    
       parsed_response['orchestrationParsedResponse']['responseDetails'] = {}
    
       # Function calls can either invoke an action group or a knowledge base.
       # Mapping to the correct variable names accordingly
       if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX):
           parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE'
           parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = {
               'searchQuery': parameters['searchQuery'],
               'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '')
           }
    
           return parsed_response
    
       parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP'
       parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = {
           "verb": verb,
           "actionGroupName": resource_name,
           "apiName": function,
           "actionGroupInput": parameters
       }
    
       return parsed_response
    
    
   def addRepromptResponse(parsed_response, error):
       error_message = str(error)
       logger.warn(error_message)
    
       parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = {
           'repromptResponse': error_message
       }
   ```

------
#### [ Anthropic Claude 3.5 ]

   ```
   import json
   import logging
   import re
   from collections import defaultdict
   
   RATIONALE_VALUE_REGEX_LIST = [
     "<thinking>(.*?)(</thinking>)",
     "(.*?)(</thinking>)",
     "(<thinking>)(.*?)"
   ]
   RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in
                               RATIONALE_VALUE_REGEX_LIST]
   
   ANSWER_REGEX = r"(?<=<answer>)(.*)"
   ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL)
   
   ANSWER_TAG = "<answer>"
   ASK_USER = "user__askuser"
   
   KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_"
   
   ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>"
   ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>"
   ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>"
   ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL)
   ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL)
   ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX,
                                              re.DOTALL)
   
   # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format
   MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user__askuser function call. Please try again with the correct argument added."
   FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The tool name format is incorrect. The format for the tool name must be: 'httpVerb__actionGroupName__apiName."
   logger = logging.getLogger()
   
   
   # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt
   def lambda_handler(event, context):
     logger.setLevel("INFO")
     logger.info("Lambda input: " + str(event))
   
     # Sanitize LLM response
     response = load_response(event['invokeModelRawResponse'])
   
     stop_reason = response["stop_reason"]
     content = response["content"]
     content_by_type = get_content_by_type(content)
   
     # Parse LLM response for any rationale
     rationale = parse_rationale(content_by_type)
   
     # Construct response fields common to all invocation types
     parsed_response = {
       'promptType': "ORCHESTRATION",
       'orchestrationParsedResponse': {
         'rationale': rationale
       }
     }
   
     match stop_reason:
       case 'tool_use':
         # Check if there is an ask user
         try:
           ask_user = parse_ask_user(content_by_type)
           if ask_user:
             parsed_response['orchestrationParsedResponse']['responseDetails'] = {
               'invocationType': 'ASK_USER',
               'agentAskUser': {
                 'responseText': ask_user,
                 'id': content_by_type['tool_use'][0]['id']
               },
   
             }
   
             logger.info("Ask user parsed response: " + str(parsed_response))
             return parsed_response
         except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
   
         # Check if there is an agent action
         try:
           parsed_response = parse_function_call(content_by_type, parsed_response)
           logger.info("Function call parsed response: " + str(parsed_response))
           return parsed_response
         except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
   
       case 'end_turn' | 'stop_sequence':
         # Check if there is a final answer
         try:
           if content_by_type["text"]:
             text_contents = content_by_type["text"]
             for text_content in text_contents:
               final_answer, generated_response_parts = parse_answer(text_content)
               if final_answer:
                 parsed_response['orchestrationParsedResponse'][
                   'responseDetails'] = {
                   'invocationType': 'FINISH',
                   'agentFinalResponse': {
                     'responseText': final_answer
                   }
                 }
   
               if generated_response_parts:
                 parsed_response['orchestrationParsedResponse']['responseDetails'][
                   'agentFinalResponse']['citations'] = {
                   'generatedResponseParts': generated_response_parts
                 }
   
               logger.info("Final answer parsed response: " + str(parsed_response))
               return parsed_response
         except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
       case _:
         addRepromptResponse(parsed_response, 'Failed to parse the LLM output')
         logger.info(parsed_response)
         return parsed_response
   
   
   def load_response(text):
     raw_text = r'{}'.format(text)
     json_text = json.loads(raw_text)
     return json_text
   
   
   def get_content_by_type(content):
     content_by_type = defaultdict(list)
     for content_value in content:
       content_by_type[content_value["type"]].append(content_value)
     return content_by_type
   
   
   def parse_rationale(content_by_type):
     if "text" in content_by_type:
       rationale = content_by_type["text"][0]["text"]
       if rationale is not None:
         rationale_matcher = next(
             (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if
              pattern.search(rationale)),
             None)
         if rationale_matcher:
           rationale = rationale_matcher.group(1).strip()
       return rationale
     return None
   
   
   def parse_answer(response):
     if has_generated_response(response["text"].strip()):
       return parse_generated_response(response)
   
     answer_match = ANSWER_PATTERN.search(response["text"].strip())
     if answer_match:
       return answer_match.group(0).strip(), None
   
     return None, None
   
   
   def parse_generated_response(response):
     results = []
   
     for match in ANSWER_PART_PATTERN.finditer(response):
       part = match.group(1).strip()
   
       text_match = ANSWER_TEXT_PART_PATTERN.search(part)
       if not text_match:
         raise ValueError("Could not parse generated response")
   
       text = text_match.group(1).strip()
       references = parse_references(part)
       results.append((text, references))
   
     final_response = " ".join([r[0] for r in results])
   
     generated_response_parts = []
     for text, references in results:
       generatedResponsePart = {
         'text': text,
         'references': references
       }
       generated_response_parts.append(generatedResponsePart)
   
     return final_response, generated_response_parts
   
   
   def has_generated_response(raw_response):
     return ANSWER_PART_PATTERN.search(raw_response) is not None
   
   
   def parse_references(answer_part):
     references = []
     for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part):
       reference = match.group(1).strip()
       references.append({'sourceId': reference})
     return references
   
   
   def parse_ask_user(content_by_type):
     try:
       if content_by_type["tool_use"][0]["name"] == ASK_USER:
         ask_user_question = content_by_type["tool_use"][0]["input"]["question"]
         if not ask_user_question:
           raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE)
         return ask_user_question
     except ValueError as ex:
       raise ex
     return None
   
   
   def parse_function_call(content_by_type, parsed_response):
     try:
       content = content_by_type["tool_use"][0]
       tool_name = content["name"]
   
       action_split = tool_name.split('__')
       verb = action_split[0].strip()
       resource_name = action_split[1].strip()
       function = action_split[2].strip()
     except ValueError as ex:
       raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE)
   
     parameters = {}
     for param, value in content["input"].items():
       parameters[param] = {'value': value}
   
     parsed_response['orchestrationParsedResponse']['responseDetails'] = {}
   
     # Function calls can either invoke an action group or a knowledge base.
     # Mapping to the correct variable names accordingly
     if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX):
       parsed_response['orchestrationParsedResponse']['responseDetails'][
         'invocationType'] = 'KNOWLEDGE_BASE'
       parsed_response['orchestrationParsedResponse']['responseDetails'][
         'agentKnowledgeBase'] = {
         'searchQuery': parameters['searchQuery'],
         'knowledgeBaseId': resource_name.replace(
             KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, ''),
         'id': content["id"]
       }
       return parsed_response
     parsed_response['orchestrationParsedResponse']['responseDetails'][
       'invocationType'] = 'ACTION_GROUP'
     parsed_response['orchestrationParsedResponse']['responseDetails'][
       'actionGroupInvocation'] = {
       "verb": verb,
       "actionGroupName": resource_name,
       "apiName": function,
       "actionGroupInput": parameters,
       "id": content["id"]
     }
     return parsed_response
   
   
   def addRepromptResponse(parsed_response, error):
     error_message = str(error)
     logger.warn(error_message)
   
     parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = {
       'repromptResponse': error_message
     }
   ```

------

1. 若要查看以函數詳細資訊定義的動作群組範例，請選取對應至您要查看其範例之模型的索引標籤。

------
#### [ Anthropic Claude 2.0 ]

   ```
   import json
   import re
   import logging
    
    
   RATIONALE_REGEX_LIST = [
       "(.*?)(<function_call>)",
       "(.*?)(<answer>)"
   ]
   RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST]
    
   RATIONALE_VALUE_REGEX_LIST = [
       "<scratchpad>(.*?)(</scratchpad>)",
       "(.*?)(</scratchpad>)",
       "(<scratchpad>)(.*?)"
   ]
   RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST]
    
   ANSWER_REGEX = r"(?<=<answer>)(.*)"
   ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL)
    
   ANSWER_TAG = "<answer>"
   FUNCTION_CALL_TAG = "<function_call>"
    
   ASK_USER_FUNCTION_CALL_REGEX = r"(<function_call>user::askuser)(.*)\)"
   ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL)
    
   ASK_USER_FUNCTION_PARAMETER_REGEX = r"(?<=askuser=\")(.*?)\""  
   ASK_USER_FUNCTION_PARAMETER_PATTERN = re.compile(ASK_USER_FUNCTION_PARAMETER_REGEX, re.DOTALL)
    
   KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_"
    
   FUNCTION_CALL_REGEX_API_SCHEMA = r"<function_call>(\w+)::(\w+)::(.+)\((.+)\)"
   FUNCTION_CALL_REGEX_FUNCTION_SCHEMA = r"<function_call>(\w+)::(.+)\((.+)\)"
    
   ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>"
   ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>"  
   ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>"
   ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL)
   ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL)
   ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL)
    
   # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format
   MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the argument askuser for user::askuser function call. Please try again with the correct argument added"
   ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <function_call>user::askuser(askuser=\"$ASK_USER_INPUT\")</function_call>."
   FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = 'The function call format is incorrect. The format for function calls must be: <function_call>$FUNCTION_NAME($FUNCTION_ARGUMENT_NAME=""$FUNCTION_ARGUMENT_NAME"")</function_call>.'
    
   logger = logging.getLogger()
   logger.setLevel("INFO")
    
   # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt
   def lambda_handler(event, context):
       logger.info("Lambda input: " + str(event))
       
       # Sanitize LLM response
       sanitized_response = sanitize_response(event['invokeModelRawResponse'])
       
       # Parse LLM response for any rationale
       rationale = parse_rationale(sanitized_response)
       
       # Construct response fields common to all invocation types
       parsed_response = {
           'promptType': "ORCHESTRATION",
           'orchestrationParsedResponse': {
               'rationale': rationale
           }
       }
       
       # Check if there is a final answer
       try:
           final_answer, generated_response_parts = parse_answer(sanitized_response)
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
           
       if final_answer:
           parsed_response['orchestrationParsedResponse']['responseDetails'] = {
               'invocationType': 'FINISH',
               'agentFinalResponse': {
                   'responseText': final_answer
               }
           }
           
           if generated_response_parts:
               parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = {
                   'generatedResponseParts': generated_response_parts
               }
          
           logger.info("Final answer parsed response: " + str(parsed_response))
           return parsed_response
       
       # Check if there is an ask user
       try:
           ask_user = parse_ask_user(sanitized_response)
           if ask_user:
               parsed_response['orchestrationParsedResponse']['responseDetails'] = {
                   'invocationType': 'ASK_USER',
                   'agentAskUser': {
                       'responseText': ask_user
                   }
               }
               
               logger.info("Ask user parsed response: " + str(parsed_response))
               return parsed_response
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
           
       # Check if there is an agent action
       try:
           parsed_response = parse_function_call(sanitized_response, parsed_response)
           logger.info("Function call parsed response: " + str(parsed_response))
           return parsed_response
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
    
       addRepromptResponse(parsed_response, 'Failed to parse the LLM output')
       logger.info(parsed_response)
       return parsed_response
           
       raise Exception("unrecognized prompt type")
    
   def sanitize_response(text):
       pattern = r"(\\n*)"
       text = re.sub(pattern, r"\n", text)
       return text
       
   def parse_rationale(sanitized_response):
       # Checks for strings that are not required for orchestration
       rationale_matcher = next((pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None)
       
       if rationale_matcher:
           rationale = rationale_matcher.group(1).strip()
           
           # Check if there is a formatted rationale that we can parse from the string
           rationale_value_matcher = next((pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None)
           if rationale_value_matcher:
               return rationale_value_matcher.group(1).strip()
           
           return rationale
       
       return None
       
   def parse_answer(sanitized_llm_response):
       if has_generated_response(sanitized_llm_response):
           return parse_generated_response(sanitized_llm_response)
    
       answer_match = ANSWER_PATTERN.search(sanitized_llm_response)
       if answer_match and is_answer(sanitized_llm_response):
           return answer_match.group(0).strip(), None
           
       return None, None
     
   def is_answer(llm_response):
       return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG)
       
   def parse_generated_response(sanitized_llm_response):
       results = []
       
       for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response):
           part = match.group(1).strip()
           
           text_match = ANSWER_TEXT_PART_PATTERN.search(part)
           if not text_match:
               raise ValueError("Could not parse generated response")
           
           text = text_match.group(1).strip()        
           references = parse_references(sanitized_llm_response, part)
           results.append((text, references))
       
       final_response = " ".join([r[0] for r in results])
       
       generated_response_parts = []
       for text, references in results:
           generatedResponsePart = {
               'text': text, 
               'references': references
           }
           generated_response_parts.append(generatedResponsePart)
           
       return final_response, generated_response_parts
    
       
   def has_generated_response(raw_response):
       return ANSWER_PART_PATTERN.search(raw_response) is not None
    
   def parse_references(raw_response, answer_part):
       references = []
       for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part):
           reference = match.group(1).strip()
           references.append({'sourceId': reference})
       return references
       
   def parse_ask_user(sanitized_llm_response):
       ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response)
       if ask_user_matcher:
           try:
               ask_user = ask_user_matcher.group(2).strip()
               ask_user_question_matcher = ASK_USER_FUNCTION_PARAMETER_PATTERN.search(ask_user)
               if ask_user_question_matcher:
                   return ask_user_question_matcher.group(1).strip()
               raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE)
           except ValueError as ex:
               raise ex
           except Exception as ex:
               raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE)
           
       return None
    
   def parse_function_call(sanitized_response, parsed_response):
       match = re.search(FUNCTION_CALL_REGEX_API_SCHEMA, sanitized_response)
       match_function_schema = re.search(FUNCTION_CALL_REGEX_FUNCTION_SCHEMA, sanitized_response)
       if not match and not match_function_schema:
           raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE)
    
       if match:
           schema_type = 'API'
           verb, resource_name, function, param_arg = match.group(1), match.group(2), match.group(3), match.group(4)
       else:
           schema_type = 'FUNCTION'
           resource_name, function, param_arg = match_function_schema.group(1), match_function_schema.group(2), match_function_schema.group(3)
       
       parameters = {}
       for arg in param_arg.split(","):
           key, value = arg.split("=")
           parameters[key.strip()] = {'value': value.strip('" ')}
           
       parsed_response['orchestrationParsedResponse']['responseDetails'] = {}
           
       # Function calls can either invoke an action group or a knowledge base.
       # Mapping to the correct variable names accordingly
       if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX):
           parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE'
           parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = {
               'searchQuery': parameters['searchQuery'],
               'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '')
           }
           
           return parsed_response
       
       parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP'
       
       if schema_type == 'API':
           parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = {
               "verb": verb, 
               "actionGroupName": resource_name,
               "apiName": function,
               "actionGroupInput": parameters
           }
       else:
           parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = {
               "actionGroupName": resource_name,
               "functionName": function,
               "actionGroupInput": parameters
           }
       
       return parsed_response
       
   def addRepromptResponse(parsed_response, error):
       error_message = str(error)
       logger.warn(error_message)
       
       parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = {
           'repromptResponse': error_message
       }
   ```

------
#### [ Anthropic Claude 2.1 ]

   ```
   import logging
   import re
   import xml.etree.ElementTree as ET
    
   RATIONALE_REGEX_LIST = [
       "(.*?)(<function_calls>)",
       "(.*?)(<answer>)"
   ]
   RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST]
    
   RATIONALE_VALUE_REGEX_LIST = [
       "<scratchpad>(.*?)(</scratchpad>)",
       "(.*?)(</scratchpad>)",
       "(<scratchpad>)(.*?)"
   ]
   RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST]
    
   ANSWER_REGEX = r"(?<=<answer>)(.*)"
   ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL)
    
   ANSWER_TAG = "<answer>"
   FUNCTION_CALL_TAG = "<function_calls>"
    
   ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>"
   ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL)
    
   ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>"
   ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL)
    
   TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>"
   TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL)
    
   ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>"
   ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL)
    
    
   KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_"
    
   FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)"
    
   ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>"
   ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>"
   ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>"
   ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL)
   ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL)
   ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL)
    
   # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format
   MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added."
   ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>."
   FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>."
    
   logger = logging.getLogger()
   logger.setLevel("INFO")
    
   # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt
   def lambda_handler(event, context):
       logger.info("Lambda input: " + str(event))
    
       # Sanitize LLM response
       sanitized_response = sanitize_response(event['invokeModelRawResponse'])
    
       # Parse LLM response for any rationale
       rationale = parse_rationale(sanitized_response)
    
       # Construct response fields common to all invocation types
       parsed_response = {
           'promptType': "ORCHESTRATION",
           'orchestrationParsedResponse': {
               'rationale': rationale
           }
       }
    
       # Check if there is a final answer
       try:
           final_answer, generated_response_parts = parse_answer(sanitized_response)
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
    
       if final_answer:
           parsed_response['orchestrationParsedResponse']['responseDetails'] = {
               'invocationType': 'FINISH',
               'agentFinalResponse': {
                   'responseText': final_answer
               }
           }
    
           if generated_response_parts:
               parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = {
                   'generatedResponseParts': generated_response_parts
               }
    
           logger.info("Final answer parsed response: " + str(parsed_response))
           return parsed_response
    
       # Check if there is an ask user
       try:
           ask_user = parse_ask_user(sanitized_response)
           if ask_user:
               parsed_response['orchestrationParsedResponse']['responseDetails'] = {
                   'invocationType': 'ASK_USER',
                   'agentAskUser': {
                       'responseText': ask_user
                   }
               }
    
               logger.info("Ask user parsed response: " + str(parsed_response))
               return parsed_response
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
    
       # Check if there is an agent action
       try:
           parsed_response = parse_function_call(sanitized_response, parsed_response)
           logger.info("Function call parsed response: " + str(parsed_response))
           return parsed_response
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
    
       addRepromptResponse(parsed_response, 'Failed to parse the LLM output')
       logger.info(parsed_response)
       return parsed_response
    
       raise Exception("unrecognized prompt type")
    
    
   def sanitize_response(text):
       pattern = r"(\\n*)"
       text = re.sub(pattern, r"\n", text)
       return text
    
    
   def parse_rationale(sanitized_response):
       # Checks for strings that are not required for orchestration
       rationale_matcher = next(
           (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)),
           None)
    
       if rationale_matcher:
           rationale = rationale_matcher.group(1).strip()
    
           # Check if there is a formatted rationale that we can parse from the string
           rationale_value_matcher = next(
               (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None)
           if rationale_value_matcher:
               return rationale_value_matcher.group(1).strip()
    
           return rationale
    
       return None
    
    
   def parse_answer(sanitized_llm_response):
       if has_generated_response(sanitized_llm_response):
           return parse_generated_response(sanitized_llm_response)
    
       answer_match = ANSWER_PATTERN.search(sanitized_llm_response)
       if answer_match and is_answer(sanitized_llm_response):
           return answer_match.group(0).strip(), None
    
       return None, None
    
    
   def is_answer(llm_response):
       return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG)
    
    
   def parse_generated_response(sanitized_llm_response):
       results = []
    
       for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response):
           part = match.group(1).strip()
    
           text_match = ANSWER_TEXT_PART_PATTERN.search(part)
           if not text_match:
               raise ValueError("Could not parse generated response")
    
           text = text_match.group(1).strip()
           references = parse_references(sanitized_llm_response, part)
           results.append((text, references))
    
       final_response = " ".join([r[0] for r in results])
    
       generated_response_parts = []
       for text, references in results:
           generatedResponsePart = {
               'text': text,
               'references': references
           }
           generated_response_parts.append(generatedResponsePart)
    
       return final_response, generated_response_parts
    
    
   def has_generated_response(raw_response):
       return ANSWER_PART_PATTERN.search(raw_response) is not None
    
    
   def parse_references(raw_response, answer_part):
       references = []
       for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part):
           reference = match.group(1).strip()
           references.append({'sourceId': reference})
       return references
    
    
   def parse_ask_user(sanitized_llm_response):
       ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response)
       if ask_user_matcher:
           try:
               parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response)
               params = parameters_matches.group(1).strip()
               ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params)
               if ask_user_question_matcher:
                   ask_user_question = ask_user_question_matcher.group(1)
                   return ask_user_question
               raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE)
           except ValueError as ex:
               raise ex
           except Exception as ex:
               raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE)
    
       return None
    
    
   def parse_function_call(sanitized_response, parsed_response):
       match = re.search(FUNCTION_CALL_REGEX, sanitized_response)
       if not match:
           raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE)
    
       tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response)
       tool_name = tool_name_matches.group(1)
       parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response)
       params = parameters_matches.group(1).strip()
    
       action_split = tool_name.split('::')
       schema_type = 'FUNCTION' if len(action_split) == 2 else 'API'
    
       if schema_type == 'API':
           verb = action_split[0].strip()
           resource_name = action_split[1].strip()
           function = action_split[2].strip()
       else:
           resource_name = action_split[0].strip()
           function = action_split[1].strip()
    
       xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params)))
       parameters = {}
       for elem in xml_tree.iter():
           if elem.text:
               parameters[elem.tag] = {'value': elem.text.strip('" ')}
    
       parsed_response['orchestrationParsedResponse']['responseDetails'] = {}
    
       # Function calls can either invoke an action group or a knowledge base.
       # Mapping to the correct variable names accordingly
       if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX):
           parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE'
           parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = {
               'searchQuery': parameters['searchQuery'],
               'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '')
           }
    
           return parsed_response
    
       parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP'
       if schema_type == 'API':
           parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = {
               "verb": verb,
               "actionGroupName": resource_name,
               "apiName": function,
               "actionGroupInput": parameters
           }
       else:
           parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = {
               "actionGroupName": resource_name,
               "functionName": function,
               "actionGroupInput": parameters
           }
    
       return parsed_response
    
    
   def addRepromptResponse(parsed_response, error):
       error_message = str(error)
       logger.warn(error_message)
    
       parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = {
           'repromptResponse': error_message
       }
   ```

------
#### [ Anthropic Claude 3 ]

   ```
   import logging
   import re
   import xml.etree.ElementTree as ET
    
   RATIONALE_REGEX_LIST = [
       "(.*?)(<function_calls>)",
       "(.*?)(<answer>)"
   ]
   RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST]
    
   RATIONALE_VALUE_REGEX_LIST = [
       "<thinking>(.*?)(</thinking>)",
       "(.*?)(</thinking>)",
       "(<thinking>)(.*?)"
   ]
   RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST]
    
   ANSWER_REGEX = r"(?<=<answer>)(.*)"
   ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL)
    
   ANSWER_TAG = "<answer>"
   FUNCTION_CALL_TAG = "<function_calls>"
    
   ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>"
   ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL)
    
   ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>"
   ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL)
    
   TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>"
   TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL)
    
   ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>"
   ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL)
    
    
   KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_"
    
   FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)"
    
   ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>"
   ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>"
   ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>"
   ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL)
   ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL)
   ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL)
    
   # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format
   MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added."
   ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>."
   FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>."
    
   logger = logging.getLogger()
    
    
   # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt
   def lambda_handler(event, context):
       logger.info("Lambda input: " + str(event))
    
       # Sanitize LLM response
       sanitized_response = sanitize_response(event['invokeModelRawResponse'])
    
       # Parse LLM response for any rationale
       rationale = parse_rationale(sanitized_response)
    
       # Construct response fields common to all invocation types
       parsed_response = {
           'promptType': "ORCHESTRATION",
           'orchestrationParsedResponse': {
               'rationale': rationale
           }
       }
    
       # Check if there is a final answer
       try:
           final_answer, generated_response_parts = parse_answer(sanitized_response)
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
    
       if final_answer:
           parsed_response['orchestrationParsedResponse']['responseDetails'] = {
               'invocationType': 'FINISH',
               'agentFinalResponse': {
                   'responseText': final_answer
               }
           }
    
           if generated_response_parts:
               parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = {
                   'generatedResponseParts': generated_response_parts
               }
    
           logger.info("Final answer parsed response: " + str(parsed_response))
           return parsed_response
    
       # Check if there is an ask user
       try:
           ask_user = parse_ask_user(sanitized_response)
           if ask_user:
               parsed_response['orchestrationParsedResponse']['responseDetails'] = {
                   'invocationType': 'ASK_USER',
                   'agentAskUser': {
                       'responseText': ask_user
                   }
               }
    
               logger.info("Ask user parsed response: " + str(parsed_response))
               return parsed_response
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
    
       # Check if there is an agent action
       try:
           parsed_response = parse_function_call(sanitized_response, parsed_response)
           logger.info("Function call parsed response: " + str(parsed_response))
           return parsed_response
       except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
    
       addRepromptResponse(parsed_response, 'Failed to parse the LLM output')
       logger.info(parsed_response)
       return parsed_response
    
       raise Exception("unrecognized prompt type")
    
    
   def sanitize_response(text):
       pattern = r"(\\n*)"
       text = re.sub(pattern, r"\n", text)
       return text
    
    
   def parse_rationale(sanitized_response):
       # Checks for strings that are not required for orchestration
       rationale_matcher = next(
           (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)),
           None)
    
       if rationale_matcher:
           rationale = rationale_matcher.group(1).strip()
    
           # Check if there is a formatted rationale that we can parse from the string
           rationale_value_matcher = next(
               (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None)
           if rationale_value_matcher:
               return rationale_value_matcher.group(1).strip()
    
           return rationale
    
       return None
    
    
   def parse_answer(sanitized_llm_response):
       if has_generated_response(sanitized_llm_response):
           return parse_generated_response(sanitized_llm_response)
    
       answer_match = ANSWER_PATTERN.search(sanitized_llm_response)
       if answer_match and is_answer(sanitized_llm_response):
           return answer_match.group(0).strip(), None
    
       return None, None
    
    
   def is_answer(llm_response):
       return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG)
    
    
   def parse_generated_response(sanitized_llm_response):
       results = []
    
       for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response):
           part = match.group(1).strip()
    
           text_match = ANSWER_TEXT_PART_PATTERN.search(part)
           if not text_match:
               raise ValueError("Could not parse generated response")
    
           text = text_match.group(1).strip()
           references = parse_references(sanitized_llm_response, part)
           results.append((text, references))
    
       final_response = " ".join([r[0] for r in results])
    
       generated_response_parts = []
       for text, references in results:
           generatedResponsePart = {
               'text': text,
               'references': references
           }
           generated_response_parts.append(generatedResponsePart)
    
       return final_response, generated_response_parts
    
    
   def has_generated_response(raw_response):
       return ANSWER_PART_PATTERN.search(raw_response) is not None
    
    
   def parse_references(raw_response, answer_part):
       references = []
       for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part):
           reference = match.group(1).strip()
           references.append({'sourceId': reference})
       return references
    
    
   def parse_ask_user(sanitized_llm_response):
       ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response)
       if ask_user_matcher:
           try:
               parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response)
               params = parameters_matches.group(1).strip()
               ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params)
               if ask_user_question_matcher:
                   ask_user_question = ask_user_question_matcher.group(1)
                   return ask_user_question
               raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE)
           except ValueError as ex:
               raise ex
           except Exception as ex:
               raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE)
    
       return None
    
    
   def parse_function_call(sanitized_response, parsed_response):
       match = re.search(FUNCTION_CALL_REGEX, sanitized_response)
       if not match:
           raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE)
    
       tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response)
       tool_name = tool_name_matches.group(1)
       parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response)
       params = parameters_matches.group(1).strip()
    
       action_split = tool_name.split('::')
       schema_type = 'FUNCTION' if len(action_split) == 2 else 'API'
    
       if schema_type == 'API':
           verb = action_split[0].strip()
           resource_name = action_split[1].strip()
           function = action_split[2].strip()
       else:
           resource_name = action_split[0].strip()
           function = action_split[1].strip()
    
       xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params)))
       parameters = {}
       for elem in xml_tree.iter():
           if elem.text:
               parameters[elem.tag] = {'value': elem.text.strip('" ')}
    
       parsed_response['orchestrationParsedResponse']['responseDetails'] = {}
    
       # Function calls can either invoke an action group or a knowledge base.
       # Mapping to the correct variable names accordingly
       if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX):
           parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE'
           parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = {
               'searchQuery': parameters['searchQuery'],
               'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '')
           }
    
           return parsed_response
    
       parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP'
       if schema_type == 'API':
           parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = {
               "verb": verb,
               "actionGroupName": resource_name,
               "apiName": function,
               "actionGroupInput": parameters
           }
       else:
           parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = {
               "actionGroupName": resource_name,
               "functionName": function,
               "actionGroupInput": parameters
           }
    
       return parsed_response
    
    
   def addRepromptResponse(parsed_response, error):
       error_message = str(error)
       logger.warn(error_message)
    
       parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = {
           'repromptResponse': error_message
       }
   ```

------
#### [ Anthropic Claude 3.5 ]

   ```
   import json
   import logging
   import re
   from collections import defaultdict
   
   RATIONALE_VALUE_REGEX_LIST = [
     "<thinking>(.*?)(</thinking>)",
     "(.*?)(</thinking>)",
     "(<thinking>)(.*?)"
   ]
   RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in
                               RATIONALE_VALUE_REGEX_LIST]
   
   ANSWER_REGEX = r"(?<=<answer>)(.*)"
   ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL)
   
   ANSWER_TAG = "<answer>"
   ASK_USER = "user__askuser"
   
   KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_"
   
   ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>"
   ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>"
   ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>"
   ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL)
   ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL)
   ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX,
                                              re.DOTALL)
   
   # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format
   MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user__askuser function call. Please try again with the correct argument added."
   FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The tool name format is incorrect. The format for the tool name must be: 'httpVerb__actionGroupName__apiName."
   logger = logging.getLogger()
   
   
   # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt
   def lambda_handler(event, context):
     logger.setLevel("INFO")
     logger.info("Lambda input: " + str(event))
   
     # Sanitize LLM response
     response = load_response(event['invokeModelRawResponse'])
   
     stop_reason = response["stop_reason"]
     content = response["content"]
     content_by_type = get_content_by_type(content)
   
     # Parse LLM response for any rationale
     rationale = parse_rationale(content_by_type)
   
     # Construct response fields common to all invocation types
     parsed_response = {
       'promptType': "ORCHESTRATION",
       'orchestrationParsedResponse': {
         'rationale': rationale
       }
     }
   
     match stop_reason:
       case 'tool_use':
         # Check if there is an ask user
         try:
           ask_user = parse_ask_user(content_by_type)
           if ask_user:
             parsed_response['orchestrationParsedResponse']['responseDetails'] = {
               'invocationType': 'ASK_USER',
               'agentAskUser': {
                 'responseText': ask_user,
                 'id': content_by_type['tool_use'][0]['id']
               },
   
             }
   
             logger.info("Ask user parsed response: " + str(parsed_response))
             return parsed_response
         except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
   
         # Check if there is an agent action
         try:
           parsed_response = parse_function_call(content_by_type, parsed_response)
           logger.info("Function call parsed response: " + str(parsed_response))
           return parsed_response
         except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
   
       case 'end_turn' | 'stop_sequence':
         # Check if there is a final answer
         try:
           if content_by_type["text"]:
             text_contents = content_by_type["text"]
             for text_content in text_contents:
               final_answer, generated_response_parts = parse_answer(text_content)
               if final_answer:
                 parsed_response['orchestrationParsedResponse'][
                   'responseDetails'] = {
                   'invocationType': 'FINISH',
                   'agentFinalResponse': {
                     'responseText': final_answer
                   }
                 }
   
               if generated_response_parts:
                 parsed_response['orchestrationParsedResponse']['responseDetails'][
                   'agentFinalResponse']['citations'] = {
                   'generatedResponseParts': generated_response_parts
                 }
   
               logger.info("Final answer parsed response: " + str(parsed_response))
               return parsed_response
         except ValueError as e:
           addRepromptResponse(parsed_response, e)
           return parsed_response
       case _:
         addRepromptResponse(parsed_response, 'Failed to parse the LLM output')
         logger.info(parsed_response)
         return parsed_response
   
   
   def load_response(text):
     raw_text = r'{}'.format(text)
     json_text = json.loads(raw_text)
     return json_text
   
   
   def get_content_by_type(content):
     content_by_type = defaultdict(list)
     for content_value in content:
       content_by_type[content_value["type"]].append(content_value)
     return content_by_type
   
   
   def parse_rationale(content_by_type):
     if "text" in content_by_type:
       rationale = content_by_type["text"][0]["text"]
       if rationale is not None:
         rationale_matcher = next(
             (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if
              pattern.search(rationale)),
             None)
         if rationale_matcher:
           rationale = rationale_matcher.group(1).strip()
       return rationale
     return None
   
   
   def parse_answer(response):
     if has_generated_response(response["text"].strip()):
       return parse_generated_response(response)
   
     answer_match = ANSWER_PATTERN.search(response["text"].strip())
     if answer_match:
       return answer_match.group(0).strip(), None
   
     return None, None
   
   
   def parse_generated_response(response):
     results = []
   
     for match in ANSWER_PART_PATTERN.finditer(response):
       part = match.group(1).strip()
   
       text_match = ANSWER_TEXT_PART_PATTERN.search(part)
       if not text_match:
         raise ValueError("Could not parse generated response")
   
       text = text_match.group(1).strip()
       references = parse_references(part)
       results.append((text, references))
   
     final_response = " ".join([r[0] for r in results])
   
     generated_response_parts = []
     for text, references in results:
       generatedResponsePart = {
         'text': text,
         'references': references
       }
       generated_response_parts.append(generatedResponsePart)
   
     return final_response, generated_response_parts
   
   
   def has_generated_response(raw_response):
     return ANSWER_PART_PATTERN.search(raw_response) is not None
   
   
   def parse_references(answer_part):
     references = []
     for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part):
       reference = match.group(1).strip()
       references.append({'sourceId': reference})
     return references
   
   
   def parse_ask_user(content_by_type):
     try:
       if content_by_type["tool_use"][0]["name"] == ASK_USER:
         ask_user_question = content_by_type["tool_use"][0]["input"]["question"]
         if not ask_user_question:
           raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE)
         return ask_user_question
     except ValueError as ex:
       raise ex
     return None
   
   
   def parse_function_call(content_by_type, parsed_response):
     try:
       content = content_by_type["tool_use"][0]
       tool_name = content["name"]
   
       action_split = tool_name.split('__')
   
       schema_type = 'FUNCTION' if len(action_split) == 2 else 'API'
       if schema_type == 'API':
         verb = action_split[0].strip()
         resource_name = action_split[1].strip()
         function = action_split[2].strip()
       else:
         resource_name = action_split[1].strip()
         function = action_split[2].strip()
   
     except ValueError as ex:
       raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE)
   
     parameters = {}
     for param, value in content["input"].items():
       parameters[param] = {'value': value}
   
     parsed_response['orchestrationParsedResponse']['responseDetails'] = {}
   
     # Function calls can either invoke an action group or a knowledge base.
     # Mapping to the correct variable names accordingly
     if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX):
       parsed_response['orchestrationParsedResponse']['responseDetails'][
         'invocationType'] = 'KNOWLEDGE_BASE'
       parsed_response['orchestrationParsedResponse']['responseDetails'][
         'agentKnowledgeBase'] = {
         'searchQuery': parameters['searchQuery'],
         'knowledgeBaseId': resource_name.replace(
             KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, ''),
         'id': content["id"]
       }
       return parsed_response
     parsed_response['orchestrationParsedResponse']['responseDetails'][
       'invocationType'] = 'ACTION_GROUP'
     if schema_type == 'API':
       parsed_response['orchestrationParsedResponse']['responseDetails'][
         'actionGroupInvocation'] = {
         "verb": verb,
         "actionGroupName": resource_name,
         "apiName": function,
         "actionGroupInput": parameters,
         "id": content["id"]
       }
     else:
       parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = {
         "actionGroupName": resource_name,
         "functionName": function,
         "actionGroupInput": parameters
        }
     return parsed_response
   
   
   def addRepromptResponse(parsed_response, error):
     error_message = str(error)
     logger.warn(error_message)
   
     parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = {
       'repromptResponse': error_message
     }
   ```

------

### 產生知識庫回應
<a name="parser-kb"></a>

下列範例顯示以 Python 撰寫的知識庫回應產生剖析器 Lambda 函數。

```
import json
import re
import logging
 
ANSWER_PART_REGEX = "&lt;answer_part\\s?>(.+?)&lt;/answer_part\\s?>"
ANSWER_TEXT_PART_REGEX = "&lt;text\\s?>(.+?)&lt;/text\\s?>"  
ANSWER_REFERENCE_PART_REGEX = "&lt;source\\s?>(.+?)&lt;/source\\s?>"
ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL)
ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL)
ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL)

logger = logging.getLogger()
 
# This parser lambda is an example of how to parse the LLM output for the default KB response generation prompt
def lambda_handler(event, context):
    logger.info("Lambda input: " + str(event))
    raw_response = event['invokeModelRawResponse']
    
    parsed_response = {
        'promptType': 'KNOWLEDGE_BASE_RESPONSE_GENERATION',
        'knowledgeBaseResponseGenerationParsedResponse': {
            'generatedResponse': parse_generated_response(raw_response)
        }
    }
    
    logger.info(parsed_response)
    return parsed_response
    
def parse_generated_response(sanitized_llm_response):
    results = []
    
    for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response):
        part = match.group(1).strip()
        
        text_match = ANSWER_TEXT_PART_PATTERN.search(part)
        if not text_match:
            raise ValueError("Could not parse generated response")
        
        text = text_match.group(1).strip()        
        references = parse_references(sanitized_llm_response, part)
        results.append((text, references))
    
    generated_response_parts = []
    for text, references in results:
        generatedResponsePart = {
            'text': text, 
            'references': references
        }
        generated_response_parts.append(generatedResponsePart)
        
    return {
        'generatedResponseParts': generated_response_parts
    }
    
def parse_references(raw_response, answer_part):
    references = []
    for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part):
        reference = match.group(1).strip()
        references.append({'sourceId': reference})
    return references
```

### 後續處理
<a name="parser-postprocessing"></a>

下列範例顯示以 Python 撰寫的後續處理剖析器 Lambda 函數。

```
import json
import re
import logging
 
FINAL_RESPONSE_REGEX = r"&lt;final_response>([\s\S]*?)&lt;/final_response>"
FINAL_RESPONSE_PATTERN = re.compile(FINAL_RESPONSE_REGEX, re.DOTALL)

logger = logging.getLogger()
 
# This parser lambda is an example of how to parse the LLM output for the default PostProcessing prompt
def lambda_handler(event, context):
    logger.info("Lambda input: " + str(event))
    raw_response = event['invokeModelRawResponse']
    
    parsed_response = {
        'promptType': 'POST_PROCESSING',
        'postProcessingParsedResponse': {}
    }
    
    matcher = FINAL_RESPONSE_PATTERN.search(raw_response)
    if not matcher:
        raise Exception("Could not parse raw LLM output")
    response_text = matcher.group(1).strip()
    
    parsed_response['postProcessingParsedResponse']['responseText'] = response_text
    
    logger.info(parsed_response)
    return parsed_response
```

### 記憶體摘要
<a name="parser-memory-summarization"></a>

下列範例顯示以 Python 撰寫的記憶體摘要剖析器 Lambda 函數。

```
import re
import logging

SUMMARY_TAG_PATTERN = r'<summary>(.*?)</summary>'
TOPIC_TAG_PATTERN = r'<topic name="(.+?)"\s*>(.+?)</topic>'
logger = logging.getLogger()

# This parser lambda is an example of how to parse the LLM output for the default LTM SUmmarization prompt
def lambda_handler(event, context):
    logger.info("Lambda input: " + str(event))
    
    # Sanitize LLM response
    model_response = sanitize_response(event['invokeModelRawResponse'])
    
    if event["promptType"] == "MEMORY_SUMMARIZATION":
        return format_response(parse_llm_response(model_response), event["promptType"])

def format_response(topic_summaries, prompt_type):
    return {
        "promptType": prompt_type,
        "memorySummarizationParsedResponse": {
            "topicwiseSummaries": topic_summaries
        }
    }
    
def parse_llm_response(output: str):
    # First extract content within summary tag
    summary_match = re.search(SUMMARY_TAG_PATTERN, output, re.DOTALL)
    if not summary_match:
        raise Exception("Error while parsing summarizer model output, no summary tag found!")
    
    summary_content = summary_match.group(1)
    topic_summaries = parse_topic_wise_summaries(summary_content)
        
    return topic_summaries

def parse_topic_wise_summaries(content):
    summaries = []
    # Then extract content within topic tag
    for match in re.finditer(TOPIC_TAG_PATTERN, content, re.DOTALL):
        topic_name = match.group(1)
        topic_summary = match.group(2).strip()
        summaries.append({
            'topic': topic_name,
            'summary': topic_summary
        })
    if not summaries:
        raise Exception("Error while parsing summarizer model output, no topics found!")
    return summaries

def sanitize_response(text):
    pattern = r"(\\n*)"
    text = re.sub(pattern, r"\n", text)
    return text
```