本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
每個提示範本都包含剖析器 Lambda 函數。您可以撰寫自己的自訂剖析器 Lambda 函數,並指定要覆寫其預設剖析器函數的範本。若要寫入自訂剖析器 Lambda 函數,您必須了解代理程式傳送的輸入事件,以及代理程式預期做為 Lambda 函數輸出的回應。您撰寫處理常式函數來操作輸入事件中的變數,並傳回回應。如需如何 AWS Lambda 運作的詳細資訊,請參閱《 AWS Lambda 開發人員指南》中的事件驅動調用。
剖析器 Lambda 輸入事件
以下是來自代理程式的輸入事件的一般結構。使用這些欄位來撰寫 Lambda 處理常式函數。
{
"messageVersion": "1.0",
"agent": {
"name": "string",
"id": "string",
"alias": "string",
"version": "string"
},
"invokeModelRawResponse": "string",
"promptType": "ORCHESTRATION | ROUTING_CLASSIFIER | POST_PROCESSING | PRE_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION | MEMORY_SUMMARIZATION",
"overrideType": "OUTPUT_PARSER"
}
下列清單說明輸入事件欄位:
-
messageVersion
– 訊息版本,用於識別進入 Lambda 函數的事件資料的格式,以及來自 Lambda 函數的回應的預期格式。Amazon Bedrock Agents 僅支援 1.0 版。 -
agent
— 包含提示所屬代理程式的名稱、ID、別名和版本的相關資訊。 -
invokeModelRawResponse
— 要剖析其輸出之提示的原始基礎模型輸出。 -
promptType
— 要剖析其輸出的提示類型。 -
overrideType
- 此 Lambda 函數覆寫的成品。目前僅OUTPUT_PARSER
支援 ,這表示要覆寫預設剖析器。
剖析器 Lambda 回應
您的客服人員預期 Lambda 函數會回應,並使用回應採取進一步動作或協助傳回回應給使用者。您的代理程式會執行代理程式模型建議的下一個動作。下一個動作可以序列順序或平行執行,具體取決於客服人員的模型以及客服人員的建立和準備時間。
如果您在 2024 年 10 月 4 日之前建立並準備代理程式,且您的代理程式正在使用 AnthropicClaude 3 Sonnet或 AnthropicClaude 3.5 Sonnet模型,則根據預設,代理程式模型建議的下一個首要動作將依序列順序執行。
如果您在 2024 年 10 月 10 日之後建立新的代理程式或準備現有的代理程式Claude 3.5 Sonnet,且您的代理程式正在使用 Anthropic Claude 3 Sonnet、 Anthropic 或任何non-Anthropic模型,則代理程式模型建議的下一個步驟動作將平行執行。這表示多個動作,例如動作群組函數和知識庫的混合,將平行執行。這可減少對模型進行的呼叫次數,進而降低整體延遲。
您可以透過呼叫 PrepareAgent API 或在 主控台的代理程式建置器中選取準備,為在 2024 年 10 月 4 日之前建立和準備的代理程式啟用平行動作。客服人員準備好後,您會看到更新的提示範本和新版本的剖析器 Lambda 結構描述。
剖析器 Lambda 回應範例
以下是代理程式以序列順序執行下一個建議動作,以及代理程式平行執行下一個動作之回應的一般結構範例。使用 Lambda 函數回應欄位來設定輸出傳回的方式。
代理程式以序列順序執行下一個建議動作的回應範例
選取對應於您是否使用OpenAPI結構描述或函數詳細資訊定義動作群組的標籤:
注意
MessageVersion 1.0
表示代理程式正在依序列順序執行下一個最建議的動作。
- OpenAPI schema
-
{ "messageVersion": "1.0", "promptType": "ORCHESTRATION | PRE_PROCESSING | ROUTING_CLASSIFIER | POST_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION", "preProcessingParsedResponse": { "isValidInput": "boolean", "rationale": "string" }, "orchestrationParsedResponse": { "rationale": "string", "parsingErrorDetails": { "repromptResponse": "string" }, "responseDetails": { "invocationType": "AGENT_COLLABORATOR | ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER", "agentAskUser": { "responseText": "string", "id": "string" }, "agentCollaboratorInvocation": { "agentCollaboratorName": "string", "input": { "text": "string" } } ... } }, "routingClassifierParsedResponse": { "parsingErrorDetails": { "repromptResponse": "string" }, "responseDetails": { "type": "AGENT | LAST_AGENT | UNDECIDED", "agentCollaboratorInvocation": { "agentCollaboratorName": "string", "input": { "text": "string" } } } } } "actionGroupInvocation": { "actionGroupName": "string", "apiName": "string", "id": "string", "verb": "string", "actionGroupInput": { "
<parameter>
": { "value": "string" }, ... } }, "agentKnowledgeBase": { "knowledgeBaseId": "string", "id": "string", "searchQuery": { "value": "string" } }, "agentFinalResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{"sourceId": "string"}] }] } }, } }, "knowledgeBaseResponseGenerationParsedResponse": { "generatedResponse": { "generatedResponseParts": [ { "text": "string", "references": [ {"sourceId": "string"}, ... ] } ] } }, "postProcessingParsedResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{ "sourceId": "string" }] }] } } } - Function details
-
{ "messageVersion": "1.0", "promptType": "ORCHESTRATION | PRE_PROCESSING | POST_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION", "preProcessingParsedResponse": { "isValidInput": "boolean", "rationale": "string" }, "orchestrationParsedResponse": { "rationale": "string", "parsingErrorDetails": { "repromptResponse": "string" }, "responseDetails": { "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER", "agentAskUser": { "responseText": "string", "id": "string" }, "actionGroupInvocation": { "actionGroupName": "string", "functionName": "string", "id": "string", "actionGroupInput": { "
<parameter>
": { "value": "string" }, ... } }, "agentKnowledgeBase": { "knowledgeBaseId": "string", "id": "string", "searchQuery": { "value": "string" } }, "agentFinalResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{"sourceId": "string"}] }] } }, } }, "knowledgeBaseResponseGenerationParsedResponse": { "generatedResponse": { "generatedResponseParts": [ { "text": "string", "references": [ {"sourceId": "string"}, ... ] } ] } }, "postProcessingParsedResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{ "sourceId": "string" }] }] } } }
{
"messageVersion": "1.0",
"promptType": "ORCHESTRATION | PRE_PROCESSING | ROUTING_CLASSIFIER | POST_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION",
"preProcessingParsedResponse": {
"isValidInput": "boolean",
"rationale": "string"
},
"orchestrationParsedResponse": {
"rationale": "string",
"parsingErrorDetails": {
"repromptResponse": "string"
},
"responseDetails": {
"invocationType": "AGENT_COLLABORATOR | ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER",
"agentAskUser": {
"responseText": "string",
"id": "string"
},
"agentCollaboratorInvocation": {
"agentCollaboratorName": "string",
"input": {
"text": "string"
}
}
...
}
},
"routingClassifierParsedResponse": {
"parsingErrorDetails": {
"repromptResponse": "string"
},
"responseDetails": {
"type": "AGENT | LAST_AGENT | UNDECIDED",
"agentCollaboratorInvocation": {
"agentCollaboratorName": "string",
"input": {
"text": "string"
}
}
}
}
}
"actionGroupInvocation": {
"actionGroupName": "string",
"apiName": "string",
"id": "string",
"verb": "string",
"actionGroupInput": {
"<parameter>
": {
"value": "string"
},
...
}
},
"agentKnowledgeBase": {
"knowledgeBaseId": "string",
"id": "string",
"searchQuery": {
"value": "string"
}
},
"agentFinalResponse": {
"responseText": "string",
"citations": {
"generatedResponseParts": [{
"text": "string",
"references": [{"sourceId": "string"}]
}]
}
},
}
},
"knowledgeBaseResponseGenerationParsedResponse": {
"generatedResponse": {
"generatedResponseParts": [
{
"text": "string",
"references": [
{"sourceId": "string"},
...
]
}
]
}
},
"postProcessingParsedResponse": {
"responseText": "string",
"citations": {
"generatedResponseParts": [{
"text": "string",
"references": [{
"sourceId": "string"
}]
}]
}
}
}
同時執行下一個動作之代理程式的範例回應
選取對應於您是否使用OpenAPI結構描述或函數詳細資訊定義動作群組的標籤:
注意
MessageVersion 2.0
表示代理程式正在平行執行下一個建議的動作
- OpenAPI schema
-
{ "messageVersion": "2.0", "promptType": "ORCHESTRATION | PRE_PROCESSING | POST_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION", "preProcessingParsedResponse": { "isValidInput": "boolean", "rationale": "string" }, "orchestrationParsedResponse": { "rationale": "string", "parsingErrorDetails": { "repromptResponse": "string" }, "responseDetails": { "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER", "agentAskUser": { "responseText": "string" }, "actionGroupInvocations": [ { "actionGroupName": "string", "apiName": "string", "verb": "string", "actionGroupInput": { "
<parameter>
": { "value": "string" }, ... } } ], "agentKnowledgeBases": [ { "knowledgeBaseId": "string", "searchQuery": { "value": "string" } } ], "agentFinalResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{"sourceId": "string"}] }] } }, } }, "knowledgeBaseResponseGenerationParsedResponse": { "generatedResponse": { "generatedResponseParts": [ { "text": "string", "references": [ {"sourceId": "string"}, ... ] } ] } }, "postProcessingParsedResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{ "sourceId": "string" }] }] } } } - Function details
-
{ "messageVersion": "2.0", "promptType": "ORCHESTRATION | PRE_PROCESSING | POST_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION", "preProcessingParsedResponse": { "isValidInput": "boolean", "rationale": "string" }, "orchestrationParsedResponse": { "rationale": "string", "parsingErrorDetails": { "repromptResponse": "string" }, "responseDetails": { "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER", "agentAskUser": { "responseText": "string" }, "actionGroupInvocations": [ { "actionGroupName": "string", "functionName": "string", "actionGroupInput": { "
<parameter>
"": { "value": "string" }, ... } } ], "agentKnowledgeBases": [ { "knowledgeBaseId": "string", "searchQuery": { "value": "string" } } ], "agentFinalResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{"sourceId": "string"}] }] } }, } }, "knowledgeBaseResponseGenerationParsedResponse": { "generatedResponse": { "generatedResponseParts": [ { "text": "string", "references": [ {"sourceId": "string"}, ... ] } ] } }, "postProcessingParsedResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{ "sourceId": "string" }] }] } } }
{
"messageVersion": "2.0",
"promptType": "ORCHESTRATION | PRE_PROCESSING | POST_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION",
"preProcessingParsedResponse": {
"isValidInput": "boolean",
"rationale": "string"
},
"orchestrationParsedResponse": {
"rationale": "string",
"parsingErrorDetails": {
"repromptResponse": "string"
},
"responseDetails": {
"invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER",
"agentAskUser": {
"responseText": "string"
},
"actionGroupInvocations": [
{
"actionGroupName": "string",
"apiName": "string",
"verb": "string",
"actionGroupInput": {
"<parameter>
": {
"value": "string"
},
...
}
}
],
"agentKnowledgeBases": [
{
"knowledgeBaseId": "string",
"searchQuery": {
"value": "string"
}
}
],
"agentFinalResponse": {
"responseText": "string",
"citations": {
"generatedResponseParts": [{
"text": "string",
"references": [{"sourceId": "string"}]
}]
}
},
}
},
"knowledgeBaseResponseGenerationParsedResponse": {
"generatedResponse": {
"generatedResponseParts": [
{
"text": "string",
"references": [
{"sourceId": "string"},
...
]
}
]
}
},
"postProcessingParsedResponse": {
"responseText": "string",
"citations": {
"generatedResponseParts": [{
"text": "string",
"references": [{
"sourceId": "string"
}]
}]
}
}
}
下列清單說明 Lambda 回應欄位:
-
messageVersion
– 訊息版本,用於識別進入 Lambda 函數的事件資料的格式,以及來自 Lambda 函數的回應的預期格式。 -
promptType
— 目前輪次的提示類型。 -
preProcessingParsedResponse
—PRE_PROCESSING
提示類型的已剖析回應。 -
orchestrationParsedResponse
—ORCHESTRATION
提示類型的已剖析回應。如需詳細資訊,請參閱下述。 -
knowledgeBaseResponseGenerationParsedResponse
—KNOWLEDGE_BASE_RESPONSE_GENERATION
提示類型的已剖析回應。 -
postProcessingParsedResponse
—POST_PROCESSING
提示類型的已剖析回應。
如需四個提示範本剖析回應的詳細資訊,請參閱下列標籤。
- preProcessingParsedResponse
-
{ "isValidInput": "boolean", "rationale": "string" }
preProcessingParsedResponse
包含下列欄位。-
isValidInput
— 指定使用者輸入是否有效。您可以定義函數來決定如何表徵使用者輸入的有效性。 -
rationale
— 使用者輸入分類的推論。此原理由模型在原始回應中提供,Lambda 函數會剖析它,而代理程式會在追蹤中呈現,以進行預先處理。
-
- orchestrationResponse
-
的格式
orchestrationResponse
取決於您是否使用OpenAPI結構描述或函數詳細資訊定義動作群組:-
如果您使用OpenAPI結構描述定義動作群組,回應的格式必須如下:
{ "rationale": "string", "parsingErrorDetails": { "repromptResponse": "string" }, "responseDetails": { "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER", "agentAskUser": { "responseText": "string", "id": "string" }, "actionGroupInvocation": { "actionGroupName": "string", "apiName": "string", "id": "string", "verb": "string", "actionGroupInput": { "
<parameter>
": { "value": "string" }, ... } }, "agentKnowledgeBase": { "knowledgeBaseId": "string", "id": "string", "searchQuery": { "value": "string" } }, "agentFinalResponse": { "responseText": "string", "citations": { "generatedResponseParts": [ { "text": "string", "references": [ {"sourceId": "string"}, ... ] }, ... ] } }, } } -
如果您使用函數詳細資訊定義動作群組,回應的格式必須如下:
{ "rationale": "string", "parsingErrorDetails": { "repromptResponse": "string" }, "responseDetails": { "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER", "agentAskUser": { "responseText": "string", "id": "string" }, "actionGroupInvocation": { "actionGroupName": "string", "functionName": "string", "id": "string", "actionGroupInput": { "
<parameter>
": { "value": "string" }, ... } }, "agentKnowledgeBase": { "knowledgeBaseId": "string", "id": "string", "searchQuery": { "value": "string" } }, "agentFinalResponse": { "responseText": "string", "citations": { "generatedResponseParts": [ { "text": "string", "references": [ {"sourceId": "string"}, ... ] }, ... ] } }, } }
orchestrationParsedResponse
包含下列欄位:-
rationale
— 依照基礎模型輸出來推論下一步做什麼。您可以定義要從模型輸出剖析的函數。 -
parsingErrorDetails
— 包含repromptResponse
,這是在無法剖析模型回應時,重新提示模型以更新其原始回應的訊息。您可以定義函數來操控如何重新提示模型。 -
responseDetails
— 包含如何處理基礎模型輸出的詳細資訊。包含invocationType
,這是代理程式要採用的下一個步驟,以及第二個應符合invocationType
的欄位。以下是可能的物件。-
agentAskUser
- 與ASK_USER
調用類型相容。此調用類型會結束協同運作步驟。包含responseText
以要求使用者提供更多資訊。您可以定義函數來操控此欄位。 -
actionGroupInvocation
- 與ACTION_GROUP
調用類型相容。您可以定義 Lambda 函數,以決定要叫用的動作群組和要傳遞的參數。包含下列欄位:-
actionGroupName
— 要調用的動作群組。 -
如果您使用OpenAPI結構描述定義動作群組,則以下欄位為必要欄位:
-
apiName
– 在動作群組中呼叫的 API 操作名稱。 -
verb
– 要使用的 API 操作方法。
-
-
如果您使用函數詳細資訊定義動作群組,則以下欄位為必要欄位:
-
functionName
– 在動作群組中呼叫的函數名稱。
-
-
actionGroupInput
– 包含要在 API 操作請求中指定的參數。
-
-
agentKnowledgeBase
- 與KNOWLEDGE_BASE
調用類型相容。您可以定義函數來確定如何查詢知識庫。包含下列欄位:-
knowledgeBaseId
- 知識庫的唯一識別碼。 -
searchQuery
– 包含要傳送至value
欄位中知識庫的查詢。
-
-
agentFinalResponse
- 與FINISH
調用類型相容。此調用類型會結束協同運作步驟。包含對responseText
欄位中使用者的回應,以及citations
物件中回應的引用文獻。
-
-
- knowledgeBaseResponseGenerationParsedResponse
-
{ "generatedResponse": { "generatedResponseParts": [ { "text": "string", "references": [ { "sourceId": "string" }, ... ] }, ... ] } }
knowledgeBaseResponseGenerationParsedResponse
包含generatedResponse
查詢知識庫和資料來源參考的 。 - postProcessingParsedResponse
-
{ "responseText": "string", "citations": { "generatedResponseParts": [ { "text": "string", "references": [ { "sourceId": "string" }, ... ] }, ... ] } }
postProcessingParsedResponse
包含下列欄位:-
responseText
— 返回至終端使用者的回應。您可以定義函數來格式化回應。 -
citations
— 包含回應的引用文獻清單。每個引文都會顯示引用的文字及其參考。
-
{
"isValidInput": "boolean",
"rationale": "string"
}
preProcessingParsedResponse
包含下列欄位。
-
isValidInput
— 指定使用者輸入是否有效。您可以定義函數來決定如何表徵使用者輸入的有效性。 -
rationale
— 使用者輸入分類的推論。此原理由模型在原始回應中提供,Lambda 函數會剖析它,而代理程式會在追蹤中呈現,以進行預先處理。
剖析器 Lambda 範例
若要查看剖析器 Lambda 函數輸入事件和回應範例,請從下列索引標籤中選取 。
- Pre-processing
-
輸入事件範例
{ "agent": { "alias": "TSTALIASID", "id": "AGENTID123", "name": "InsuranceAgent", "version": "DRAFT" }, "invokeModelRawResponse": " <thinking>\nThe user is asking about the instructions provided to the function calling agent. This input is trying to gather information about what functions/API's or instructions our function calling agent has access to. Based on the categories provided, this input belongs in Category B.\n</thinking>\n\n<category>B</category>", "messageVersion": "1.0", "overrideType": "OUTPUT_PARSER", "promptType": "PRE_PROCESSING" }
回應範例
{ "promptType": "PRE_PROCESSING", "preProcessingParsedResponse": { "rationale": "\nThe user is asking about the instructions provided to the function calling agent. This input is trying to gather information about what functions/API's or instructions our function calling agent has access to. Based on the categories provided, this input belongs in Category B.\n", "isValidInput": false } }
- Orchestration
-
輸入事件範例
{ "agent": { "alias": "TSTALIASID", "id": "AGENTID123", "name": "InsuranceAgent", "version": "DRAFT" }, "invokeModelRawResponse": "To answer this question, I will:\\n\\n1. Call the GET::x_amz_knowledgebase_KBID123456::Search function to search for a phone number to call.\\n\\nI have checked that I have access to the GET::x_amz_knowledgebase_KBID23456::Search function.\\n\\n</scratchpad>\\n\\n<function_call>GET::x_amz_knowledgebase_KBID123456::Search(searchQuery=\"What is the phone number I can call?\)", "messageVersion": "1.0", "overrideType": "OUTPUT_PARSER", "promptType": "ORCHESTRATION" }
回應範例
{ "promptType": "ORCHESTRATION", "orchestrationParsedResponse": { "rationale": "To answer this question, I will:\\n\\n1. Call the GET::x_amz_knowledgebase_KBID123456::Search function to search for a phone number to call Farmers.\\n\\nI have checked that I have access to the GET::x_amz_knowledgebase_KBID123456::Search function.", "responseDetails": { "invocationType": "KNOWLEDGE_BASE", "agentKnowledgeBase": { "searchQuery": { "value": "What is the phone number I can call?" }, "knowledgeBaseId": "KBID123456" } } } }
- Knowledge base response generation
-
輸入事件範例
{ "agent": { "alias": "TSTALIASID", "id": "AGENTID123", "name": "InsuranceAgent", "version": "DRAFT" }, "invokeModelRawResponse": "{\"completion\":\" <answer>\\\\n<answer_part>\\\\n<text>\\\\nThe search results contain information about different types of insurance benefits, including personal injury protection (PIP), medical payments coverage, and lost wages coverage. PIP typically covers reasonable medical expenses for injuries caused by an accident, as well as income continuation, child care, loss of services, and funerals. Medical payments coverage provides payment for medical treatment resulting from a car accident. Who pays lost wages due to injuries depends on the laws in your state and the coverage purchased.\\\\n</text>\\\\n<sources>\\\\n<source>1234567-1234-1234-1234-123456789abc</source>\\\\n<source>2345678-2345-2345-2345-23456789abcd</source>\\\\n<source>3456789-3456-3456-3456-3456789abcde</source>\\\\n</sources>\\\\n</answer_part>\\\\n</answer>\",\"stop_reason\":\"stop_sequence\",\"stop\":\"\\\\n\\\\nHuman:\"}", "messageVersion": "1.0", "overrideType": "OUTPUT_PARSER", "promptType": "KNOWLEDGE_BASE_RESPONSE_GENERATION" }
回應範例
{ "promptType": "KNOWLEDGE_BASE_RESPONSE_GENERATION", "knowledgeBaseResponseGenerationParsedResponse": { "generatedResponse": { "generatedResponseParts": [ { "text": "\\\\nThe search results contain information about different types of insurance benefits, including personal injury protection (PIP), medical payments coverage, and lost wages coverage. PIP typically covers reasonable medical expenses for injuries caused by an accident, as well as income continuation, child care, loss of services, and funerals. Medical payments coverage provides payment for medical treatment resulting from a car accident. Who pays lost wages due to injuries depends on the laws in your state and the coverage purchased.\\\\n", "references": [ {"sourceId": "1234567-1234-1234-1234-123456789abc"}, {"sourceId": "2345678-2345-2345-2345-23456789abcd"}, {"sourceId": "3456789-3456-3456-3456-3456789abcde"} ] } ] } } }
- Post-processing
-
輸入事件範例
{ "agent": { "alias": "TSTALIASID", "id": "AGENTID123", "name": "InsuranceAgent", "version": "DRAFT" }, "invokeModelRawResponse": "<final_response>\\nBased on your request, I searched our insurance benefit information database for details. The search results indicate that insurance policies may cover different types of benefits, depending on the policy and state laws. Specifically, the results discussed personal injury protection (PIP) coverage, which typically covers medical expenses for insured individuals injured in an accident (cited sources: 1234567-1234-1234-1234-123456789abc, 2345678-2345-2345-2345-23456789abcd). PIP may pay for costs like medical care, lost income replacement, childcare expenses, and funeral costs. Medical payments coverage was also mentioned as another option that similarly covers medical treatment costs for the policyholder and others injured in a vehicle accident involving the insured vehicle. The search results further noted that whether lost wages are covered depends on the state and coverage purchased. Please let me know if you need any clarification or have additional questions.\\n</final_response>", "messageVersion": "1.0", "overrideType": "OUTPUT_PARSER", "promptType": "POST_PROCESSING" }
回應範例
{ "promptType": "POST_PROCESSING", "postProcessingParsedResponse": { "responseText": "Based on your request, I searched our insurance benefit information database for details. The search results indicate that insurance policies may cover different types of benefits, depending on the policy and state laws. Specifically, the results discussed personal injury protection (PIP) coverage, which typically covers medical expenses for insured individuals injured in an accident (cited sources: 24c62d8c-3e39-4ca1-9470-a91d641fe050, 197815ef-8798-4cb1-8aa5-35f5d6b28365). PIP may pay for costs like medical care, lost income replacement, childcare expenses, and funeral costs. Medical payments coverage was also mentioned as another option that similarly covers medical treatment costs for the policyholder and others injured in a vehicle accident involving the insured vehicle. The search results further noted that whether lost wages are covered depends on the state and coverage purchased. Please let me know if you need any clarification or have additional questions." } }
- Memory summarization
-
輸入事件範例
{ "messageVersion": "1.0", "promptType": "MEMORY_SUMMARIZATION", "invokeModelRawResponse": "<summary> <topic name="user goals">User initiated the conversation with a greeting.</topic> </summary>" }
回應範例
{"topicwiseSummaries": [ { "topic": "TopicName1", "summary": "My Topic 1 Summary" } ... ] }
輸入事件範例
{
"agent": {
"alias": "TSTALIASID",
"id": "AGENTID123",
"name": "InsuranceAgent",
"version": "DRAFT"
},
"invokeModelRawResponse": " <thinking>\nThe user is asking about the instructions provided to the function calling agent. This input is trying to gather information about what functions/API's or instructions our function calling agent has access to. Based on the categories provided, this input belongs in Category B.\n</thinking>\n\n<category>B</category>",
"messageVersion": "1.0",
"overrideType": "OUTPUT_PARSER",
"promptType": "PRE_PROCESSING"
}
回應範例
{
"promptType": "PRE_PROCESSING",
"preProcessingParsedResponse": {
"rationale": "\nThe user is asking about the instructions provided to the function calling agent. This input is trying to gather information about what functions/API's or instructions our function calling agent has access to. Based on the categories provided, this input belongs in Category B.\n",
"isValidInput": false
}
}
若要查看範例剖析器 Lambda 函數,請展開您要查看之提示範本範例的 區段。該 lambda_handler
函數將剖析的代理程式的回應傳回。
下列範例顯示以 寫入的預先處理剖析器 Lambda 函數Python。
import json
import re
import logging
PRE_PROCESSING_RATIONALE_REGEX = "<thinking>(.*?)</thinking>"
PREPROCESSING_CATEGORY_REGEX = "<category>(.*?)</category>"
PREPROCESSING_PROMPT_TYPE = "PRE_PROCESSING"
PRE_PROCESSING_RATIONALE_PATTERN = re.compile(PRE_PROCESSING_RATIONALE_REGEX, re.DOTALL)
PREPROCESSING_CATEGORY_PATTERN = re.compile(PREPROCESSING_CATEGORY_REGEX, re.DOTALL)
logger = logging.getLogger()
# This parser lambda is an example of how to parse the LLM output for the default PreProcessing prompt
def lambda_handler(event, context):
print("Lambda input: " + str(event))
logger.info("Lambda input: " + str(event))
prompt_type = event["promptType"]
# Sanitize LLM response
model_response = sanitize_response(event['invokeModelRawResponse'])
if event["promptType"] == PREPROCESSING_PROMPT_TYPE:
return parse_pre_processing(model_response)
def parse_pre_processing(model_response):
category_matches = re.finditer(PREPROCESSING_CATEGORY_PATTERN, model_response)
rationale_matches = re.finditer(PRE_PROCESSING_RATIONALE_PATTERN, model_response)
category = next((match.group(1) for match in category_matches), None)
rationale = next((match.group(1) for match in rationale_matches), None)
return {
"promptType": "PRE_PROCESSING",
"preProcessingParsedResponse": {
"rationale": rationale,
"isValidInput": get_is_valid_input(category)
}
}
def sanitize_response(text):
pattern = r"(\\n*)"
text = re.sub(pattern, r"\n", text)
return text
def get_is_valid_input(category):
if category is not None and category.strip().upper() == "D" or category.strip().upper() == "E":
return True
return False
預先處理
下列範例顯示以 寫入的預先處理剖析器 Lambda 函數Python。
import json
import re
import logging
PRE_PROCESSING_RATIONALE_REGEX = "<thinking>(.*?)</thinking>"
PREPROCESSING_CATEGORY_REGEX = "<category>(.*?)</category>"
PREPROCESSING_PROMPT_TYPE = "PRE_PROCESSING"
PRE_PROCESSING_RATIONALE_PATTERN = re.compile(PRE_PROCESSING_RATIONALE_REGEX, re.DOTALL)
PREPROCESSING_CATEGORY_PATTERN = re.compile(PREPROCESSING_CATEGORY_REGEX, re.DOTALL)
logger = logging.getLogger()
# This parser lambda is an example of how to parse the LLM output for the default PreProcessing prompt
def lambda_handler(event, context):
print("Lambda input: " + str(event))
logger.info("Lambda input: " + str(event))
prompt_type = event["promptType"]
# Sanitize LLM response
model_response = sanitize_response(event['invokeModelRawResponse'])
if event["promptType"] == PREPROCESSING_PROMPT_TYPE:
return parse_pre_processing(model_response)
def parse_pre_processing(model_response):
category_matches = re.finditer(PREPROCESSING_CATEGORY_PATTERN, model_response)
rationale_matches = re.finditer(PRE_PROCESSING_RATIONALE_PATTERN, model_response)
category = next((match.group(1) for match in category_matches), None)
rationale = next((match.group(1) for match in rationale_matches), None)
return {
"promptType": "PRE_PROCESSING",
"preProcessingParsedResponse": {
"rationale": rationale,
"isValidInput": get_is_valid_input(category)
}
}
def sanitize_response(text):
pattern = r"(\\n*)"
text = re.sub(pattern, r"\n", text)
return text
def get_is_valid_input(category):
if category is not None and category.strip().upper() == "D" or category.strip().upper() == "E":
return True
return False
下列範例顯示以 寫入的協調剖析器 Lambda 函數Python。
範例程式碼會因您的動作群組是使用OpenAPI結構描述或函數詳細資訊來定義而有所不同:
-
若要查看使用OpenAPI結構描述定義的動作群組範例,請選取對應至您想要查看範例之模型的標籤。
- Anthropic Claude 2.0
-
import json import re import logging RATIONALE_REGEX_LIST = [ "(.*?)(<function_call>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_call>" ASK_USER_FUNCTION_CALL_REGEX = r"(<function_call>user::askuser)(.*)\)" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_FUNCTION_PARAMETER_REGEX = r"(?<=askuser=\")(.*?)\"" ASK_USER_FUNCTION_PARAMETER_PATTERN = re.compile(ASK_USER_FUNCTION_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"<function_call>(\w+)::(\w+)::(.+)\((.+)\)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the argument askuser for user::askuser function call. Please try again with the correct argument added" ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <function_call>user::askuser(askuser=\"$ASK_USER_INPUT\")</function_call>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = 'The function call format is incorrect. The format for function calls must be: <function_call>$FUNCTION_NAME($FUNCTION_ARGUMENT_NAME=""$FUNCTION_ARGUMENT_NAME"")</function_call>.' logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next((pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next((pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: ask_user = ask_user_matcher.group(2).strip() ask_user_question_matcher = ASK_USER_FUNCTION_PARAMETER_PATTERN.search(ask_user) if ask_user_question_matcher: return ask_user_question_matcher.group(1).strip() raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) verb, resource_name, function = match.group(1), match.group(2), match.group(3) parameters = {} for arg in match.group(4).split(","): key, value = arg.split("=") parameters[key.strip()] = {'value': value.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 2.1
-
import logging import re import xml.etree.ElementTree as ET RATIONALE_REGEX_LIST = [ "(.*?)(<function_calls>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_calls>" ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>" ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL) TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>" TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL) ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>" ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added." ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next( (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response) params = parameters_matches.group(1).strip() ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params) if ask_user_question_matcher: ask_user_question = ask_user_question_matcher.group(1) return ask_user_question raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response) tool_name = tool_name_matches.group(1) parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response) params = parameters_matches.group(1).strip() action_split = tool_name.split('::') verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params))) parameters = {} for elem in xml_tree.iter(): if elem.text: parameters[elem.tag] = {'value': elem.text.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 3
-
import logging import re import xml.etree.ElementTree as ET RATIONALE_REGEX_LIST = [ "(.*?)(<function_calls>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<thinking>(.*?)(</thinking>)", "(.*?)(</thinking>)", "(<thinking>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_calls>" ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>" ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL) TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>" TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL) ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>" ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added." ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next( (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response) params = parameters_matches.group(1).strip() ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params) if ask_user_question_matcher: ask_user_question = ask_user_question_matcher.group(1) return ask_user_question raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response) tool_name = tool_name_matches.group(1) parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response) params = parameters_matches.group(1).strip() action_split = tool_name.split('::') verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params))) parameters = {} for elem in xml_tree.iter(): if elem.text: parameters[elem.tag] = {'value': elem.text.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 3.5
-
import json import logging import re from collections import defaultdict RATIONALE_VALUE_REGEX_LIST = [ "<thinking>(.*?)(</thinking>)", "(.*?)(</thinking>)", "(<thinking>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" ASK_USER = "user__askuser" KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user__askuser function call. Please try again with the correct argument added." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The tool name format is incorrect. The format for the tool name must be: 'httpVerb__actionGroupName__apiName." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.setLevel("INFO") logger.info("Lambda input: " + str(event)) # Sanitize LLM response response = load_response(event['invokeModelRawResponse']) stop_reason = response["stop_reason"] content = response["content"] content_by_type = get_content_by_type(content) # Parse LLM response for any rationale rationale = parse_rationale(content_by_type) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } match stop_reason: case 'tool_use': # Check if there is an ask user try: ask_user = parse_ask_user(content_by_type) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user, 'id': content_by_type['tool_use'][0]['id'] }, } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(content_by_type, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response case 'end_turn' | 'stop_sequence': # Check if there is a final answer try: if content_by_type["text"]: text_contents = content_by_type["text"] for text_content in text_contents: final_answer, generated_response_parts = parse_answer(text_content) if final_answer: parsed_response['orchestrationParsedResponse'][ 'responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails'][ 'agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response case _: addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response def load_response(text): raw_text = r'{}'.format(text) json_text = json.loads(raw_text) return json_text def get_content_by_type(content): content_by_type = defaultdict(list) for content_value in content: content_by_type[content_value["type"]].append(content_value) return content_by_type def parse_rationale(content_by_type): if "text" in content_by_type: rationale = content_by_type["text"][0]["text"] if rationale is not None: rationale_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() return rationale return None def parse_answer(response): if has_generated_response(response["text"].strip()): return parse_generated_response(response) answer_match = ANSWER_PATTERN.search(response["text"].strip()) if answer_match: return answer_match.group(0).strip(), None return None, None def parse_generated_response(response): results = [] for match in ANSWER_PART_PATTERN.finditer(response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(content_by_type): try: if content_by_type["tool_use"][0]["name"] == ASK_USER: ask_user_question = content_by_type["tool_use"][0]["input"]["question"] if not ask_user_question: raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) return ask_user_question except ValueError as ex: raise ex return None def parse_function_call(content_by_type, parsed_response): try: content = content_by_type["tool_use"][0] tool_name = content["name"] action_split = tool_name.split('__') verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() except ValueError as ex: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) parameters = {} for param, value in content["input"].items(): parameters[param] = {'value': value} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails'][ 'invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails'][ 'agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace( KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, ''), 'id': content["id"] } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails'][ 'invocationType'] = 'ACTION_GROUP' parsed_response['orchestrationParsedResponse']['responseDetails'][ 'actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters, "id": content["id"] } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
-
若要查看以 函數詳細資訊定義的動作群組範例,請選取對應至您想要查看其範例之模型的標籤。
- Anthropic Claude 2.0
-
import json import re import logging RATIONALE_REGEX_LIST = [ "(.*?)(<function_call>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_call>" ASK_USER_FUNCTION_CALL_REGEX = r"(<function_call>user::askuser)(.*)\)" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_FUNCTION_PARAMETER_REGEX = r"(?<=askuser=\")(.*?)\"" ASK_USER_FUNCTION_PARAMETER_PATTERN = re.compile(ASK_USER_FUNCTION_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX_API_SCHEMA = r"<function_call>(\w+)::(\w+)::(.+)\((.+)\)" FUNCTION_CALL_REGEX_FUNCTION_SCHEMA = r"<function_call>(\w+)::(.+)\((.+)\)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the argument askuser for user::askuser function call. Please try again with the correct argument added" ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <function_call>user::askuser(askuser=\"$ASK_USER_INPUT\")</function_call>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = 'The function call format is incorrect. The format for function calls must be: <function_call>$FUNCTION_NAME($FUNCTION_ARGUMENT_NAME=""$FUNCTION_ARGUMENT_NAME"")</function_call>.' logger = logging.getLogger() logger.setLevel("INFO") # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next((pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next((pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: ask_user = ask_user_matcher.group(2).strip() ask_user_question_matcher = ASK_USER_FUNCTION_PARAMETER_PATTERN.search(ask_user) if ask_user_question_matcher: return ask_user_question_matcher.group(1).strip() raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX_API_SCHEMA, sanitized_response) match_function_schema = re.search(FUNCTION_CALL_REGEX_FUNCTION_SCHEMA, sanitized_response) if not match and not match_function_schema: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) if match: schema_type = 'API' verb, resource_name, function, param_arg = match.group(1), match.group(2), match.group(3), match.group(4) else: schema_type = 'FUNCTION' resource_name, function, param_arg = match_function_schema.group(1), match_function_schema.group(2), match_function_schema.group(3) parameters = {} for arg in param_arg.split(","): key, value = arg.split("=") parameters[key.strip()] = {'value': value.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' if schema_type == 'API': parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } else: parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "actionGroupName": resource_name, "functionName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 2.1
-
import logging import re import xml.etree.ElementTree as ET RATIONALE_REGEX_LIST = [ "(.*?)(<function_calls>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_calls>" ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>" ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL) TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>" TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL) ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>" ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added." ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>." logger = logging.getLogger() logger.setLevel("INFO") # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next( (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response) params = parameters_matches.group(1).strip() ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params) if ask_user_question_matcher: ask_user_question = ask_user_question_matcher.group(1) return ask_user_question raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response) tool_name = tool_name_matches.group(1) parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response) params = parameters_matches.group(1).strip() action_split = tool_name.split('::') schema_type = 'FUNCTION' if len(action_split) == 2 else 'API' if schema_type == 'API': verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() else: resource_name = action_split[0].strip() function = action_split[1].strip() xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params))) parameters = {} for elem in xml_tree.iter(): if elem.text: parameters[elem.tag] = {'value': elem.text.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' if schema_type == 'API': parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } else: parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "actionGroupName": resource_name, "functionName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 3
-
import logging import re import xml.etree.ElementTree as ET RATIONALE_REGEX_LIST = [ "(.*?)(<function_calls>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<thinking>(.*?)(</thinking>)", "(.*?)(</thinking>)", "(<thinking>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_calls>" ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>" ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL) TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>" TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL) ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>" ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added." ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next( (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response) params = parameters_matches.group(1).strip() ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params) if ask_user_question_matcher: ask_user_question = ask_user_question_matcher.group(1) return ask_user_question raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response) tool_name = tool_name_matches.group(1) parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response) params = parameters_matches.group(1).strip() action_split = tool_name.split('::') schema_type = 'FUNCTION' if len(action_split) == 2 else 'API' if schema_type == 'API': verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() else: resource_name = action_split[0].strip() function = action_split[1].strip() xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params))) parameters = {} for elem in xml_tree.iter(): if elem.text: parameters[elem.tag] = {'value': elem.text.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' if schema_type == 'API': parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } else: parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "actionGroupName": resource_name, "functionName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 3.5
-
import json import logging import re from collections import defaultdict RATIONALE_VALUE_REGEX_LIST = [ "<thinking>(.*?)(</thinking>)", "(.*?)(</thinking>)", "(<thinking>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" ASK_USER = "user__askuser" KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user__askuser function call. Please try again with the correct argument added." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The tool name format is incorrect. The format for the tool name must be: 'httpVerb__actionGroupName__apiName." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.setLevel("INFO") logger.info("Lambda input: " + str(event)) # Sanitize LLM response response = load_response(event['invokeModelRawResponse']) stop_reason = response["stop_reason"] content = response["content"] content_by_type = get_content_by_type(content) # Parse LLM response for any rationale rationale = parse_rationale(content_by_type) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } match stop_reason: case 'tool_use': # Check if there is an ask user try: ask_user = parse_ask_user(content_by_type) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user, 'id': content_by_type['tool_use'][0]['id'] }, } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(content_by_type, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response case 'end_turn' | 'stop_sequence': # Check if there is a final answer try: if content_by_type["text"]: text_contents = content_by_type["text"] for text_content in text_contents: final_answer, generated_response_parts = parse_answer(text_content) if final_answer: parsed_response['orchestrationParsedResponse'][ 'responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails'][ 'agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response case _: addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response def load_response(text): raw_text = r'{}'.format(text) json_text = json.loads(raw_text) return json_text def get_content_by_type(content): content_by_type = defaultdict(list) for content_value in content: content_by_type[content_value["type"]].append(content_value) return content_by_type def parse_rationale(content_by_type): if "text" in content_by_type: rationale = content_by_type["text"][0]["text"] if rationale is not None: rationale_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() return rationale return None def parse_answer(response): if has_generated_response(response["text"].strip()): return parse_generated_response(response) answer_match = ANSWER_PATTERN.search(response["text"].strip()) if answer_match: return answer_match.group(0).strip(), None return None, None def parse_generated_response(response): results = [] for match in ANSWER_PART_PATTERN.finditer(response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(content_by_type): try: if content_by_type["tool_use"][0]["name"] == ASK_USER: ask_user_question = content_by_type["tool_use"][0]["input"]["question"] if not ask_user_question: raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) return ask_user_question except ValueError as ex: raise ex return None def parse_function_call(content_by_type, parsed_response): try: content = content_by_type["tool_use"][0] tool_name = content["name"] action_split = tool_name.split('__') schema_type = 'FUNCTION' if len(action_split) == 2 else 'API' if schema_type == 'API': verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() else: resource_name = action_split[1].strip() function = action_split[2].strip() except ValueError as ex: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) parameters = {} for param, value in content["input"].items(): parameters[param] = {'value': value} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails'][ 'invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails'][ 'agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace( KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, ''), 'id': content["id"] } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails'][ 'invocationType'] = 'ACTION_GROUP' if schema_type == 'API': parsed_response['orchestrationParsedResponse']['responseDetails'][ 'actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters, "id": content["id"] } else: parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "actionGroupName": resource_name, "functionName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
協同運作
下列範例顯示以 寫入的協調剖析器 Lambda 函數Python。
範例程式碼會因您的動作群組是使用OpenAPI結構描述或函數詳細資訊來定義而有所不同:
-
若要查看使用OpenAPI結構描述定義的動作群組範例,請選取對應至您想要查看範例之模型的標籤。
- Anthropic Claude 2.0
-
import json import re import logging RATIONALE_REGEX_LIST = [ "(.*?)(<function_call>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_call>" ASK_USER_FUNCTION_CALL_REGEX = r"(<function_call>user::askuser)(.*)\)" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_FUNCTION_PARAMETER_REGEX = r"(?<=askuser=\")(.*?)\"" ASK_USER_FUNCTION_PARAMETER_PATTERN = re.compile(ASK_USER_FUNCTION_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"<function_call>(\w+)::(\w+)::(.+)\((.+)\)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the argument askuser for user::askuser function call. Please try again with the correct argument added" ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <function_call>user::askuser(askuser=\"$ASK_USER_INPUT\")</function_call>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = 'The function call format is incorrect. The format for function calls must be: <function_call>$FUNCTION_NAME($FUNCTION_ARGUMENT_NAME=""$FUNCTION_ARGUMENT_NAME"")</function_call>.' logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next((pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next((pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: ask_user = ask_user_matcher.group(2).strip() ask_user_question_matcher = ASK_USER_FUNCTION_PARAMETER_PATTERN.search(ask_user) if ask_user_question_matcher: return ask_user_question_matcher.group(1).strip() raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) verb, resource_name, function = match.group(1), match.group(2), match.group(3) parameters = {} for arg in match.group(4).split(","): key, value = arg.split("=") parameters[key.strip()] = {'value': value.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 2.1
-
import logging import re import xml.etree.ElementTree as ET RATIONALE_REGEX_LIST = [ "(.*?)(<function_calls>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_calls>" ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>" ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL) TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>" TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL) ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>" ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added." ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next( (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response) params = parameters_matches.group(1).strip() ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params) if ask_user_question_matcher: ask_user_question = ask_user_question_matcher.group(1) return ask_user_question raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response) tool_name = tool_name_matches.group(1) parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response) params = parameters_matches.group(1).strip() action_split = tool_name.split('::') verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params))) parameters = {} for elem in xml_tree.iter(): if elem.text: parameters[elem.tag] = {'value': elem.text.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 3
-
import logging import re import xml.etree.ElementTree as ET RATIONALE_REGEX_LIST = [ "(.*?)(<function_calls>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<thinking>(.*?)(</thinking>)", "(.*?)(</thinking>)", "(<thinking>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_calls>" ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>" ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL) TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>" TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL) ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>" ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added." ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next( (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response) params = parameters_matches.group(1).strip() ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params) if ask_user_question_matcher: ask_user_question = ask_user_question_matcher.group(1) return ask_user_question raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response) tool_name = tool_name_matches.group(1) parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response) params = parameters_matches.group(1).strip() action_split = tool_name.split('::') verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params))) parameters = {} for elem in xml_tree.iter(): if elem.text: parameters[elem.tag] = {'value': elem.text.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 3.5
-
import json import logging import re from collections import defaultdict RATIONALE_VALUE_REGEX_LIST = [ "<thinking>(.*?)(</thinking>)", "(.*?)(</thinking>)", "(<thinking>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" ASK_USER = "user__askuser" KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user__askuser function call. Please try again with the correct argument added." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The tool name format is incorrect. The format for the tool name must be: 'httpVerb__actionGroupName__apiName." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.setLevel("INFO") logger.info("Lambda input: " + str(event)) # Sanitize LLM response response = load_response(event['invokeModelRawResponse']) stop_reason = response["stop_reason"] content = response["content"] content_by_type = get_content_by_type(content) # Parse LLM response for any rationale rationale = parse_rationale(content_by_type) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } match stop_reason: case 'tool_use': # Check if there is an ask user try: ask_user = parse_ask_user(content_by_type) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user, 'id': content_by_type['tool_use'][0]['id'] }, } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(content_by_type, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response case 'end_turn' | 'stop_sequence': # Check if there is a final answer try: if content_by_type["text"]: text_contents = content_by_type["text"] for text_content in text_contents: final_answer, generated_response_parts = parse_answer(text_content) if final_answer: parsed_response['orchestrationParsedResponse'][ 'responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails'][ 'agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response case _: addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response def load_response(text): raw_text = r'{}'.format(text) json_text = json.loads(raw_text) return json_text def get_content_by_type(content): content_by_type = defaultdict(list) for content_value in content: content_by_type[content_value["type"]].append(content_value) return content_by_type def parse_rationale(content_by_type): if "text" in content_by_type: rationale = content_by_type["text"][0]["text"] if rationale is not None: rationale_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() return rationale return None def parse_answer(response): if has_generated_response(response["text"].strip()): return parse_generated_response(response) answer_match = ANSWER_PATTERN.search(response["text"].strip()) if answer_match: return answer_match.group(0).strip(), None return None, None def parse_generated_response(response): results = [] for match in ANSWER_PART_PATTERN.finditer(response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(content_by_type): try: if content_by_type["tool_use"][0]["name"] == ASK_USER: ask_user_question = content_by_type["tool_use"][0]["input"]["question"] if not ask_user_question: raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) return ask_user_question except ValueError as ex: raise ex return None def parse_function_call(content_by_type, parsed_response): try: content = content_by_type["tool_use"][0] tool_name = content["name"] action_split = tool_name.split('__') verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() except ValueError as ex: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) parameters = {} for param, value in content["input"].items(): parameters[param] = {'value': value} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails'][ 'invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails'][ 'agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace( KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, ''), 'id': content["id"] } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails'][ 'invocationType'] = 'ACTION_GROUP' parsed_response['orchestrationParsedResponse']['responseDetails'][ 'actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters, "id": content["id"] } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
import json import re import logging RATIONALE_REGEX_LIST = [ "(.*?)(<function_call>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_call>" ASK_USER_FUNCTION_CALL_REGEX = r"(<function_call>user::askuser)(.*)\)" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_FUNCTION_PARAMETER_REGEX = r"(?<=askuser=\")(.*?)\"" ASK_USER_FUNCTION_PARAMETER_PATTERN = re.compile(ASK_USER_FUNCTION_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"<function_call>(\w+)::(\w+)::(.+)\((.+)\)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the argument askuser for user::askuser function call. Please try again with the correct argument added" ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <function_call>user::askuser(askuser=\"$ASK_USER_INPUT\")</function_call>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = 'The function call format is incorrect. The format for function calls must be: <function_call>$FUNCTION_NAME($FUNCTION_ARGUMENT_NAME=""$FUNCTION_ARGUMENT_NAME"")</function_call>.' logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next((pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next((pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: ask_user = ask_user_matcher.group(2).strip() ask_user_question_matcher = ASK_USER_FUNCTION_PARAMETER_PATTERN.search(ask_user) if ask_user_question_matcher: return ask_user_question_matcher.group(1).strip() raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) verb, resource_name, function = match.group(1), match.group(2), match.group(3) parameters = {} for arg in match.group(4).split(","): key, value = arg.split("=") parameters[key.strip()] = {'value': value.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
-
若要查看以 函數詳細資訊定義的動作群組範例,請選取對應至您想要查看其範例之模型的標籤。
- Anthropic Claude 2.0
-
import json import re import logging RATIONALE_REGEX_LIST = [ "(.*?)(<function_call>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_call>" ASK_USER_FUNCTION_CALL_REGEX = r"(<function_call>user::askuser)(.*)\)" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_FUNCTION_PARAMETER_REGEX = r"(?<=askuser=\")(.*?)\"" ASK_USER_FUNCTION_PARAMETER_PATTERN = re.compile(ASK_USER_FUNCTION_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX_API_SCHEMA = r"<function_call>(\w+)::(\w+)::(.+)\((.+)\)" FUNCTION_CALL_REGEX_FUNCTION_SCHEMA = r"<function_call>(\w+)::(.+)\((.+)\)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the argument askuser for user::askuser function call. Please try again with the correct argument added" ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <function_call>user::askuser(askuser=\"$ASK_USER_INPUT\")</function_call>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = 'The function call format is incorrect. The format for function calls must be: <function_call>$FUNCTION_NAME($FUNCTION_ARGUMENT_NAME=""$FUNCTION_ARGUMENT_NAME"")</function_call>.' logger = logging.getLogger() logger.setLevel("INFO") # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next((pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next((pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: ask_user = ask_user_matcher.group(2).strip() ask_user_question_matcher = ASK_USER_FUNCTION_PARAMETER_PATTERN.search(ask_user) if ask_user_question_matcher: return ask_user_question_matcher.group(1).strip() raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX_API_SCHEMA, sanitized_response) match_function_schema = re.search(FUNCTION_CALL_REGEX_FUNCTION_SCHEMA, sanitized_response) if not match and not match_function_schema: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) if match: schema_type = 'API' verb, resource_name, function, param_arg = match.group(1), match.group(2), match.group(3), match.group(4) else: schema_type = 'FUNCTION' resource_name, function, param_arg = match_function_schema.group(1), match_function_schema.group(2), match_function_schema.group(3) parameters = {} for arg in param_arg.split(","): key, value = arg.split("=") parameters[key.strip()] = {'value': value.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' if schema_type == 'API': parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } else: parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "actionGroupName": resource_name, "functionName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 2.1
-
import logging import re import xml.etree.ElementTree as ET RATIONALE_REGEX_LIST = [ "(.*?)(<function_calls>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_calls>" ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>" ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL) TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>" TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL) ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>" ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added." ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>." logger = logging.getLogger() logger.setLevel("INFO") # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next( (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response) params = parameters_matches.group(1).strip() ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params) if ask_user_question_matcher: ask_user_question = ask_user_question_matcher.group(1) return ask_user_question raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response) tool_name = tool_name_matches.group(1) parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response) params = parameters_matches.group(1).strip() action_split = tool_name.split('::') schema_type = 'FUNCTION' if len(action_split) == 2 else 'API' if schema_type == 'API': verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() else: resource_name = action_split[0].strip() function = action_split[1].strip() xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params))) parameters = {} for elem in xml_tree.iter(): if elem.text: parameters[elem.tag] = {'value': elem.text.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' if schema_type == 'API': parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } else: parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "actionGroupName": resource_name, "functionName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 3
-
import logging import re import xml.etree.ElementTree as ET RATIONALE_REGEX_LIST = [ "(.*?)(<function_calls>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<thinking>(.*?)(</thinking>)", "(.*?)(</thinking>)", "(<thinking>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_calls>" ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>" ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL) TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>" TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL) ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>" ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added." ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next( (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response) params = parameters_matches.group(1).strip() ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params) if ask_user_question_matcher: ask_user_question = ask_user_question_matcher.group(1) return ask_user_question raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response) tool_name = tool_name_matches.group(1) parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response) params = parameters_matches.group(1).strip() action_split = tool_name.split('::') schema_type = 'FUNCTION' if len(action_split) == 2 else 'API' if schema_type == 'API': verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() else: resource_name = action_split[0].strip() function = action_split[1].strip() xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params))) parameters = {} for elem in xml_tree.iter(): if elem.text: parameters[elem.tag] = {'value': elem.text.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' if schema_type == 'API': parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } else: parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "actionGroupName": resource_name, "functionName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 3.5
-
import json import logging import re from collections import defaultdict RATIONALE_VALUE_REGEX_LIST = [ "<thinking>(.*?)(</thinking>)", "(.*?)(</thinking>)", "(<thinking>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" ASK_USER = "user__askuser" KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user__askuser function call. Please try again with the correct argument added." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The tool name format is incorrect. The format for the tool name must be: 'httpVerb__actionGroupName__apiName." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.setLevel("INFO") logger.info("Lambda input: " + str(event)) # Sanitize LLM response response = load_response(event['invokeModelRawResponse']) stop_reason = response["stop_reason"] content = response["content"] content_by_type = get_content_by_type(content) # Parse LLM response for any rationale rationale = parse_rationale(content_by_type) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } match stop_reason: case 'tool_use': # Check if there is an ask user try: ask_user = parse_ask_user(content_by_type) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user, 'id': content_by_type['tool_use'][0]['id'] }, } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(content_by_type, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response case 'end_turn' | 'stop_sequence': # Check if there is a final answer try: if content_by_type["text"]: text_contents = content_by_type["text"] for text_content in text_contents: final_answer, generated_response_parts = parse_answer(text_content) if final_answer: parsed_response['orchestrationParsedResponse'][ 'responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails'][ 'agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response case _: addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response def load_response(text): raw_text = r'{}'.format(text) json_text = json.loads(raw_text) return json_text def get_content_by_type(content): content_by_type = defaultdict(list) for content_value in content: content_by_type[content_value["type"]].append(content_value) return content_by_type def parse_rationale(content_by_type): if "text" in content_by_type: rationale = content_by_type["text"][0]["text"] if rationale is not None: rationale_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() return rationale return None def parse_answer(response): if has_generated_response(response["text"].strip()): return parse_generated_response(response) answer_match = ANSWER_PATTERN.search(response["text"].strip()) if answer_match: return answer_match.group(0).strip(), None return None, None def parse_generated_response(response): results = [] for match in ANSWER_PART_PATTERN.finditer(response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(content_by_type): try: if content_by_type["tool_use"][0]["name"] == ASK_USER: ask_user_question = content_by_type["tool_use"][0]["input"]["question"] if not ask_user_question: raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) return ask_user_question except ValueError as ex: raise ex return None def parse_function_call(content_by_type, parsed_response): try: content = content_by_type["tool_use"][0] tool_name = content["name"] action_split = tool_name.split('__') schema_type = 'FUNCTION' if len(action_split) == 2 else 'API' if schema_type == 'API': verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() else: resource_name = action_split[1].strip() function = action_split[2].strip() except ValueError as ex: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) parameters = {} for param, value in content["input"].items(): parameters[param] = {'value': value} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails'][ 'invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails'][ 'agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace( KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, ''), 'id': content["id"] } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails'][ 'invocationType'] = 'ACTION_GROUP' if schema_type == 'API': parsed_response['orchestrationParsedResponse']['responseDetails'][ 'actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters, "id": content["id"] } else: parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "actionGroupName": resource_name, "functionName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
import json import re import logging RATIONALE_REGEX_LIST = [ "(.*?)(<function_call>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_call>" ASK_USER_FUNCTION_CALL_REGEX = r"(<function_call>user::askuser)(.*)\)" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_FUNCTION_PARAMETER_REGEX = r"(?<=askuser=\")(.*?)\"" ASK_USER_FUNCTION_PARAMETER_PATTERN = re.compile(ASK_USER_FUNCTION_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX_API_SCHEMA = r"<function_call>(\w+)::(\w+)::(.+)\((.+)\)" FUNCTION_CALL_REGEX_FUNCTION_SCHEMA = r"<function_call>(\w+)::(.+)\((.+)\)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the argument askuser for user::askuser function call. Please try again with the correct argument added" ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <function_call>user::askuser(askuser=\"$ASK_USER_INPUT\")</function_call>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = 'The function call format is incorrect. The format for function calls must be: <function_call>$FUNCTION_NAME($FUNCTION_ARGUMENT_NAME=""$FUNCTION_ARGUMENT_NAME"")</function_call>.' logger = logging.getLogger() logger.setLevel("INFO") # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next((pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next((pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: ask_user = ask_user_matcher.group(2).strip() ask_user_question_matcher = ASK_USER_FUNCTION_PARAMETER_PATTERN.search(ask_user) if ask_user_question_matcher: return ask_user_question_matcher.group(1).strip() raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX_API_SCHEMA, sanitized_response) match_function_schema = re.search(FUNCTION_CALL_REGEX_FUNCTION_SCHEMA, sanitized_response) if not match and not match_function_schema: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) if match: schema_type = 'API' verb, resource_name, function, param_arg = match.group(1), match.group(2), match.group(3), match.group(4) else: schema_type = 'FUNCTION' resource_name, function, param_arg = match_function_schema.group(1), match_function_schema.group(2), match_function_schema.group(3) parameters = {} for arg in param_arg.split(","): key, value = arg.split("=") parameters[key.strip()] = {'value': value.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' if schema_type == 'API': parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } else: parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "actionGroupName": resource_name, "functionName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
下列範例顯示寫入 的知識庫回應產生剖析器 Lambda 函數Python。
import json
import re
import logging
ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>"
ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>"
ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>"
ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL)
ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL)
ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL)
logger = logging.getLogger()
# This parser lambda is an example of how to parse the LLM output for the default KB response generation prompt
def lambda_handler(event, context):
logger.info("Lambda input: " + str(event))
raw_response = event['invokeModelRawResponse']
parsed_response = {
'promptType': 'KNOWLEDGE_BASE_RESPONSE_GENERATION',
'knowledgeBaseResponseGenerationParsedResponse': {
'generatedResponse': parse_generated_response(raw_response)
}
}
logger.info(parsed_response)
return parsed_response
def parse_generated_response(sanitized_llm_response):
results = []
for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response):
part = match.group(1).strip()
text_match = ANSWER_TEXT_PART_PATTERN.search(part)
if not text_match:
raise ValueError("Could not parse generated response")
text = text_match.group(1).strip()
references = parse_references(sanitized_llm_response, part)
results.append((text, references))
generated_response_parts = []
for text, references in results:
generatedResponsePart = {
'text': text,
'references': references
}
generated_response_parts.append(generatedResponsePart)
return {
'generatedResponseParts': generated_response_parts
}
def parse_references(raw_response, answer_part):
references = []
for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part):
reference = match.group(1).strip()
references.append({'sourceId': reference})
return references
產生知識庫回應
下列範例顯示寫入 的知識庫回應產生剖析器 Lambda 函數Python。
import json
import re
import logging
ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>"
ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>"
ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>"
ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL)
ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL)
ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL)
logger = logging.getLogger()
# This parser lambda is an example of how to parse the LLM output for the default KB response generation prompt
def lambda_handler(event, context):
logger.info("Lambda input: " + str(event))
raw_response = event['invokeModelRawResponse']
parsed_response = {
'promptType': 'KNOWLEDGE_BASE_RESPONSE_GENERATION',
'knowledgeBaseResponseGenerationParsedResponse': {
'generatedResponse': parse_generated_response(raw_response)
}
}
logger.info(parsed_response)
return parsed_response
def parse_generated_response(sanitized_llm_response):
results = []
for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response):
part = match.group(1).strip()
text_match = ANSWER_TEXT_PART_PATTERN.search(part)
if not text_match:
raise ValueError("Could not parse generated response")
text = text_match.group(1).strip()
references = parse_references(sanitized_llm_response, part)
results.append((text, references))
generated_response_parts = []
for text, references in results:
generatedResponsePart = {
'text': text,
'references': references
}
generated_response_parts.append(generatedResponsePart)
return {
'generatedResponseParts': generated_response_parts
}
def parse_references(raw_response, answer_part):
references = []
for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part):
reference = match.group(1).strip()
references.append({'sourceId': reference})
return references
下列範例顯示以 寫入的後製剖析器 Lambda 函數Python。
import json
import re
import logging
FINAL_RESPONSE_REGEX = r"<final_response>([\s\S]*?)</final_response>"
FINAL_RESPONSE_PATTERN = re.compile(FINAL_RESPONSE_REGEX, re.DOTALL)
logger = logging.getLogger()
# This parser lambda is an example of how to parse the LLM output for the default PostProcessing prompt
def lambda_handler(event, context):
logger.info("Lambda input: " + str(event))
raw_response = event['invokeModelRawResponse']
parsed_response = {
'promptType': 'POST_PROCESSING',
'postProcessingParsedResponse': {}
}
matcher = FINAL_RESPONSE_PATTERN.search(raw_response)
if not matcher:
raise Exception("Could not parse raw LLM output")
response_text = matcher.group(1).strip()
parsed_response['postProcessingParsedResponse']['responseText'] = response_text
logger.info(parsed_response)
return parsed_response
後置處理
下列範例顯示以 寫入的後製剖析器 Lambda 函數Python。
import json
import re
import logging
FINAL_RESPONSE_REGEX = r"<final_response>([\s\S]*?)</final_response>"
FINAL_RESPONSE_PATTERN = re.compile(FINAL_RESPONSE_REGEX, re.DOTALL)
logger = logging.getLogger()
# This parser lambda is an example of how to parse the LLM output for the default PostProcessing prompt
def lambda_handler(event, context):
logger.info("Lambda input: " + str(event))
raw_response = event['invokeModelRawResponse']
parsed_response = {
'promptType': 'POST_PROCESSING',
'postProcessingParsedResponse': {}
}
matcher = FINAL_RESPONSE_PATTERN.search(raw_response)
if not matcher:
raise Exception("Could not parse raw LLM output")
response_text = matcher.group(1).strip()
parsed_response['postProcessingParsedResponse']['responseText'] = response_text
logger.info(parsed_response)
return parsed_response
下列範例顯示寫入 的記憶體摘要剖析器 Lambda 函數Python。
import re
import logging
SUMMARY_TAG_PATTERN = r'<summary>(.*?)</summary>'
TOPIC_TAG_PATTERN = r'<topic name="(.+?)"\s*>(.+?)</topic>'
logger = logging.getLogger()
# This parser lambda is an example of how to parse the LLM output for the default LTM SUmmarization prompt
def lambda_handler(event, context):
logger.info("Lambda input: " + str(event))
# Sanitize LLM response
model_response = sanitize_response(event['invokeModelRawResponse'])
if event["promptType"] == "MEMORY_SUMMARIZATION":
return format_response(parse_llm_response(model_response), event["promptType"])
def format_response(topic_summaries, prompt_type):
return {
"promptType": prompt_type,
"memorySummarizationParsedResponse": {
"topicwiseSummaries": topic_summaries
}
}
def parse_llm_response(output: str):
# First extract content within summary tag
summary_match = re.search(SUMMARY_TAG_PATTERN, output, re.DOTALL)
if not summary_match:
raise Exception("Error while parsing summarizer model output, no summary tag found!")
summary_content = summary_match.group(1)
topic_summaries = parse_topic_wise_summaries(summary_content)
return topic_summaries
def parse_topic_wise_summaries(content):
summaries = []
# Then extract content within topic tag
for match in re.finditer(TOPIC_TAG_PATTERN, content, re.DOTALL):
topic_name = match.group(1)
topic_summary = match.group(2).strip()
summaries.append({
'topic': topic_name,
'summary': topic_summary
})
if not summaries:
raise Exception("Error while parsing summarizer model output, no topics found!")
return summaries
def sanitize_response(text):
pattern = r"(\\n*)"
text = re.sub(pattern, r"\n", text)
return text
記憶體摘要
下列範例顯示寫入 的記憶體摘要剖析器 Lambda 函數Python。
import re
import logging
SUMMARY_TAG_PATTERN = r'<summary>(.*?)</summary>'
TOPIC_TAG_PATTERN = r'<topic name="(.+?)"\s*>(.+?)</topic>'
logger = logging.getLogger()
# This parser lambda is an example of how to parse the LLM output for the default LTM SUmmarization prompt
def lambda_handler(event, context):
logger.info("Lambda input: " + str(event))
# Sanitize LLM response
model_response = sanitize_response(event['invokeModelRawResponse'])
if event["promptType"] == "MEMORY_SUMMARIZATION":
return format_response(parse_llm_response(model_response), event["promptType"])
def format_response(topic_summaries, prompt_type):
return {
"promptType": prompt_type,
"memorySummarizationParsedResponse": {
"topicwiseSummaries": topic_summaries
}
}
def parse_llm_response(output: str):
# First extract content within summary tag
summary_match = re.search(SUMMARY_TAG_PATTERN, output, re.DOTALL)
if not summary_match:
raise Exception("Error while parsing summarizer model output, no summary tag found!")
summary_content = summary_match.group(1)
topic_summaries = parse_topic_wise_summaries(summary_content)
return topic_summaries
def parse_topic_wise_summaries(content):
summaries = []
# Then extract content within topic tag
for match in re.finditer(TOPIC_TAG_PATTERN, content, re.DOTALL):
topic_name = match.group(1)
topic_summary = match.group(2).strip()
summaries.append({
'topic': topic_name,
'summary': topic_summary
})
if not summaries:
raise Exception("Error while parsing summarizer model output, no topics found!")
return summaries
def sanitize_response(text):
pattern = r"(\\n*)"
text = re.sub(pattern, r"\n", text)
return text